Changeset 626

Show
Ignore:
Timestamp:
12/30/07 00:22:11 (11 months ago)
Author:
blackhedd
Message:

implemented kqueue support

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • version_0/ChangeLog

    r623 r626  
    10310327Dec07: Removed the hookable error handler. No one was using it and it significantly 
    104104        degraded performance. 
     10530Dec07: Implemented Kqueue support for OSX and BSD. 
     106 
  • version_0/ext/cmain.cpp

    r592 r626  
    2323static EventMachine_t *EventMachine; 
    2424static int bUseEpoll = 0; 
     25static int bUseKqueue = 0; 
    2526 
    2627 
     
    3940        if (bUseEpoll) 
    4041                EventMachine->_UseEpoll(); 
     42        if (bUseKqueue) 
     43                EventMachine->_UseKqueue(); 
    4144} 
    4245 
     
    417420{ 
    418421        bUseEpoll = 1; 
     422} 
     423 
     424/************ 
     425evma__kqueue 
     426************/ 
     427 
     428extern "C" void evma__kqueue() 
     429{ 
     430        bUseKqueue = 1; 
    419431} 
    420432 
  • version_0/ext/ed.cpp

    r579 r626  
    188188        EpollEvent.events = EPOLLOUT; 
    189189        #endif 
     190        #ifdef HAVE_KQUEUE 
     191        MyEventMachine->ArmKqueueWriter (this); 
     192        #endif 
    190193} 
    191194 
     
    321324        assert (MyEventMachine); 
    322325        MyEventMachine->Modify (this); 
     326        #endif 
     327        #ifdef HAVE_KQUEUE 
     328        MyEventMachine->ArmKqueueWriter (this); 
    323329        #endif 
    324330        return length; 
     
    532538                        EpollEvent.events = EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0); 
    533539                        #endif 
     540                        #ifdef HAVE_KQUEUE 
     541                        MyEventMachine->ArmKqueueReader (this); 
     542                        // The callback may have scheduled outbound data. 
     543                        if (SelectForWrite()) 
     544                                MyEventMachine->ArmKqueueWriter (this); 
     545                        #endif 
    534546                } 
    535547                else 
     
    614626                assert (MyEventMachine); 
    615627                MyEventMachine->Modify (this); 
     628                #endif 
     629                #ifdef HAVE_KQUEUE 
     630                if (SelectForWrite()) { 
     631                        MyEventMachine->ArmKqueueWriter (this); 
     632                        cerr << "POW\n"; 
     633                } 
    616634                #endif 
    617635        } 
     
    798816        EpollEvent.events = EPOLLIN; 
    799817        #endif 
     818        #ifdef HAVE_KQUEUE 
     819        MyEventMachine->ArmKqueueReader (this); 
     820        #endif 
    800821} 
    801822 
     
    834855        #ifdef HAVE_EPOLL 
    835856        EpollEvent.events = EPOLLIN; 
     857        #endif 
     858        #ifdef HAVE_KQUEUE 
     859        MyEventMachine->ArmKqueueReader (this); 
    836860        #endif 
    837861} 
     
    923947                assert (MyEventMachine); 
    924948                MyEventMachine->Add (cd); 
     949                #ifdef HAVE_KQUEUE 
     950                if (cd->SelectForWrite()) 
     951                        MyEventMachine->ArmKqueueWriter (cd); 
     952                MyEventMachine->ArmKqueueReader (cd); 
     953                #endif 
    925954        } 
    926955 
     
    9871016        EpollEvent.events = EPOLLIN; 
    9881017        #endif 
     1018        #ifdef HAVE_KQUEUE 
     1019        MyEventMachine->ArmKqueueReader (this); 
     1020        #endif 
    9891021} 
    9901022 
  • version_0/ext/em.cpp

    r625 r626  
    7272        LoopBreakerWriter (-1), 
    7373        bEpoll (false), 
     74        bKqueue (false), 
    7475        epfd (-1) 
    7576{ 
     
    116117        if (epfd != -1) 
    117118                close (epfd); 
     119        if (kqfd != -1) 
     120                close (kqfd); 
    118121} 
    119122 
     
    134137        #ifdef HAVE_EPOLL 
    135138        bEpoll = true; 
     139        #endif 
     140} 
     141 
     142/************************** 
     143EventMachine_t::_UseKqueue 
     144**************************/ 
     145 
     146void EventMachine_t::_UseKqueue() 
     147{ 
     148        /* Temporary. 
     149         * See comments under _UseEpoll. 
     150         */ 
     151 
     152        #ifdef HAVE_KQUEUE 
     153        bKqueue = true; 
    136154        #endif 
    137155} 
     
    338356        #endif 
    339357 
     358        #ifdef HAVE_KQUEUE 
     359        if (bKqueue) { 
     360                kqfd = kqueue(); 
     361                if (kqfd == -1) { 
     362                        char buf[200]; 
     363                        snprintf (buf, sizeof(buf)-1, "unable to create kqueue descriptor: %s", strerror(errno)); 
     364                        throw std::runtime_error (buf); 
     365                } 
     366                // cloexec not needed. By definition, kqueues are not carried across forks. 
     367 
     368                assert (LoopBreakerReader >= 0); 
     369                LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this); 
     370                assert (ld); 
     371                Add (ld); 
     372        } 
     373        #endif 
     374 
    340375        while (true) { 
    341376                gCurrentLoopTime = time(NULL); 
     
    370405        if (bEpoll) 
    371406                return _RunEpollOnce(); 
     407        else if (bKqueue) 
     408                return _RunKqueueOnce(); 
    372409        else 
    373410                return _RunSelectOnce(); 
     
    466503        #else 
    467504        throw std::runtime_error ("epoll is not implemented on this platform"); 
     505        #endif 
     506} 
     507 
     508 
     509/****************************** 
     510EventMachine_t::_RunKqueueOnce 
     511******************************/ 
     512 
     513bool EventMachine_t::_RunKqueueOnce() 
     514{ 
     515        #ifdef HAVE_KQUEUE 
     516        assert (kqfd != -1); 
     517        const int maxKevents = 2000; 
     518        struct kevent Karray [maxKevents]; 
     519        struct timespec ts = {0, 10000000}; // Too frequent. Use blocking_region 
     520 
     521        int k = kevent (kqfd, NULL, 0, Karray, maxKevents, &ts); 
     522        struct kevent *ke = Karray; 
     523        while (k > 0) { 
     524                EventableDescriptor *ed = (EventableDescriptor*) (ke->udata); 
     525                assert (ed); 
     526 
     527                if (ke->filter == EVFILT_READ) 
     528                        ed->Read(); 
     529                else if (ke->filter == EVFILT_WRITE) 
     530                        ed->Write(); 
     531                else 
     532                        cerr << "Discarding unknown kqueue event " << ke->filter << endl; 
     533 
     534                --k; 
     535                ++ke; 
     536        } 
     537 
     538        { // cleanup dying sockets 
     539                // vector::pop_back works in constant time. 
     540                // TODO, rip this out and only delete the descriptors we know have died, 
     541                // rather than traversing the whole list. 
     542                // In kqueue, closing a descriptor automatically removes its event filters. 
     543 
     544                int i, j; 
     545                int nSockets = Descriptors.size(); 
     546                for (i=0, j=0; i < nSockets; i++) { 
     547                        EventableDescriptor *ed = Descriptors[i]; 
     548                        assert (ed); 
     549                        if (ed->ShouldDelete()) { 
     550                                ModifiedDescriptors.erase (ed); 
     551                                delete ed; 
     552                        } 
     553                        else 
     554                                Descriptors [j++] = ed; 
     555                } 
     556                while ((size_t)j < Descriptors.size()) 
     557                        Descriptors.pop_back(); 
     558 
     559        } 
     560 
     561        { // dispatch heartbeats 
     562                if (gCurrentLoopTime >= NextHeartbeatTime) { 
     563                        NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval; 
     564 
     565                        for (int i=0; i < Descriptors.size(); i++) { 
     566                                EventableDescriptor *ed = Descriptors[i]; 
     567                                assert (ed); 
     568                                ed->Heartbeat(); 
     569                        } 
     570                } 
     571        } 
     572 
     573 
     574        // TODO, replace this with rb_thread_blocking_region for 1.9 builds. 
     575        timeval tv = {0,0}; 
     576        EmSelect (0, NULL, NULL, NULL, &tv); 
     577 
     578        return true; 
     579        #else 
     580        throw std::runtime_error ("kqueue is not implemented on this platform"); 
    468581        #endif 
    469582} 
     
    11891302 
    11901303 
     1304/******************************* 
     1305EventMachine_t::ArmKqueueWriter 
     1306*******************************/ 
     1307 
     1308void EventMachine_t::ArmKqueueWriter (EventableDescriptor *ed) 
     1309{ 
     1310        #ifdef HAVE_KQUEUE 
     1311        if (bKqueue) { 
     1312                if (!ed) 
     1313                        throw std::runtime_error ("added bad descriptor"); 
     1314                struct kevent k; 
     1315                EV_SET (&k, ed->GetSocket(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, ed); 
     1316                int t = kevent (kqfd, &k, 1, NULL, 0, NULL); 
     1317                assert (t == 0); 
     1318        } 
     1319        #endif 
     1320} 
     1321 
     1322/******************************* 
     1323EventMachine_t::ArmKqueueReader 
     1324*******************************/ 
     1325 
     1326void EventMachine_t::ArmKqueueReader (EventableDescriptor *ed) 
     1327{ 
     1328        #ifdef HAVE_KQUEUE 
     1329        if (bKqueue) { 
     1330                if (!ed) 
     1331                        throw std::runtime_error ("added bad descriptor"); 
     1332                struct kevent k; 
     1333                EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed); 
     1334                int t = kevent (kqfd, &k, 1, NULL, 0, NULL); 
     1335                assert (t == 0); 
     1336        } 
     1337        #endif 
     1338} 
     1339 
    11911340/********************************** 
    11921341EventMachine_t::_AddNewDescriptors 
     
    12211370                        } 
    12221371                } 
     1372                #endif 
     1373 
     1374                #if HAVE_KQUEUE 
     1375                /* 
     1376                if (bKqueue) { 
     1377                        // INCOMPLETE. Some descriptors don't want to be readable. 
     1378                        assert (kqfd != -1); 
     1379                        struct kevent k; 
     1380                        EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed); 
     1381                        int t = kevent (kqfd, &k, 1, NULL, 0, NULL); 
     1382                        assert (t == 0); 
     1383                } 
     1384                */ 
    12231385                #endif 
    12241386 
  • version_0/ext/em.h

    r625 r626  
    8080                void Add (EventableDescriptor*); 
    8181                void Modify (EventableDescriptor*); 
     82                void ArmKqueueWriter (EventableDescriptor*); 
     83                void ArmKqueueReader (EventableDescriptor*); 
    8284 
    8385                void SetTimerQuantum (int); 
     
    8991                // Temporary: 
    9092                void _UseEpoll(); 
     93                void _UseKqueue(); 
    9194 
    9295 
     
    100103                bool _RunSelectOnce(); 
    101104                bool _RunEpollOnce(); 
     105                bool _RunKqueueOnce(); 
    102106 
    103107                void _ModifyEpollEvent (EventableDescriptor*); 
     
    134138                bool bEpoll; 
    135139                int epfd; // Epoll file-descriptor 
     140 
     141                bool bKqueue; 
     142                int kqfd; // Kqueue file-descriptor 
    136143}; 
    137144 
  • version_0/ext/eventmachine.h

    r591 r626  
    7272        // Temporary: 
    7373        void evma__epoll(); 
     74        void evma__kqueue(); 
    7475 
    7576#if __cplusplus 
  • version_0/ext/extconf.rb

    r625 r626  
    8282  flags << '-DBUILD_FOR_RUBY' 
    8383 
     84  if have_header("sys/event.h") and have_header("sys/queue.h") 
     85    flags << "-DHAVE_KQUEUE" 
     86  end 
     87 
    8488  dir_config('ssl') 
    8589  if have_library('ssl') and 
     
    154158  flags << '-DBUILD_FOR_RUBY' 
    155159 
     160  if have_header("sys/event.h") and have_header("sys/queue.h") 
     161    flags << "-DHAVE_KQUEUE" 
     162  end 
     163 
    156164  dir_config('ssl') 
    157165  if have_library('ssl') and 
  • version_0/ext/kb.cpp

    r509 r626  
    3333        #ifdef HAVE_EPOLL 
    3434        EpollEvent.events = EPOLLIN; 
     35        #endif 
     36        #ifdef HAVE_KQUEUE 
     37        MyEventMachine->ArmKqueueReader (this); 
    3538        #endif 
    3639} 
  • version_0/ext/pipe.cpp

    r592 r626  
    3838        #ifdef HAVE_EPOLL 
    3939        EpollEvent.events = EPOLLIN; 
     40        #endif 
     41        #ifdef HAVE_KQUEUE 
     42        MyEventMachine->ArmKqueueReader (this); 
    4043        #endif 
    4144} 
  • version_0/ext/project.h

    r497 r626  
    9797#endif 
    9898 
     99#ifdef HAVE_KQUEUE 
     100#include <sys/event.h> 
     101#include <sys/queue.h> 
     102#endif 
     103 
    99104#include "binder.h" 
    100105#include "em.h" 
  • version_0/ext/rubymain.cpp

    r625 r626  
    457457        // Temporary. 
    458458        evma__epoll(); 
     459        return Qnil; 
     460} 
     461 
     462/********* 
     463t__kqueue 
     464*********/ 
     465 
     466static VALUE t__kqueue (VALUE self) 
     467{ 
     468        // Temporary. 
     469        evma__kqueue(); 
    459470        return Qnil; 
    460471} 
     
    580591        // Temporary: 
    581592        rb_define_module_function (EmModule, "epoll", (VALUE(*)(...))t__epoll, 0); 
     593        rb_define_module_function (EmModule, "kqueue", (VALUE(*)(...))t__kqueue, 0); 
    582594 
    583595        rb_define_method (EmConnection, "get_outbound_data_size", (VALUE(*)(...))conn_get_outbound_data_size, 0);