28 | | EventableDescriptor (fileno (fp)) |
---|
29 | | { |
---|
30 | | } |
---|
| 28 | EventableDescriptor (fileno (fp)), |
---|
| 29 | bReadAttemptedAfterClose (false), |
---|
| 30 | LastIo (gCurrentLoopTime), |
---|
| 31 | InactivityTimeout (0), |
---|
| 32 | MyStream (fp), |
---|
| 33 | OutboundDataSize (0) |
---|
| 34 | { |
---|
| 35 | } |
---|
| 36 | |
---|
| 37 | /******************************* |
---|
| 38 | PipeDescriptor::~PipeDescriptor |
---|
| 39 | *******************************/ |
---|
| 40 | |
---|
| 41 | PipeDescriptor::~PipeDescriptor() |
---|
| 42 | { |
---|
| 43 | // Run down any stranded outbound data. |
---|
| 44 | for (size_t i=0; i < OutboundPages.size(); i++) |
---|
| 45 | OutboundPages[i].Free(); |
---|
| 46 | |
---|
| 47 | /* As a virtual destructor, we come here before the base-class |
---|
| 48 | * destructor that closes our file-descriptor. Calling pclose |
---|
| 49 | * doesn't seem to bother the base-class destructor any, and it's |
---|
| 50 | * required for cleaning up the subprocess zombie. |
---|
| 51 | * Eventually we may need to refactor some of this stuff if there |
---|
| 52 | * are undesirable interactions. |
---|
| 53 | * Note that calling pclose on a still-running subprocess called |
---|
| 54 | * with mode "r" will often cause the subprocess to catch SIGPIPE. |
---|
| 55 | * This is part of the behavior of popen and not something EM is doing. |
---|
| 56 | * |
---|
| 57 | * Something weirder and worse happens with mode "w" - |
---|
| 58 | * pclose WILL HANG irretrievably if the subprocess doesn't |
---|
| 59 | * close when we tell it to. It will see a close on its end |
---|
| 60 | * of the pipe (the read end), but that doesn't mean it will |
---|
| 61 | * be written so as to exit when that happens. |
---|
| 62 | * pclose waits for the subprocess to terminate. |
---|
| 63 | * (Is there a flavor of popen that has a timeout or a no-hang |
---|
| 64 | * option?) |
---|
| 65 | * |
---|
| 66 | * pclose returns the termination status of the subprocess. |
---|
| 67 | * Someday we may need to make that available to the caller, |
---|
| 68 | * possibly as an argument to the UNBOUND event. |
---|
| 69 | */ |
---|
| 70 | pclose (MyStream); |
---|
| 71 | } |
---|
| 72 | |
---|
| 73 | |
---|
| 74 | |
---|
| 75 | /******************** |
---|
| 76 | PipeDescriptor::Read |
---|
| 77 | ********************/ |
---|
| 78 | |
---|
| 79 | void PipeDescriptor::Read() |
---|
| 80 | { |
---|
| 81 | int sd = GetSocket(); |
---|
| 82 | if (sd == INVALID_SOCKET) { |
---|
| 83 | assert (!bReadAttemptedAfterClose); |
---|
| 84 | bReadAttemptedAfterClose = true; |
---|
| 85 | return; |
---|
| 86 | } |
---|
| 87 | |
---|
| 88 | LastIo = gCurrentLoopTime; |
---|
| 89 | |
---|
| 90 | int total_bytes_read = 0; |
---|
| 91 | char readbuffer [16 * 1024]; |
---|
| 92 | |
---|
| 93 | for (int i=0; i < 10; i++) { |
---|
| 94 | // Don't read just one buffer and then move on. This is faster |
---|
| 95 | // if there is a lot of incoming. |
---|
| 96 | // But don't read indefinitely. Give other sockets a chance to run. |
---|
| 97 | // NOTICE, we're reading one less than the buffer size. |
---|
| 98 | // That's so we can put a guard byte at the end of what we send |
---|
| 99 | // to user code. |
---|
| 100 | // Use read instead of recv, which on Linux gives a "socket operation |
---|
| 101 | // on nonsocket" error. |
---|
| 102 | |
---|
| 103 | |
---|
| 104 | int r = read (sd, readbuffer, sizeof(readbuffer) - 1); |
---|
| 105 | //cerr << "<R:" << r << ">"; |
---|
| 106 | |
---|
| 107 | if (r > 0) { |
---|
| 108 | total_bytes_read += r; |
---|
| 109 | LastRead = gCurrentLoopTime; |
---|
| 110 | |
---|
| 111 | // Add a null-terminator at the the end of the buffer |
---|
| 112 | // that we will send to the callback. |
---|
| 113 | // DO NOT EVER CHANGE THIS. We want to explicitly allow users |
---|
| 114 | // to be able to depend on this behavior, so they will have |
---|
| 115 | // the option to do some things faster. Additionally it's |
---|
| 116 | // a security guard against buffer overflows. |
---|
| 117 | readbuffer [r] = 0; |
---|
| 118 | if (EventCallback) |
---|
| 119 | (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r); |
---|
| 120 | } |
---|
| 121 | else if (r == 0) { |
---|
| 122 | break; |
---|
| 123 | } |
---|
| 124 | else { |
---|
| 125 | // Basically a would-block, meaning we've read everything there is to read. |
---|
| 126 | break; |
---|
| 127 | } |
---|
| 128 | |
---|
| 129 | } |
---|
| 130 | |
---|
| 131 | |
---|
| 132 | if (total_bytes_read == 0) { |
---|
| 133 | // If we read no data on a socket that selected readable, |
---|
| 134 | // it generally means the other end closed the connection gracefully. |
---|
| 135 | bCloseNow = true; |
---|
| 136 | } |
---|
| 137 | |
---|
| 138 | } |
---|
| 139 | |
---|
| 140 | /********************* |
---|
| 141 | PipeDescriptor::Write |
---|
| 142 | *********************/ |
---|
| 143 | |
---|
| 144 | void PipeDescriptor::Write() |
---|
| 145 | { |
---|
| 146 | int sd = GetSocket(); |
---|
| 147 | assert (sd != INVALID_SOCKET); |
---|
| 148 | |
---|
| 149 | LastIo = gCurrentLoopTime; |
---|
| 150 | char output_buffer [16 * 1024]; |
---|
| 151 | size_t nbytes = 0; |
---|
| 152 | |
---|
| 153 | while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) { |
---|
| 154 | OutboundPage *op = &(OutboundPages[0]); |
---|
| 155 | if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) { |
---|
| 156 | memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset); |
---|
| 157 | nbytes += (op->Length - op->Offset); |
---|
| 158 | op->Free(); |
---|
| 159 | OutboundPages.pop_front(); |
---|
| 160 | } |
---|
| 161 | else { |
---|
| 162 | int len = sizeof(output_buffer) - nbytes; |
---|
| 163 | memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len); |
---|
| 164 | op->Offset += len; |
---|
| 165 | nbytes += len; |
---|
| 166 | } |
---|
| 167 | } |
---|
| 168 | |
---|
| 169 | // We should never have gotten here if there were no data to write, |
---|
| 170 | // so assert that as a sanity check. |
---|
| 171 | // Don't bother to make sure nbytes is less than output_buffer because |
---|
| 172 | // if it were we probably would have crashed already. |
---|
| 173 | assert (nbytes > 0); |
---|
| 174 | |
---|
| 175 | assert (GetSocket() != INVALID_SOCKET); |
---|
| 176 | int bytes_written = write (GetSocket(), output_buffer, nbytes); |
---|
| 177 | |
---|
| 178 | if (bytes_written > 0) { |
---|
| 179 | OutboundDataSize -= bytes_written; |
---|
| 180 | if ((size_t)bytes_written < nbytes) { |
---|
| 181 | int len = nbytes - bytes_written; |
---|
| 182 | char *buffer = (char*) malloc (len + 1); |
---|
| 183 | if (!buffer) |
---|
| 184 | throw std::runtime_error ("bad alloc throwing back data"); |
---|
| 185 | memcpy (buffer, output_buffer + bytes_written, len); |
---|
| 186 | buffer [len] = 0; |
---|
| 187 | OutboundPages.push_front (OutboundPage (buffer, len)); |
---|
| 188 | } |
---|
| 189 | } |
---|
| 190 | else { |
---|
| 191 | #ifdef OS_UNIX |
---|
| 192 | if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR)) |
---|
| 193 | #endif |
---|
| 194 | #ifdef OS_WIN32 |
---|
| 195 | if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK)) |
---|
| 196 | #endif |
---|
| 197 | Close(); |
---|
| 198 | } |
---|
| 199 | } |
---|
| 200 | |
---|
| 201 | |
---|
| 202 | /************************* |
---|
| 203 | PipeDescriptor::Heartbeat |
---|
| 204 | *************************/ |
---|
| 205 | |
---|
| 206 | void PipeDescriptor::Heartbeat() |
---|
| 207 | { |
---|
| 208 | // If an inactivity timeout is defined, then check for it. |
---|
| 209 | if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) |
---|
| 210 | bCloseNow = true; |
---|
| 211 | } |
---|
| 212 | |
---|
| 213 | |
---|
| 214 | /***************************** |
---|
| 215 | PipeDescriptor::SelectForRead |
---|
| 216 | *****************************/ |
---|
| 217 | |
---|
| 218 | bool PipeDescriptor::SelectForRead() |
---|
| 219 | { |
---|
| 220 | /* Pipe descriptors, being local by definition, don't have |
---|
| 221 | * a pending state, so this is simpler than for the |
---|
| 222 | * ConnectionDescriptor object. |
---|
| 223 | */ |
---|
| 224 | return true; |
---|
| 225 | } |
---|
| 226 | |
---|
| 227 | /****************************** |
---|
| 228 | PipeDescriptor::SelectForWrite |
---|
| 229 | ******************************/ |
---|
| 230 | |
---|
| 231 | bool PipeDescriptor::SelectForWrite() |
---|
| 232 | { |
---|
| 233 | /* Pipe descriptors, being local by definition, don't have |
---|
| 234 | * a pending state, so this is simpler than for the |
---|
| 235 | * ConnectionDescriptor object. |
---|
| 236 | */ |
---|
| 237 | return (GetOutboundDataSize() > 0); |
---|
| 238 | } |
---|
| 239 | |
---|
| 240 | |
---|
| 241 | |
---|
| 242 | |
---|
| 243 | /********************************* |
---|
| 244 | PipeDescriptor::SendOutboundData |
---|
| 245 | ********************************/ |
---|
| 246 | |
---|
| 247 | int PipeDescriptor::SendOutboundData (const char *data, int length) |
---|
| 248 | { |
---|
| 249 | if (bCloseNow || bCloseAfterWriting) |
---|
| 250 | return 0; |
---|
| 251 | |
---|
| 252 | if (!data && (length > 0)) |
---|
| 253 | throw std::runtime_error ("bad outbound data"); |
---|
| 254 | char *buffer = (char *) malloc (length + 1); |
---|
| 255 | if (!buffer) |
---|
| 256 | throw std::runtime_error ("no allocation for outbound data"); |
---|
| 257 | memcpy (buffer, data, length); |
---|
| 258 | buffer [length] = 0; |
---|
| 259 | OutboundPages.push_back (OutboundPage (buffer, length)); |
---|
| 260 | OutboundDataSize += length; |
---|
| 261 | return length; |
---|
| 262 | } |
---|
| 263 | |
---|