Changeset 372

Show
Ignore:
Timestamp:
06/07/07 15:31:12 (2 years ago)
Author:
blackhedd
Message:

epoll support for #defer (loop-breaker)

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • version_0/ext/ed.cpp

    r367 r372  
    4444****************************************/ 
    4545 
    46 EventableDescriptor::EventableDescriptor (int sd): 
     46EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em): 
    4747        EventCallback (NULL), 
    4848        LastRead (0), 
     
    5050        MySocket (sd), 
    5151        bCloseNow (false), 
    52         bCloseAfterWriting (false) 
     52        bCloseAfterWriting (false), 
     53        bCallbackUnbind (true), 
     54        MyEventMachine (em) 
    5355{ 
    5456        /* There are three ways to close a socket, all of which should 
     
    7577        if (sd == INVALID_SOCKET) 
    7678                throw std::runtime_error ("bad eventable descriptor"); 
     79        if (MyEventMachine == NULL) 
     80                throw std::runtime_error ("bad em in eventable descriptor"); 
    7781        CreatedAt = gCurrentLoopTime; 
    7882 
     
    8993EventableDescriptor::~EventableDescriptor() 
    9094{ 
    91         if (EventCallback
     95        if (EventCallback && bCallbackUnbind
    9296                (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_UNBOUND, NULL, 0); 
    9397        Close(); 
     
    156160******************************************/ 
    157161 
    158 ConnectionDescriptor::ConnectionDescriptor (EventMachine_t *em, int sd): 
    159         EventableDescriptor (sd), 
     162ConnectionDescriptor::ConnectionDescriptor (int sd, EventMachine_t *em): 
     163        EventableDescriptor (sd, em), 
    160164        bConnectPending (false), 
    161165        bReadAttemptedAfterClose (false), 
     
    166170        bIsServer (false), 
    167171        LastIo (gCurrentLoopTime), 
    168         InactivityTimeout (0), 
    169         MyEventMachine (em) 
     172        InactivityTimeout (0) 
    170173{ 
    171174        #ifdef HAVE_EPOLL 
     
    691694 
    692695 
    693 /************************************** 
    694 AcceptorDescriptor::AcceptorDescriptor 
    695 **************************************/ 
    696  
    697 AcceptorDescriptor::AcceptorDescriptor (EventMachine_t *parent_em, int sd): 
    698         EventableDescriptor (sd), 
    699         MyEventMachine (parent_em) 
     696/**************************************** 
     697LoopbreakDescriptor::LoopbreakDescriptor 
     698****************************************/ 
     699 
     700LoopbreakDescriptor::LoopbreakDescriptor (int sd, EventMachine_t *parent_em): 
     701        EventableDescriptor (sd, parent_em) 
    700702{ 
    701703        /* This is really bad and ugly. Change someday if possible. 
     
    704706         */ 
    705707 
    706         if (!MyEventMachine) 
    707                 throw std::runtime_error ("bad event-machine passed to acceptor"); 
    708  
     708        bCallbackUnbind = false; 
     709 
     710        #ifdef HAVE_EPOLL 
     711        EpollEvent.events = EPOLLIN; 
     712        #endif 
     713
     714 
     715 
     716 
     717 
     718/************************* 
     719LoopbreakDescriptor::Read 
     720*************************/ 
     721 
     722void LoopbreakDescriptor::Read() 
     723
     724        // TODO, refactor, this code is probably in the wrong place. 
     725        assert (MyEventMachine); 
     726        MyEventMachine->_ReadLoopBreaker(); 
     727
     728 
     729 
     730/************************** 
     731LoopbreakDescriptor::Write 
     732**************************/ 
     733 
     734void LoopbreakDescriptor::Write() 
     735
     736  // Why are we here? 
     737  throw std::runtime_error ("bad code path in loopbreak"); 
     738
     739 
     740/************************************** 
     741AcceptorDescriptor::AcceptorDescriptor 
     742**************************************/ 
     743 
     744AcceptorDescriptor::AcceptorDescriptor (int sd, EventMachine_t *parent_em): 
     745        EventableDescriptor (sd, parent_em) 
     746
    709747        #ifdef HAVE_EPOLL 
    710748        EpollEvent.events = EPOLLIN; 
     
    772810 
    773811 
    774                 ConnectionDescriptor *cd = new ConnectionDescriptor (MyEventMachine, sd); 
     812                ConnectionDescriptor *cd = new ConnectionDescriptor (sd, MyEventMachine); 
    775813                if (!cd) 
    776814                        throw std::runtime_error ("no newly accepted connection"); 
     
    816854**************************************/ 
    817855 
    818 DatagramDescriptor::DatagramDescriptor (int sd): 
    819         EventableDescriptor (sd), 
     856DatagramDescriptor::DatagramDescriptor (int sd, EventMachine_t *parent_em): 
     857        EventableDescriptor (sd, parent_em), 
    820858        OutboundDataSize (0), 
    821859        LastIo (gCurrentLoopTime), 
  • version_0/ext/ed.h

    r365 r372  
    3737{ 
    3838        public: 
    39                 EventableDescriptor (int); 
     39                EventableDescriptor (int, EventMachine_t*); 
    4040                virtual ~EventableDescriptor(); 
    4141 
     
    8787                bool bCloseNow; 
    8888                bool bCloseAfterWriting; 
     89                bool bCallbackUnbind; 
    8990 
    9091                #ifdef HAVE_EPOLL 
     
    9293                #endif 
    9394 
    94 }; 
    95  
     95                EventMachine_t *MyEventMachine; 
     96}; 
     97 
     98 
     99 
     100/************************* 
     101class LoopbreakDescriptor 
     102*************************/ 
     103 
     104class LoopbreakDescriptor: public EventableDescriptor 
     105
     106        public: 
     107                LoopbreakDescriptor (int, EventMachine_t*); 
     108                virtual ~LoopbreakDescriptor() {} 
     109 
     110                virtual void Read(); 
     111                virtual void Write(); 
     112                virtual void Heartbeat() {} 
     113 
     114                virtual bool SelectForRead() {return true;} 
     115                virtual bool SelectForWrite() {return false;} 
     116}; 
    96117 
    97118 
     
    103124{ 
    104125        public: 
    105                 ConnectionDescriptor (EventMachine_t*, int); 
     126                ConnectionDescriptor (int, EventMachine_t*); 
    106127                virtual ~ConnectionDescriptor(); 
    107128 
     
    155176                int InactivityTimeout; 
    156177 
    157         protected: 
    158                 EventMachine_t *MyEventMachine; 
    159  
    160178        private: 
    161179                void _WriteOutboundData(); 
     
    174192{ 
    175193        public: 
    176                 DatagramDescriptor (int); 
     194                DatagramDescriptor (int, EventMachine_t*); 
    177195                virtual ~DatagramDescriptor(); 
    178196 
     
    225243{ 
    226244        public: 
    227                 AcceptorDescriptor (EventMachine_t*, int); 
     245                AcceptorDescriptor (int, EventMachine_t*); 
    228246                virtual ~AcceptorDescriptor(); 
    229247 
     
    234252                virtual bool SelectForRead() {return true;} 
    235253                virtual bool SelectForWrite() {return false;} 
    236  
    237         protected: 
    238                 EventMachine_t *MyEventMachine; 
    239254}; 
    240255 
     
    246261{ 
    247262        public: 
    248                 PipeDescriptor (FILE*); 
     263                PipeDescriptor (FILE*, EventMachine_t*); 
    249264                virtual ~PipeDescriptor(); 
    250265 
  • version_0/ext/em.cpp

    r368 r372  
    193193int EventMachine_t::SetRlimitNofile (int nofiles) 
    194194{ 
     195        #ifdef OS_UNIX 
    195196        struct rlimit rlim; 
    196197        if (nofiles >= 0) { 
     
    203204        getrlimit (RLIMIT_NOFILE, &rlim); 
    204205        return rlim.rlim_cur; 
     206        #endif 
     207 
     208        #ifdef OS_WIN32 
     209        // No meaningful implementation on Windows. 
     210        return 0; 
     211        #endif 
    205212} 
    206213 
     
    292299                cloexec |= FD_CLOEXEC; 
    293300                fcntl (epfd, F_SETFD, cloexec); 
     301 
     302                assert (LoopBreakerReader >= 0); 
     303                LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this); 
     304                assert (ld); 
     305                Add (ld); 
    294306        } 
    295307        #endif 
     
    339351                for (int i=0; i < s; i++) { 
    340352                        EventableDescriptor *ed = (EventableDescriptor*) ev[i].data.ptr; 
    341                         assert (ed); 
    342353 
    343354                        if (ev[i].events & (EPOLLERR | EPOLLHUP)) 
     
    746757                 * (To wit, the ConnectionCompleted event gets sent to the client.) 
    747758                 */ 
    748                 ConnectionDescriptor *cd = new ConnectionDescriptor (this, sd); 
     759                ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); 
    749760                if (!cd) 
    750761                        throw std::runtime_error ("no connection allocated"); 
     
    764775                        // Put the connection on the stack and wait for it to complete 
    765776                        // or time out. 
    766                         ConnectionDescriptor *cd = new ConnectionDescriptor (this, sd); 
     777                        ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); 
    767778                        if (!cd) 
    768779                                throw std::runtime_error ("no connection allocated"); 
     
    784795                         * for people to know that a failure occurred. 
    785796                         */ 
    786                         ConnectionDescriptor *cd = new ConnectionDescriptor (this, sd); 
     797                        ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); 
    787798                        if (!cd) 
    788799                                throw std::runtime_error ("no connection allocated"); 
     
    811822                // Put the connection on the stack and wait for it to complete 
    812823                // or time out. 
    813                 ConnectionDescriptor *cd = new ConnectionDescriptor (this, sd); 
     824                ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); 
    814825                if (!cd) 
    815826                        throw std::runtime_error ("no connection allocated"); 
     
    888899        // we still set the "pending" flag, so some needed initializations take 
    889900        // place. 
    890         ConnectionDescriptor *cd = new ConnectionDescriptor (this, fd); 
     901        ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this); 
    891902        if (!cd) 
    892903                throw std::runtime_error ("no connection allocated"); 
     
    980991 
    981992        { // Looking good. 
    982                 AcceptorDescriptor *ad = new AcceptorDescriptor (this, sd_accept); 
     993                AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this); 
    983994                if (!ad) 
    984995                        throw std::runtime_error ("unable to allocate acceptor"); 
     
    10411052 
    10421053        { // Looking good. 
    1043                 DatagramDescriptor *ds = new DatagramDescriptor (sd); 
     1054                DatagramDescriptor *ds = new DatagramDescriptor (sd, this); 
    10441055                if (!ds) 
    10451056                        throw std::runtime_error ("unable to allocate datagram-socket"); 
     
    11271138  int fd = open (filename, O_CREAT|O_TRUNC|O_WRONLY|O_NONBLOCK, 0644); 
    11281139   
    1129         FileStreamDescriptor *fsd = new FileStreamDescriptor (fd); 
     1140        FileStreamDescriptor *fsd = new FileStreamDescriptor (fd, this); 
    11301141  if (!fsd) 
    11311142        throw std::runtime_error ("no file-stream allocated"); 
     
    12041215 
    12051216        { // Looking good. 
    1206                 AcceptorDescriptor *ad = new AcceptorDescriptor (this, sd_accept); 
     1217                AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this); 
    12071218                if (!ad) 
    12081219                        throw std::runtime_error ("unable to allocate acceptor"); 
     
    12501261 
    12511262        { // Looking good. 
    1252                 PipeDescriptor *pd = new PipeDescriptor (fp); 
     1263                PipeDescriptor *pd = new PipeDescriptor (fp, this); 
    12531264                if (!pd) 
    12541265                        throw std::runtime_error ("unable to allocate pipe"); 
  • version_0/ext/em.h

    r366 r372  
    101101                void _AddNewDescriptors(); 
    102102                void _InitializeLoopBreaker(); 
    103                 void _ReadLoopBreaker(); 
    104103 
    105104                bool _RunSelectOnce(); 
    106105                bool _RunEpollOnce(); 
     106 
     107        public: 
     108                void _ReadLoopBreaker(); 
    107109 
    108110        private: 
  • version_0/ext/files.cpp

    r325 r372  
    2525******************************************/ 
    2626 
    27 FileStreamDescriptor::FileStreamDescriptor (int fd): 
    28         EventableDescriptor (fd), 
     27FileStreamDescriptor::FileStreamDescriptor (int fd, EventMachine_t *em): 
     28        EventableDescriptor (fd, em), 
    2929        OutboundDataSize (0) 
    3030{ 
  • version_0/ext/files.h

    r325 r372  
    3131{ 
    3232        public: 
    33                 FileStreamDescriptor (int); 
     33                FileStreamDescriptor (int, EventMachine_t*); 
    3434                virtual ~FileStreamDescriptor(); 
    3535 
  • version_0/ext/pipe.cpp

    r337 r372  
    2525******************************/ 
    2626 
    27 PipeDescriptor::PipeDescriptor (FILE *fp): 
    28         EventableDescriptor (fileno (fp)), 
     27PipeDescriptor::PipeDescriptor (FILE *fp, EventMachine_t *parent_em): 
     28        EventableDescriptor (fileno (fp), parent_em), 
    2929        bReadAttemptedAfterClose (false), 
    3030        LastIo (gCurrentLoopTime), 
  • version_0/tests/test_epoll.rb

    r371 r372  
    5353                end 
    5454                def receive_data data 
     55                        raise "bad response" unless data == "ABCDE" 
    5556                end 
    5657                def unbind 
     
    7475        # up past 512. (Each connection uses two sockets, a client and a server.) 
    7576        # (Will require running the test as root) 
     77        # This test exercises TCP clients and servers. 
    7678        # 
    7779        def test_descriptors 
     
    8284                        $n = 0 
    8385                        $max = 0 
    84                         400.times { 
     86                        100.times { 
    8587                                EM.connect("127.0.0.1", 9800, TestEchoClient) {$n += 1} 
    8688                        } 
    8789                } 
    8890                assert_equal(0, $n) 
    89                 assert_equal(400, $max) 
     91                assert_equal(100, $max) 
    9092        end 
     93 
     94        def test_defer 
     95                $n = 0 
     96                EM.epoll 
     97                EM.run { 
     98                        sleep_proc = proc {sleep 1} 
     99                        return_proc = proc {$n += 1; EM.stop} 
     100                        EM.defer sleep_proc, return_proc 
     101                } 
     102                assert_equal( 1, $n ) 
     103        end 
     104 
    91105end