Changeset 337

Show
Ignore:
Timestamp:
05/31/07 11:22:12 (2 years ago)
Author:
blackhedd
Message:

more work on the popen implementation.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • version_0/ext/ed.cpp

    r334 r337  
    191191{ 
    192192        // TODO: This is something of a hack, or at least it's a static method of the wrong class. 
     193        // TODO: Poor polymorphism here. We should be calling one virtual method 
     194        // instead of hacking out the runtime information of the target object. 
    193195        ConnectionDescriptor *cd = dynamic_cast <ConnectionDescriptor*> (Bindable_t::GetObject (binding)); 
    194196        if (cd) 
     
    197199        if (ds) 
    198200                return ds->SendOutboundData (data, data_length); 
     201        PipeDescriptor *ps = dynamic_cast <PipeDescriptor*> (Bindable_t::GetObject (binding)); 
     202        if (ps) 
     203                return ps->SendOutboundData (data, data_length); 
    199204        return -1; 
    200205} 
  • version_0/ext/ed.h

    r336 r337  
    6363                virtual bool GetPeername (struct sockaddr*) {return false;} 
    6464 
    65     virtual void StartTls() {} 
    66  
    67     // Properties: return 0/1 to signify T/F, and handle the values 
    68     // through arguments. 
    69     virtual int GetCommInactivityTimeout (int *value) {return 0;} 
    70     virtual int SetCommInactivityTimeout (int *value) {return 0;} 
     65               virtual void StartTls() {} 
     66 
     67               // Properties: return 0/1 to signify T/F, and handle the values 
     68               // through arguments. 
     69               virtual int GetCommInactivityTimeout (int *value) {return 0;} 
     70               virtual int SetCommInactivityTimeout (int *value) {return 0;} 
    7171 
    7272        protected: 
     
    119119                virtual bool GetPeername (struct sockaddr*); 
    120120 
    121     virtual int GetCommInactivityTimeout (int *value); 
    122     virtual int SetCommInactivityTimeout (int *value); 
     121               virtual int GetCommInactivityTimeout (int *value); 
     122               virtual int SetCommInactivityTimeout (int *value); 
    123123 
    124124        protected: 
     
    244244                virtual bool SelectForWrite(); 
    245245 
    246         protected: 
    247                 EventMachine_t *MyEventMachine; 
     246                int SendOutboundData (const char*, int); 
     247                virtual int GetOutboundDataSize() {return OutboundDataSize;} 
     248 
     249        protected: 
     250                struct OutboundPage { 
     251                        OutboundPage (const char *b, int l, int o=0): Buffer(b), Length(l), Offset(o) {} 
     252                        void Free() {if (Buffer) free ((char*)Buffer); } 
     253                        const char *Buffer; 
     254                        int Length; 
     255                        int Offset; 
     256                }; 
     257 
     258        protected: 
     259                bool bReadAttemptedAfterClose; 
     260                time_t LastIo; 
     261                int InactivityTimeout; 
     262                FILE *MyStream; 
     263 
     264                deque<OutboundPage> OutboundPages; 
     265                int OutboundDataSize; 
     266 
     267        private: 
     268                void _DispatchInboundData (const char *buffer, int size); 
    248269}; 
    249270 
  • version_0/ext/pipe.cpp

    r336 r337  
    2626 
    2727PipeDescriptor::PipeDescriptor (FILE *fp): 
    28         EventableDescriptor (fileno (fp)) 
    29 
    30 
     28        EventableDescriptor (fileno (fp)), 
     29        bReadAttemptedAfterClose (false), 
     30        LastIo (gCurrentLoopTime), 
     31        InactivityTimeout (0), 
     32        MyStream (fp), 
     33        OutboundDataSize (0) 
     34
     35
     36 
     37/******************************* 
     38PipeDescriptor::~PipeDescriptor 
     39*******************************/ 
     40 
     41PipeDescriptor::~PipeDescriptor() 
     42
     43        // Run down any stranded outbound data. 
     44        for (size_t i=0; i < OutboundPages.size(); i++) 
     45                OutboundPages[i].Free(); 
     46 
     47        /* As a virtual destructor, we come here before the base-class 
     48         * destructor that closes our file-descriptor. Calling pclose 
     49         * doesn't seem to bother the base-class destructor any, and it's 
     50         * required for cleaning up the subprocess zombie. 
     51         * Eventually we may need to refactor some of this stuff if there 
     52         * are undesirable interactions. 
     53         * Note that calling pclose on a still-running subprocess called 
     54         * with mode "r" will often cause the subprocess to catch SIGPIPE. 
     55         * This is part of the behavior of popen and not something EM is doing. 
     56         * 
     57         * Something weirder and worse happens with mode "w" - 
     58         * pclose WILL HANG irretrievably if the subprocess doesn't 
     59         * close when we tell it to. It will see a close on its end 
     60         * of the pipe (the read end), but that doesn't mean it will 
     61         * be written so as to exit when that happens. 
     62         * pclose waits for the subprocess to terminate. 
     63         * (Is there a flavor of popen that has a timeout or a no-hang 
     64         * option?) 
     65         * 
     66         * pclose returns the termination status of the subprocess. 
     67         * Someday we may need to make that available to the caller, 
     68         * possibly as an argument to the UNBOUND event. 
     69         */ 
     70        pclose (MyStream); 
     71
     72 
     73 
     74 
     75/******************** 
     76PipeDescriptor::Read 
     77********************/ 
     78 
     79void PipeDescriptor::Read() 
     80
     81        int sd = GetSocket(); 
     82        if (sd == INVALID_SOCKET) { 
     83                assert (!bReadAttemptedAfterClose); 
     84                bReadAttemptedAfterClose = true; 
     85                return; 
     86        } 
     87 
     88        LastIo = gCurrentLoopTime; 
     89 
     90        int total_bytes_read = 0; 
     91        char readbuffer [16 * 1024]; 
     92 
     93        for (int i=0; i < 10; i++) { 
     94                // Don't read just one buffer and then move on. This is faster 
     95                // if there is a lot of incoming. 
     96                // But don't read indefinitely. Give other sockets a chance to run. 
     97                // NOTICE, we're reading one less than the buffer size. 
     98                // That's so we can put a guard byte at the end of what we send 
     99                // to user code. 
     100                // Use read instead of recv, which on Linux gives a "socket operation 
     101                // on nonsocket" error. 
     102                 
     103 
     104                int r = read (sd, readbuffer, sizeof(readbuffer) - 1); 
     105                //cerr << "<R:" << r << ">"; 
     106 
     107                if (r > 0) { 
     108                        total_bytes_read += r; 
     109                        LastRead = gCurrentLoopTime; 
     110 
     111                        // Add a null-terminator at the the end of the buffer 
     112                        // that we will send to the callback. 
     113                        // DO NOT EVER CHANGE THIS. We want to explicitly allow users 
     114                        // to be able to depend on this behavior, so they will have 
     115                        // the option to do some things faster. Additionally it's 
     116                        // a security guard against buffer overflows. 
     117                        readbuffer [r] = 0; 
     118                        if (EventCallback) 
     119                                (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r); 
     120                        } 
     121                else if (r == 0) { 
     122                        break; 
     123                } 
     124                else { 
     125                        // Basically a would-block, meaning we've read everything there is to read. 
     126                        break; 
     127                } 
     128 
     129        } 
     130 
     131 
     132        if (total_bytes_read == 0) { 
     133                // If we read no data on a socket that selected readable, 
     134                // it generally means the other end closed the connection gracefully. 
     135                bCloseNow = true; 
     136        } 
     137 
     138
     139 
     140/********************* 
     141PipeDescriptor::Write 
     142*********************/ 
     143 
     144void PipeDescriptor::Write() 
     145
     146        int sd = GetSocket(); 
     147        assert (sd != INVALID_SOCKET); 
     148 
     149        LastIo = gCurrentLoopTime; 
     150        char output_buffer [16 * 1024]; 
     151        size_t nbytes = 0; 
     152 
     153        while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) { 
     154                OutboundPage *op = &(OutboundPages[0]); 
     155                if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) { 
     156                        memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset); 
     157                        nbytes += (op->Length - op->Offset); 
     158                        op->Free(); 
     159                        OutboundPages.pop_front(); 
     160                } 
     161                else { 
     162                        int len = sizeof(output_buffer) - nbytes; 
     163                        memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len); 
     164                        op->Offset += len; 
     165                        nbytes += len; 
     166                } 
     167        } 
     168 
     169        // We should never have gotten here if there were no data to write, 
     170        // so assert that as a sanity check. 
     171        // Don't bother to make sure nbytes is less than output_buffer because 
     172        // if it were we probably would have crashed already. 
     173        assert (nbytes > 0); 
     174 
     175        assert (GetSocket() != INVALID_SOCKET); 
     176        int bytes_written = write (GetSocket(), output_buffer, nbytes); 
     177 
     178        if (bytes_written > 0) { 
     179                OutboundDataSize -= bytes_written; 
     180                if ((size_t)bytes_written < nbytes) { 
     181                        int len = nbytes - bytes_written; 
     182                        char *buffer = (char*) malloc (len + 1); 
     183                        if (!buffer) 
     184                                throw std::runtime_error ("bad alloc throwing back data"); 
     185                        memcpy (buffer, output_buffer + bytes_written, len); 
     186                        buffer [len] = 0; 
     187                        OutboundPages.push_front (OutboundPage (buffer, len)); 
     188                } 
     189        } 
     190        else { 
     191                #ifdef OS_UNIX 
     192                if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR)) 
     193                #endif 
     194                #ifdef OS_WIN32 
     195                if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK)) 
     196                #endif 
     197                        Close(); 
     198        } 
     199
     200 
     201 
     202/************************* 
     203PipeDescriptor::Heartbeat 
     204*************************/ 
     205 
     206void PipeDescriptor::Heartbeat() 
     207
     208        // If an inactivity timeout is defined, then check for it. 
     209        if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) 
     210                bCloseNow = true; 
     211
     212 
     213 
     214/***************************** 
     215PipeDescriptor::SelectForRead 
     216*****************************/ 
     217 
     218bool PipeDescriptor::SelectForRead() 
     219
     220        /* Pipe descriptors, being local by definition, don't have 
     221         * a pending state, so this is simpler than for the 
     222         * ConnectionDescriptor object. 
     223         */ 
     224        return true; 
     225
     226 
     227/****************************** 
     228PipeDescriptor::SelectForWrite 
     229******************************/ 
     230 
     231bool PipeDescriptor::SelectForWrite() 
     232
     233        /* Pipe descriptors, being local by definition, don't have 
     234         * a pending state, so this is simpler than for the 
     235         * ConnectionDescriptor object. 
     236         */ 
     237        return (GetOutboundDataSize() > 0); 
     238
     239 
     240 
     241 
     242 
     243/********************************* 
     244PipeDescriptor::SendOutboundData 
     245********************************/ 
     246 
     247int PipeDescriptor::SendOutboundData (const char *data, int length) 
     248
     249        if (bCloseNow || bCloseAfterWriting) 
     250                return 0; 
     251 
     252        if (!data && (length > 0)) 
     253                throw std::runtime_error ("bad outbound data"); 
     254        char *buffer = (char *) malloc (length + 1); 
     255        if (!buffer) 
     256                throw std::runtime_error ("no allocation for outbound data"); 
     257        memcpy (buffer, data, length); 
     258        buffer [length] = 0; 
     259        OutboundPages.push_back (OutboundPage (buffer, length)); 
     260        OutboundDataSize += length; 
     261        return length; 
     262
     263 
  • version_0/lib/eventmachine.rb

    r336 r337  
    818818  #-- 
    819819  # 
    820   def self::popen cmd, mode="r" 
    821           EventMachine::invoke_popen cmd, mode 
     820  def self::popen cmd, mode="r", handler=nil 
     821        klass = if (handler and handler.is_a?(Class)) 
     822                handler 
     823        else 
     824                Class.new( Connection ) {handler and include handler} 
     825        end 
     826 
     827        s = invoke_popen cmd, mode 
     828        c = klass.new s 
     829        @conns[s] = c 
     830        yield(c) if block_given? 
     831        c 
    822832  end 
    823833