Changeset 626
- Timestamp:
- 12/30/07 00:22:11 (11 months ago)
- Files:
-
- version_0/ChangeLog (modified) (1 diff)
- version_0/ext/cmain.cpp (modified) (3 diffs)
- version_0/ext/ed.cpp (modified) (8 diffs)
- version_0/ext/em.cpp (modified) (8 diffs)
- version_0/ext/em.h (modified) (4 diffs)
- version_0/ext/eventmachine.h (modified) (1 diff)
- version_0/ext/extconf.rb (modified) (2 diffs)
- version_0/ext/kb.cpp (modified) (1 diff)
- version_0/ext/pipe.cpp (modified) (1 diff)
- version_0/ext/project.h (modified) (1 diff)
- version_0/ext/rubymain.cpp (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
version_0/ChangeLog
r623 r626 103 103 27Dec07: Removed the hookable error handler. No one was using it and it significantly 104 104 degraded performance. 105 30Dec07: Implemented Kqueue support for OSX and BSD. 106 version_0/ext/cmain.cpp
r592 r626 23 23 static EventMachine_t *EventMachine; 24 24 static int bUseEpoll = 0; 25 static int bUseKqueue = 0; 25 26 26 27 … … 39 40 if (bUseEpoll) 40 41 EventMachine->_UseEpoll(); 42 if (bUseKqueue) 43 EventMachine->_UseKqueue(); 41 44 } 42 45 … … 417 420 { 418 421 bUseEpoll = 1; 422 } 423 424 /************ 425 evma__kqueue 426 ************/ 427 428 extern "C" void evma__kqueue() 429 { 430 bUseKqueue = 1; 419 431 } 420 432 version_0/ext/ed.cpp
r579 r626 188 188 EpollEvent.events = EPOLLOUT; 189 189 #endif 190 #ifdef HAVE_KQUEUE 191 MyEventMachine->ArmKqueueWriter (this); 192 #endif 190 193 } 191 194 … … 321 324 assert (MyEventMachine); 322 325 MyEventMachine->Modify (this); 326 #endif 327 #ifdef HAVE_KQUEUE 328 MyEventMachine->ArmKqueueWriter (this); 323 329 #endif 324 330 return length; … … 532 538 EpollEvent.events = EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0); 533 539 #endif 540 #ifdef HAVE_KQUEUE 541 MyEventMachine->ArmKqueueReader (this); 542 // The callback may have scheduled outbound data. 543 if (SelectForWrite()) 544 MyEventMachine->ArmKqueueWriter (this); 545 #endif 534 546 } 535 547 else … … 614 626 assert (MyEventMachine); 615 627 MyEventMachine->Modify (this); 628 #endif 629 #ifdef HAVE_KQUEUE 630 if (SelectForWrite()) { 631 MyEventMachine->ArmKqueueWriter (this); 632 cerr << "POW\n"; 633 } 616 634 #endif 617 635 } … … 798 816 EpollEvent.events = EPOLLIN; 799 817 #endif 818 #ifdef HAVE_KQUEUE 819 MyEventMachine->ArmKqueueReader (this); 820 #endif 800 821 } 801 822 … … 834 855 #ifdef HAVE_EPOLL 835 856 EpollEvent.events = EPOLLIN; 857 #endif 858 #ifdef HAVE_KQUEUE 859 MyEventMachine->ArmKqueueReader (this); 836 860 #endif 837 861 } … … 923 947 assert (MyEventMachine); 924 948 MyEventMachine->Add (cd); 949 #ifdef HAVE_KQUEUE 950 if (cd->SelectForWrite()) 951 MyEventMachine->ArmKqueueWriter (cd); 952 MyEventMachine->ArmKqueueReader (cd); 953 #endif 925 954 } 926 955 … … 987 1016 EpollEvent.events = EPOLLIN; 988 1017 #endif 1018 #ifdef HAVE_KQUEUE 1019 MyEventMachine->ArmKqueueReader (this); 1020 #endif 989 1021 } 990 1022 version_0/ext/em.cpp
r625 r626 72 72 LoopBreakerWriter (-1), 73 73 bEpoll (false), 74 bKqueue (false), 74 75 epfd (-1) 75 76 { … … 116 117 if (epfd != -1) 117 118 close (epfd); 119 if (kqfd != -1) 120 close (kqfd); 118 121 } 119 122 … … 134 137 #ifdef HAVE_EPOLL 135 138 bEpoll = true; 139 #endif 140 } 141 142 /************************** 143 EventMachine_t::_UseKqueue 144 **************************/ 145 146 void EventMachine_t::_UseKqueue() 147 { 148 /* Temporary. 149 * See comments under _UseEpoll. 150 */ 151 152 #ifdef HAVE_KQUEUE 153 bKqueue = true; 136 154 #endif 137 155 } … … 338 356 #endif 339 357 358 #ifdef HAVE_KQUEUE 359 if (bKqueue) { 360 kqfd = kqueue(); 361 if (kqfd == -1) { 362 char buf[200]; 363 snprintf (buf, sizeof(buf)-1, "unable to create kqueue descriptor: %s", strerror(errno)); 364 throw std::runtime_error (buf); 365 } 366 // cloexec not needed. By definition, kqueues are not carried across forks. 367 368 assert (LoopBreakerReader >= 0); 369 LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this); 370 assert (ld); 371 Add (ld); 372 } 373 #endif 374 340 375 while (true) { 341 376 gCurrentLoopTime = time(NULL); … … 370 405 if (bEpoll) 371 406 return _RunEpollOnce(); 407 else if (bKqueue) 408 return _RunKqueueOnce(); 372 409 else 373 410 return _RunSelectOnce(); … … 466 503 #else 467 504 throw std::runtime_error ("epoll is not implemented on this platform"); 505 #endif 506 } 507 508 509 /****************************** 510 EventMachine_t::_RunKqueueOnce 511 ******************************/ 512 513 bool EventMachine_t::_RunKqueueOnce() 514 { 515 #ifdef HAVE_KQUEUE 516 assert (kqfd != -1); 517 const int maxKevents = 2000; 518 struct kevent Karray [maxKevents]; 519 struct timespec ts = {0, 10000000}; // Too frequent. Use blocking_region 520 521 int k = kevent (kqfd, NULL, 0, Karray, maxKevents, &ts); 522 struct kevent *ke = Karray; 523 while (k > 0) { 524 EventableDescriptor *ed = (EventableDescriptor*) (ke->udata); 525 assert (ed); 526 527 if (ke->filter == EVFILT_READ) 528 ed->Read(); 529 else if (ke->filter == EVFILT_WRITE) 530 ed->Write(); 531 else 532 cerr << "Discarding unknown kqueue event " << ke->filter << endl; 533 534 --k; 535 ++ke; 536 } 537 538 { // cleanup dying sockets 539 // vector::pop_back works in constant time. 540 // TODO, rip this out and only delete the descriptors we know have died, 541 // rather than traversing the whole list. 542 // In kqueue, closing a descriptor automatically removes its event filters. 543 544 int i, j; 545 int nSockets = Descriptors.size(); 546 for (i=0, j=0; i < nSockets; i++) { 547 EventableDescriptor *ed = Descriptors[i]; 548 assert (ed); 549 if (ed->ShouldDelete()) { 550 ModifiedDescriptors.erase (ed); 551 delete ed; 552 } 553 else 554 Descriptors [j++] = ed; 555 } 556 while ((size_t)j < Descriptors.size()) 557 Descriptors.pop_back(); 558 559 } 560 561 { // dispatch heartbeats 562 if (gCurrentLoopTime >= NextHeartbeatTime) { 563 NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval; 564 565 for (int i=0; i < Descriptors.size(); i++) { 566 EventableDescriptor *ed = Descriptors[i]; 567 assert (ed); 568 ed->Heartbeat(); 569 } 570 } 571 } 572 573 574 // TODO, replace this with rb_thread_blocking_region for 1.9 builds. 575 timeval tv = {0,0}; 576 EmSelect (0, NULL, NULL, NULL, &tv); 577 578 return true; 579 #else 580 throw std::runtime_error ("kqueue is not implemented on this platform"); 468 581 #endif 469 582 } … … 1189 1302 1190 1303 1304 /******************************* 1305 EventMachine_t::ArmKqueueWriter 1306 *******************************/ 1307 1308 void EventMachine_t::ArmKqueueWriter (EventableDescriptor *ed) 1309 { 1310 #ifdef HAVE_KQUEUE 1311 if (bKqueue) { 1312 if (!ed) 1313 throw std::runtime_error ("added bad descriptor"); 1314 struct kevent k; 1315 EV_SET (&k, ed->GetSocket(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, ed); 1316 int t = kevent (kqfd, &k, 1, NULL, 0, NULL); 1317 assert (t == 0); 1318 } 1319 #endif 1320 } 1321 1322 /******************************* 1323 EventMachine_t::ArmKqueueReader 1324 *******************************/ 1325 1326 void EventMachine_t::ArmKqueueReader (EventableDescriptor *ed) 1327 { 1328 #ifdef HAVE_KQUEUE 1329 if (bKqueue) { 1330 if (!ed) 1331 throw std::runtime_error ("added bad descriptor"); 1332 struct kevent k; 1333 EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed); 1334 int t = kevent (kqfd, &k, 1, NULL, 0, NULL); 1335 assert (t == 0); 1336 } 1337 #endif 1338 } 1339 1191 1340 /********************************** 1192 1341 EventMachine_t::_AddNewDescriptors … … 1221 1370 } 1222 1371 } 1372 #endif 1373 1374 #if HAVE_KQUEUE 1375 /* 1376 if (bKqueue) { 1377 // INCOMPLETE. Some descriptors don't want to be readable. 1378 assert (kqfd != -1); 1379 struct kevent k; 1380 EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed); 1381 int t = kevent (kqfd, &k, 1, NULL, 0, NULL); 1382 assert (t == 0); 1383 } 1384 */ 1223 1385 #endif 1224 1386 version_0/ext/em.h
r625 r626 80 80 void Add (EventableDescriptor*); 81 81 void Modify (EventableDescriptor*); 82 void ArmKqueueWriter (EventableDescriptor*); 83 void ArmKqueueReader (EventableDescriptor*); 82 84 83 85 void SetTimerQuantum (int); … … 89 91 // Temporary: 90 92 void _UseEpoll(); 93 void _UseKqueue(); 91 94 92 95 … … 100 103 bool _RunSelectOnce(); 101 104 bool _RunEpollOnce(); 105 bool _RunKqueueOnce(); 102 106 103 107 void _ModifyEpollEvent (EventableDescriptor*); … … 134 138 bool bEpoll; 135 139 int epfd; // Epoll file-descriptor 140 141 bool bKqueue; 142 int kqfd; // Kqueue file-descriptor 136 143 }; 137 144 version_0/ext/eventmachine.h
r591 r626 72 72 // Temporary: 73 73 void evma__epoll(); 74 void evma__kqueue(); 74 75 75 76 #if __cplusplus version_0/ext/extconf.rb
r625 r626 82 82 flags << '-DBUILD_FOR_RUBY' 83 83 84 if have_header("sys/event.h") and have_header("sys/queue.h") 85 flags << "-DHAVE_KQUEUE" 86 end 87 84 88 dir_config('ssl') 85 89 if have_library('ssl') and … … 154 158 flags << '-DBUILD_FOR_RUBY' 155 159 160 if have_header("sys/event.h") and have_header("sys/queue.h") 161 flags << "-DHAVE_KQUEUE" 162 end 163 156 164 dir_config('ssl') 157 165 if have_library('ssl') and version_0/ext/kb.cpp
r509 r626 33 33 #ifdef HAVE_EPOLL 34 34 EpollEvent.events = EPOLLIN; 35 #endif 36 #ifdef HAVE_KQUEUE 37 MyEventMachine->ArmKqueueReader (this); 35 38 #endif 36 39 } version_0/ext/pipe.cpp
r592 r626 38 38 #ifdef HAVE_EPOLL 39 39 EpollEvent.events = EPOLLIN; 40 #endif 41 #ifdef HAVE_KQUEUE 42 MyEventMachine->ArmKqueueReader (this); 40 43 #endif 41 44 } version_0/ext/project.h
r497 r626 97 97 #endif 98 98 99 #ifdef HAVE_KQUEUE 100 #include <sys/event.h> 101 #include <sys/queue.h> 102 #endif 103 99 104 #include "binder.h" 100 105 #include "em.h" version_0/ext/rubymain.cpp
r625 r626 457 457 // Temporary. 458 458 evma__epoll(); 459 return Qnil; 460 } 461 462 /********* 463 t__kqueue 464 *********/ 465 466 static VALUE t__kqueue (VALUE self) 467 { 468 // Temporary. 469 evma__kqueue(); 459 470 return Qnil; 460 471 } … … 580 591 // Temporary: 581 592 rb_define_module_function (EmModule, "epoll", (VALUE(*)(...))t__epoll, 0); 593 rb_define_module_function (EmModule, "kqueue", (VALUE(*)(...))t__kqueue, 0); 582 594 583 595 rb_define_method (EmConnection, "get_outbound_data_size", (VALUE(*)(...))conn_get_outbound_data_size, 0);