root/trunk/ext/em.cpp

Revision 788, 50.9 kB (checked in by raggi, 8 months ago)

Merge of branches/raggi
Most notable work and patches by Aman Gupta, Roger Pack, and James Tucker.
Patches / Tickets also submitted by: Jeremy Evans, aanand, darix, mmmurf,
danielaquino, macournoyer.

  • Moved docs into docs/ dir
  • Major refactor of rakefile, added generic rakefile helpers in tasks
  • Added example CPP build rakefile in tasks/cpp.rake
  • Moved rake tests out to tasks/tests.rake
  • Added svn ignores where appropriate
  • Fixed jruby build on older java platforms
  • Gem now builds from Rakefile rather than directly via extconf
  • Gem unified for jruby, C++ and pure ruby.
  • Correction for pure C++ build, removing ruby dependency
  • Fix for CYGWIN builds on ipv6
  • Major refactor for extconf.rb
  • Working mingw builds
  • extconf optionally uses pkg_config over manual configuration
  • extconf builds for 1.9 on any system that has 1.9
  • extconf no longer links pthread explicitly
  • looks for kqueue on all *nix systems
  • better error output on std::runtime_error, now says where it came from
  • Fixed some tests on jruby
  • Added test for general send_data flaw, required for a bugfix in jruby build
  • Added timeout to epoll tests
  • Added fixes for java reactor ruby api
  • Small addition of some docs in httpclient.rb and httpcli2.rb
  • Some refactor and fixes in smtpserver.rb
  • Added parentheses where possible to avoid excess ruby warnings
  • Refactor of $eventmachine_library logic for accuracy and maintenance, jruby
  • EM::start_server now supports unix sockets
  • EM::connect now supports unix sockets
  • EM::defer @threadqueue now handled more gracefully
  • Added better messages on exceptions raised
  • Fix edge case in timer fires
  • Explicitly require buftok.rb
  • Add protocols to autoload, rather than require them all immediately
  • Fix a bug in pr_eventmachine for outbound_q
  • Refactors to take some of the use of defer out of tests.
  • Fixes in EM.defer under start/stop conditions. Reduced scope of threads.
  • Property svn:keywords set to Id
Line 
1 /*****************************************************************************
2
3 $Id$
4
5 File:     em.cpp
6 Date:     06Apr06
7
8 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
9 Gmail: blackhedd
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of either: 1) the GNU General Public License
13 as published by the Free Software Foundation; either version 2 of the
14 License, or (at your option) any later version; or 2) Ruby's License.
15
16 See the file COPYING for complete licensing information.
17
18 *****************************************************************************/
19
20 // THIS ENTIRE FILE WILL EVENTUALLY BE FOR UNIX BUILDS ONLY.
21 //#ifdef OS_UNIX
22
23
#include "project.h"

// Keep a global variable floating around
// with the current loop time as set by the Event Machine.
// This avoids the need for frequent expensive calls to time(NULL);
time_t gCurrentLoopTime;

#ifdef OS_WIN32
// NOTE(review): presumably used to track GetTickCount() wraparound on
// Windows; neither variable is referenced in this chunk — confirm in
// the Windows-specific code before relying on that.
unsigned gTickCountTickover;
unsigned gLastTickCount;
#endif


/* The number of max outstanding timers was once a const enum defined in em.h.
 * Now we define it here so that users can change its value if necessary.
 */
static int MaxOutstandingTimers = 1000;


/* Internal helper to convert strings to internet addresses. IPv6-aware.
 * Not reentrant or threadsafe, optimized for speed.
 */
static struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size);
47
48
49 /***************************************
50 STATIC EventMachine_t::SetMaxTimerCount
51 ***************************************/
52
53 void EventMachine_t::SetMaxTimerCount (int count)
54 {
55         /* Allow a user to increase the maximum number of outstanding timers.
56          * If this gets "too high" (a metric that is of course platform dependent),
57          * bad things will happen like performance problems and possible overuse
58          * of memory.
59          * The actual timer mechanism is very efficient so it's hard to know what
60          * the practical max, but 100,000 shouldn't be too problematical.
61          */
62         if (count < 100)
63                 count = 100;
64         MaxOutstandingTimers = count;
65 }
66
67
68
69 /******************************
70 EventMachine_t::EventMachine_t
71 ******************************/
72
73 EventMachine_t::EventMachine_t (void (*event_callback)(const char*, int, const char*, int)):
74         EventCallback (event_callback),
75         NextHeartbeatTime (0),
76         LoopBreakerReader (-1),
77         LoopBreakerWriter (-1),
78         bEpoll (false),
79         bKqueue (false),
80         epfd (-1)
81 {
82         // Default time-slice is just smaller than one hundred mills.
83         Quantum.tv_sec = 0;
84         Quantum.tv_usec = 90000;
85
86         gTerminateSignalReceived = false;
87         // Make sure the current loop time is sane, in case we do any initializations of
88         // objects before we start running.
89         gCurrentLoopTime = time(NULL);
90
91         /* We initialize the network library here (only on Windows of course)
92          * and initialize "loop breakers." Our destructor also does some network-level
93          * cleanup. There's thus an implicit assumption that any given instance of EventMachine_t
94          * will only call ::Run once. Is that a good assumption? Should we move some of these
95          * inits and de-inits into ::Run?
96          */
97         #ifdef OS_WIN32
98         WSADATA w;
99         WSAStartup (MAKEWORD (1, 1), &w);
100         #endif
101
102         _InitializeLoopBreaker();
103 }
104
105
106 /*******************************
107 EventMachine_t::~EventMachine_t
108 *******************************/
109
110 EventMachine_t::~EventMachine_t()
111 {
112         // Run down descriptors
113         size_t i;
114         for (i = 0; i < NewDescriptors.size(); i++)
115                 delete NewDescriptors[i];
116         for (i = 0; i < Descriptors.size(); i++)
117                 delete Descriptors[i];
118
119         close (LoopBreakerReader);
120         close (LoopBreakerWriter);
121
122         if (epfd != -1)
123                 close (epfd);
124         if (kqfd != -1)
125                 close (kqfd);
126 }
127
128
129 /*************************
130 EventMachine_t::_UseEpoll
131 *************************/
132
void EventMachine_t::_UseEpoll()
{
	/* Temporary.
	 * Use an internal flag to switch in epoll-based functionality until we determine
	 * how it should be integrated properly and the extent of the required changes.
	 * A permanent solution needs to allow the integration of additional technologies,
	 * like kqueue and Solaris's events.
	 * On builds without HAVE_EPOLL this is a no-op, so callers may invoke
	 * it unconditionally.
	 */

	#ifdef HAVE_EPOLL
	bEpoll = true;
	#endif
}
146
147 /**************************
148 EventMachine_t::_UseKqueue
149 **************************/
150
void EventMachine_t::_UseKqueue()
{
	/* Temporary.
	 * See comments under _UseEpoll.
	 * No-op unless the build defines HAVE_KQUEUE.
	 */

	#ifdef HAVE_KQUEUE
	bKqueue = true;
	#endif
}
161
162
163 /****************************
164 EventMachine_t::ScheduleHalt
165 ****************************/
166
void EventMachine_t::ScheduleHalt()
{
  /* This is how we stop the machine.
   * This can be called by clients. Signal handlers will probably
   * set the global flag.
   * For now this means there can only be one EventMachine ever running at a time
   * (the flag is a process-wide global, not a member).
   *
   * IMPORTANT: keep this light, fast, and async-safe. Don't do anything frisky in here,
   * because it may be called from signal handlers invoked from code that we don't
   * control. At this writing (20Sep06), EM does NOT install any signal handlers of
   * its own.
   *
   * We need a FAQ. And one of the questions is: how do I stop EM when Ctrl-C happens?
   * The answer is to call evma_stop_machine, which calls here, from a SIGINT handler.
   * The flag is polled at the bottom of ::Run's loop, so the halt takes
   * effect at the end of the current reactor pass.
   */
	gTerminateSignalReceived = true;
}
184
185
186
187 /*******************************
188 EventMachine_t::SetTimerQuantum
189 *******************************/
190
191 void EventMachine_t::SetTimerQuantum (int interval)
192 {
193         /* We get a timer-quantum expressed in milliseconds.
194          * Don't set a quantum smaller than 5 or larger than 2500.
195          */
196
197         if ((interval < 5) || (interval > 2500))
198                 throw std::runtime_error ("invalid timer-quantum");
199
200         Quantum.tv_sec = interval / 1000;
201         Quantum.tv_usec = (interval % 1000) * 1000;
202 }
203
204
205 /*************************************
206 (STATIC) EventMachine_t::SetuidString
207 *************************************/
208
209 void EventMachine_t::SetuidString (const char *username)
210 {
211     /* This method takes a caller-supplied username and tries to setuid
212      * to that user. There is no meaningful implementation (and no error)
213      * on Windows. On Unix, a failure to setuid the caller-supplied string
214      * causes a fatal abort, because presumably the program is calling here
215      * in order to fulfill a security requirement. If we fail silently,
216      * the user may continue to run with too much privilege.
217      *
218      * TODO, we need to decide on and document a way of generating C++ level errors
219      * that can be wrapped in documented Ruby exceptions, so users can catch
220      * and handle them. And distinguish it from errors that we WON'T let the Ruby
221      * user catch (like security-violations and resource-overallocation).
222      * A setuid failure here would be in the latter category.
223      */
224
225     #ifdef OS_UNIX
226     if (!username || !*username)
227         throw std::runtime_error ("setuid_string failed: no username specified");
228
229     struct passwd *p = getpwnam (username);
230     if (!p)
231         throw std::runtime_error ("setuid_string failed: unknown username");
232
233     if (setuid (p->pw_uid) != 0)
234         throw std::runtime_error ("setuid_string failed: no setuid");
235
236     // Success.
237     #endif
238 }
239
240
241 /****************************************
242 (STATIC) EventMachine_t::SetRlimitNofile
243 ****************************************/
244
245 int EventMachine_t::SetRlimitNofile (int nofiles)
246 {
247         #ifdef OS_UNIX
248         struct rlimit rlim;
249         getrlimit (RLIMIT_NOFILE, &rlim);
250         if (nofiles >= 0) {
251                 rlim.rlim_cur = nofiles;
252                 if (nofiles > rlim.rlim_max)
253                         rlim.rlim_max = nofiles;
254                 setrlimit (RLIMIT_NOFILE, &rlim);
255                 // ignore the error return, for now at least.
256                 // TODO, emit an error message someday when we have proper debug levels.
257         }
258         getrlimit (RLIMIT_NOFILE, &rlim);
259         return rlim.rlim_cur;
260         #endif
261
262         #ifdef OS_WIN32
263         // No meaningful implementation on Windows.
264         return 0;
265         #endif
266 }
267
268
269 /*********************************
270 EventMachine_t::SignalLoopBreaker
271 *********************************/
272
void EventMachine_t::SignalLoopBreaker()
{
	/* Wake the reactor if it's blocked in its multiplex call.
	 * On Unix we write a single byte into the loop-breaker pipe; on
	 * Windows (where pipes aren't selectable) we send a zero-length UDP
	 * datagram to the socket created in _InitializeLoopBreaker.
	 * NOTE(review): the write/sendto return values are ignored —
	 * presumably a full pipe already implies a pending wakeup, but
	 * confirm before suppressing compiler warnings about it.
	 */
	#ifdef OS_UNIX
	write (LoopBreakerWriter, "", 1);
	#endif
	#ifdef OS_WIN32
	sendto (LoopBreakerReader, "", 0, 0, (struct sockaddr*)&(LoopBreakerTarget), sizeof(LoopBreakerTarget));
	#endif
}
282
283
284 /**************************************
285 EventMachine_t::_InitializeLoopBreaker
286 **************************************/
287
void EventMachine_t::_InitializeLoopBreaker()
{
	/* A "loop-breaker" is a socket-descriptor that we can write to in order
	 * to break the main select loop. Primarily useful for things running on
	 * threads other than the main EM thread, so they can trigger processing
	 * of events that arise exogenously to the EM.
	 * Keep the loop-breaker pipe out of the main descriptor set, otherwise
	 * its events will get passed on to user code.
	 * Unix uses a plain pipe(); Windows, whose pipes aren't selectable,
	 * uses a nonblocking UDP socket bound to a random localhost port.
	 */

	#ifdef OS_UNIX
	int fd[2];
	if (pipe (fd))
		throw std::runtime_error ("no loop breaker");

	LoopBreakerWriter = fd[1];
	LoopBreakerReader = fd[0];
	#endif

	#ifdef OS_WIN32
	int sd = socket (AF_INET, SOCK_DGRAM, 0);
	if (sd == INVALID_SOCKET)
		throw std::runtime_error ("no loop breaker socket");
	SetSocketNonblocking (sd);

	memset (&LoopBreakerTarget, 0, sizeof(LoopBreakerTarget));
	LoopBreakerTarget.sin_family = AF_INET;
	LoopBreakerTarget.sin_addr.s_addr = inet_addr ("127.0.0.1");

	// Try up to 100 random ports in [20000, 30000) until one binds.
	// Note the loop variable is tested after the loop to detect failure.
	srand ((int)time(NULL));
	int i;
	for (i=0; i < 100; i++) {
		int r = (rand() % 10000) + 20000;
		LoopBreakerTarget.sin_port = htons (r);
		if (bind (sd, (struct sockaddr*)&LoopBreakerTarget, sizeof(LoopBreakerTarget)) == 0)
			break;
	}

	if (i == 100)
		throw std::runtime_error ("no loop breaker");
	LoopBreakerReader = sd;
	#endif
}
331
332
333 /*******************
334 EventMachine_t::Run
335 *******************/
336
/* Top-level reactor loop. First creates the epoll or kqueue descriptor
 * if one was requested via _UseEpoll/_UseKqueue (and registers the
 * loop-breaker with it), then cycles: refresh the loop time, fire
 * timers, fold in new and modified descriptors, and run one multiplexer
 * pass — until _RunTimers or _RunOnce returns false or the global
 * terminate flag is set.
 */
void EventMachine_t::Run()
{
	#ifdef OS_WIN32
	HookControlC (true);
	#endif

	#ifdef HAVE_EPOLL
	if (bEpoll) {
		epfd = epoll_create (MaxEpollDescriptors);
		if (epfd == -1) {
			char buf[200];
			snprintf (buf, sizeof(buf)-1, "unable to create epoll descriptor: %s", strerror(errno));
			throw std::runtime_error (buf);
		}
		// Mark the epoll fd close-on-exec so child processes don't
		// inherit it.
		int cloexec = fcntl (epfd, F_GETFD, 0);
		assert (cloexec >= 0);
		cloexec |= FD_CLOEXEC;
		fcntl (epfd, F_SETFD, cloexec);

		// The loop-breaker gets its own descriptor object so epoll can
		// watch it alongside the regular descriptors.
		assert (LoopBreakerReader >= 0);
		LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this);
		assert (ld);
		Add (ld);
	}
	#endif

	#ifdef HAVE_KQUEUE
	if (bKqueue) {
		kqfd = kqueue();
		if (kqfd == -1) {
			char buf[200];
			snprintf (buf, sizeof(buf)-1, "unable to create kqueue descriptor: %s", strerror(errno));
			throw std::runtime_error (buf);
		}
		// cloexec not needed. By definition, kqueues are not carried across forks.

		assert (LoopBreakerReader >= 0);
		LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this);
		assert (ld);
		Add (ld);
	}
	#endif

	while (true) {
		gCurrentLoopTime = time(NULL);
		if (!_RunTimers())
			break;

		/* _Add must precede _Modify because the same descriptor might
		 * be on both lists during the same pass through the machine,
		 * and to modify a descriptor before adding it would fail.
		 */
		_AddNewDescriptors();
		_ModifyDescriptors();

		if (!_RunOnce())
			break;
		if (gTerminateSignalReceived)
			break;
	}

	#ifdef OS_WIN32
	HookControlC (false);
	#endif
}
402
403
404 /************************
405 EventMachine_t::_RunOnce
406 ************************/
407
408 bool EventMachine_t::_RunOnce()
409 {
410         if (bEpoll)
411                 return _RunEpollOnce();
412         else if (bKqueue)
413                 return _RunKqueueOnce();
414         else
415                 return _RunSelectOnce();
416 }
417
418
419
420 /*****************************
421 EventMachine_t::_RunEpollOnce
422 *****************************/
423
/* One pass of the epoll-based reactor: wait up to 50 ms for events,
 * dispatch error/read/write notifications, reap descriptors that want
 * deletion, then dispatch heartbeats. Always returns true; the loop in
 * ::Run is stopped by other means (timers or the terminate flag).
 */
bool EventMachine_t::_RunEpollOnce()
{
	#ifdef HAVE_EPOLL
	assert (epfd != -1);
	struct epoll_event ev [MaxEpollDescriptors];
	int s = epoll_wait (epfd, ev, MaxEpollDescriptors, 50);
	if (s > 0) {
		for (int i=0; i < s; i++) {
			EventableDescriptor *ed = (EventableDescriptor*) ev[i].data.ptr;

			if (ev[i].events & (EPOLLERR | EPOLLHUP))
				ed->ScheduleClose (false);
			if (ev[i].events & EPOLLIN)
				ed->Read();
			if (ev[i].events & EPOLLOUT) {
				ed->Write();
				// Re-arm the descriptor: after a write its interest
				// set (as reported by GetEpollEvent) may have changed.
				epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent());
				// Ignoring return value
			}
		}
	}
	else if (s < 0) {
		// epoll_wait can fail on error in a handful of ways.
		// If this happens, then wait for a little while to avoid busy-looping.
		// If the error was EINTR, we probably caught SIGCHLD or something,
		// so keep the wait short.
		timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000};
		EmSelect (0, NULL, NULL, NULL, &tv);
	}

	{ // cleanup dying sockets
		// vector::pop_back works in constant time.
		// TODO, rip this out and only delete the descriptors we know have died,
		// rather than traversing the whole list.
		//  Modified 05Jan08 per suggestions by Chris Heath. It's possible that
		//  an EventableDescriptor will have a descriptor value of -1. That will
		//  happen if EventableDescriptor::Close was called on it. In that case,
		//  don't call epoll_ctl to remove the socket's filters from the epoll set.
		//  According to the epoll docs, this happens automatically when the
		//  descriptor is closed anyway. This is different from the case where
		//  the socket has already been closed but the descriptor in the ED object
		//  hasn't yet been set to INVALID_SOCKET.
		int i, j;
		int nSockets = Descriptors.size();
		for (i=0, j=0; i < nSockets; i++) {
			EventableDescriptor *ed = Descriptors[i];
			assert (ed);
			if (ed->ShouldDelete()) {
				if (ed->GetSocket() != INVALID_SOCKET) {
					assert (bEpoll); // wouldn't be in this method otherwise.
					assert (epfd != -1);
					int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent());
					// ENOENT or EBADF are not errors because the socket may be already closed when we get here.
					if (e && (errno != ENOENT) && (errno != EBADF)) {
						char buf [200];
						snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno));
						throw std::runtime_error (buf);
					}
				}

				ModifiedDescriptors.erase (ed);
				delete ed;
			}
			else
				Descriptors [j++] = ed;
		}
		// Survivors were compacted to the front; pop the stale tail.
		while ((size_t)j < Descriptors.size())
			Descriptors.pop_back();

	}

	// TODO, heartbeats.
	// Added 14Sep07, its absence was noted by Brian Candler. But the comment was here, indicated
	// that this got thought about and not done when EPOLL was originally written. Was there a reason
	// not to do it, or was it an oversight? Certainly, running a heartbeat on 50,000 connections every
	// two seconds can get to be a real bear, especially if all we're doing is timing out dead ones.
	// Maybe there's a better way to do this. (Or maybe it's not that expensive after all.)
	//
	{ // dispatch heartbeats
		if (gCurrentLoopTime >= NextHeartbeatTime) {
			NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;

			// NOTE(review): signed/unsigned comparison (int i vs
			// size_t Descriptors.size()); harmless at realistic sizes.
			for (int i=0; i < Descriptors.size(); i++) {
				EventableDescriptor *ed = Descriptors[i];
				assert (ed);
				ed->Heartbeat();
			}
		}
	}

	// Zero-timeout select — presumably gives the Ruby thread scheduler
	// a chance to run (cf. the rb_thread_blocking_region TODO in
	// _RunKqueueOnce); confirm before removing.
	timeval tv = {0,0};
	EmSelect (0, NULL, NULL, NULL, &tv);

	return true;
	#else
	throw std::runtime_error ("epoll is not implemented on this platform");
	#endif
}
522
523
524 /******************************
525 EventMachine_t::_RunKqueueOnce
526 ******************************/
527
/* One pass of the kqueue-based reactor: poll kevent() with a 10 ms
 * timeout, dispatch read/write filters, reap descriptors that want
 * deletion, then dispatch heartbeats. Always returns true.
 */
bool EventMachine_t::_RunKqueueOnce()
{
	#ifdef HAVE_KQUEUE
	assert (kqfd != -1);
	const int maxKevents = 2000;
	struct kevent Karray [maxKevents];
	struct timespec ts = {0, 10000000}; // Too frequent. Use blocking_region

	// No changelist is submitted here (NULL, 0); we only harvest events.
	int k = kevent (kqfd, NULL, 0, Karray, maxKevents, &ts);
	struct kevent *ke = Karray;
	while (k > 0) {
		EventableDescriptor *ed = (EventableDescriptor*) (ke->udata);
		assert (ed);

		if (ke->filter == EVFILT_READ)
			ed->Read();
		else if (ke->filter == EVFILT_WRITE)
			ed->Write();
		else
			cerr << "Discarding unknown kqueue event " << ke->filter << endl;

		--k;
		++ke;
	}

	{ // cleanup dying sockets
		// vector::pop_back works in constant time.
		// TODO, rip this out and only delete the descriptors we know have died,
		// rather than traversing the whole list.
		// In kqueue, closing a descriptor automatically removes its event filters.

		int i, j;
		int nSockets = Descriptors.size();
		for (i=0, j=0; i < nSockets; i++) {
			EventableDescriptor *ed = Descriptors[i];
			assert (ed);
			if (ed->ShouldDelete()) {
				ModifiedDescriptors.erase (ed);
				delete ed;
			}
			else
				Descriptors [j++] = ed;
		}
		// Survivors were compacted to the front; pop the stale tail.
		while ((size_t)j < Descriptors.size())
			Descriptors.pop_back();

	}

	{ // dispatch heartbeats
		if (gCurrentLoopTime >= NextHeartbeatTime) {
			NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;

			// NOTE(review): signed/unsigned comparison, as in
			// _RunEpollOnce; harmless at realistic sizes.
			for (int i=0; i < Descriptors.size(); i++) {
				EventableDescriptor *ed = Descriptors[i];
				assert (ed);
				ed->Heartbeat();
			}
		}
	}


	// TODO, replace this with rb_thread_blocking_region for 1.9 builds.
	timeval tv = {0,0};
	EmSelect (0, NULL, NULL, NULL, &tv);

	return true;
	#else
	throw std::runtime_error ("kqueue is not implemented on this platform");
	#endif
}
598
599
600 /*********************************
601 EventMachine_t::_ModifyEpollEvent
602 *********************************/
603
604 void EventMachine_t::_ModifyEpollEvent (EventableDescriptor *ed)
605 {
606         #ifdef HAVE_EPOLL
607         if (bEpoll) {
608                 assert (epfd != -1);
609                 assert (ed);
610                 int e = epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent());
611                 if (e) {
612                         char buf [200];
613                         snprintf (buf, sizeof(buf)-1, "unable to modify epoll event: %s", strerror(errno));
614                         throw std::runtime_error (buf);
615                 }
616         }
617         #endif
618 }
619
620
621
622 /**************************
623 SelectData_t::SelectData_t
624 **************************/
625
626 SelectData_t::SelectData_t()
627 {
628         maxsocket = 0;
629         FD_ZERO (&fdreads);
630         FD_ZERO (&fdwrites);
631 }
632
633
#ifdef BUILD_FOR_RUBY
/*****************
_SelectDataSelect
*****************/

#ifdef HAVE_TBR
// Trampoline handed to rb_thread_blocking_region: runs the actual
// select(2) without holding the Ruby GVL. v is really a SelectData_t*;
// the result count is stashed in sd->nSockets for _Select to return.
static VALUE _SelectDataSelect (void *v)
{
	SelectData_t *sd = (SelectData_t*)v;
	sd->nSockets = select (sd->maxsocket+1, &(sd->fdreads), &(sd->fdwrites), NULL, &(sd->tv));
	return Qnil;
}
#endif

/*********************
SelectData_t::_Select
*********************/

// Run select() over this object's descriptor sets and return its result.
// With HAVE_TBR the call is wrapped in rb_thread_blocking_region so
// other Ruby threads can run while we block; otherwise EmSelect is
// called directly on the same arguments.
int SelectData_t::_Select()
{
	#ifdef HAVE_TBR
	rb_thread_blocking_region (_SelectDataSelect, (void*)this, RB_UBF_DFL, 0);
	return nSockets;
	#endif

	#ifndef HAVE_TBR
	return EmSelect (maxsocket+1, &fdreads, &fdwrites, NULL, &tv);
	#endif
}
#endif
664
665
666
667 /******************************
668 EventMachine_t::_RunSelectOnce
669 ******************************/
670
bool EventMachine_t::_RunSelectOnce()
{
	// Crank the event machine once.
	// If there are no descriptors to process, then sleep
	// for a few hundred mills to avoid busy-looping.
	// Return T/F to indicate whether we should continue.
	// This is based on a select loop. Alternately provide epoll
	// if we know we're running on a 2.6 kernel.
	// epoll will be effective if we provide it as an alternative,
	// however it has the same problem interoperating with Ruby
	// threads that select does.
	// Sequence per pass: build fd sets, select (bounded by Quantum),
	// dispatch writes/reads, service the loop-breaker LAST, dispatch
	// heartbeats, then reap descriptors that want deletion.

	//cerr << "X";

	/* This protection is now obsolete, because we will ALWAYS
	 * have at least one descriptor (the loop-breaker) to read.
	 */
	/*
	if (Descriptors.size() == 0) {
		#ifdef OS_UNIX
		timeval tv = {0, 200 * 1000};
		EmSelect (0, NULL, NULL, NULL, &tv);
		return true;
		#endif
		#ifdef OS_WIN32
		Sleep (200);
		return true;
		#endif
	}
	*/

	SelectData_t SelectData;
	/*
	fd_set fdreads, fdwrites;
	FD_ZERO (&fdreads);
	FD_ZERO (&fdwrites);

	int maxsocket = 0;
	*/

	// Always read the loop-breaker reader.
	// Changed 23Aug06, provisionally implemented for Windows with a UDP socket
	// running on localhost with a randomly-chosen port. (*Puke*)
	// Windows has a version of the Unix pipe() library function, but it doesn't
	// give you back descriptors that are selectable.
	FD_SET (LoopBreakerReader, &(SelectData.fdreads));
	if (SelectData.maxsocket < LoopBreakerReader)
		SelectData.maxsocket = LoopBreakerReader;

	// prepare the sockets for reading and writing
	size_t i;
	for (i = 0; i < Descriptors.size(); i++) {
		EventableDescriptor *ed = Descriptors[i];
		assert (ed);
		int sd = ed->GetSocket();
		assert (sd != INVALID_SOCKET);

		if (ed->SelectForRead())
			FD_SET (sd, &(SelectData.fdreads));
		if (ed->SelectForWrite())
			FD_SET (sd, &(SelectData.fdwrites));

		// Track the highest-numbered descriptor for select's nfds arg.
		if (SelectData.maxsocket < sd)
			SelectData.maxsocket = sd;
	}


	{ // read and write the sockets
		//timeval tv = {1, 0}; // Solaris fails if the microseconds member is >= 1000000.
		//timeval tv = Quantum;
		SelectData.tv = Quantum;
		int s = SelectData._Select();
		//rb_thread_blocking_region(xxx,(void*)&SelectData,RB_UBF_DFL,0);
		//int s = EmSelect (SelectData.maxsocket+1, &(SelectData.fdreads), &(SelectData.fdwrites), NULL, &(SelectData.tv));
		//int s = SelectData.nSockets;
		if (s > 0) {
			/* Changed 01Jun07. We used to handle the Loop-breaker right here.
			 * Now we do it AFTER all the regular descriptors. There's an
			 * incredibly important and subtle reason for this. Code on
			 * loop breakers is sometimes used to cause the reactor core to
			 * cycle (for example, to allow outbound network buffers to drain).
			 * If a loop-breaker handler reschedules itself (say, after determining
			 * that the write buffers are still too full), then it will execute
			 * IMMEDIATELY if _ReadLoopBreaker is done here instead of after
			 * the other descriptors are processed. That defeats the whole purpose.
			 */
			for (i=0; i < Descriptors.size(); i++) {
				EventableDescriptor *ed = Descriptors[i];
				assert (ed);
				int sd = ed->GetSocket();
				assert (sd != INVALID_SOCKET);

				if (FD_ISSET (sd, &(SelectData.fdwrites)))
					ed->Write();
				if (FD_ISSET (sd, &(SelectData.fdreads)))
					ed->Read();
			}

			if (FD_ISSET (LoopBreakerReader, &(SelectData.fdreads)))
				_ReadLoopBreaker();
		}
		else if (s < 0) {
			// select can fail on error in a handful of ways.
			// If this happens, then wait for a little while to avoid busy-looping.
			// If the error was EINTR, we probably caught SIGCHLD or something,
			// so keep the wait short.
			timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000};
			EmSelect (0, NULL, NULL, NULL, &tv);
		}
	}


	{ // dispatch heartbeats
		if (gCurrentLoopTime >= NextHeartbeatTime) {
			NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;

			for (i=0; i < Descriptors.size(); i++) {
				EventableDescriptor *ed = Descriptors[i];
				assert (ed);
				ed->Heartbeat();
			}
		}
	}

	{ // cleanup dying sockets
		// vector::pop_back works in constant time.
		// NOTE(review): unlike _RunEpollOnce/_RunKqueueOnce, deleted
		// descriptors are NOT erased from ModifiedDescriptors here —
		// presumably because the select path never consults that set;
		// confirm before relying on it.
		int i, j;
		int nSockets = Descriptors.size();
		for (i=0, j=0; i < nSockets; i++) {
			EventableDescriptor *ed = Descriptors[i];
			assert (ed);
			if (ed->ShouldDelete())
				delete ed;
			else
				Descriptors [j++] = ed;
		}
		// Survivors were compacted to the front; pop the stale tail.
		while ((size_t)j < Descriptors.size())
			Descriptors.pop_back();

	}

	return true;
}
814
815
816 /********************************
817 EventMachine_t::_ReadLoopBreaker
818 ********************************/
819
820 void EventMachine_t::_ReadLoopBreaker()
821 {
822         /* The loop breaker has selected readable.
823          * Read it ONCE (it may block if we try to read it twice)
824          * and send a loop-break event back to user code.
825          */
826         char buffer [1024];
827         read (LoopBreakerReader, buffer, sizeof(buffer));
828         if (EventCallback)
829                 (*EventCallback)("", EM_LOOPBREAK_SIGNAL, "", 0);
830 }
831
832
833 /**************************
834 EventMachine_t::_RunTimers
835 **************************/
836
837 bool EventMachine_t::_RunTimers()
838 {
839         // These are caller-defined timer handlers.
840         // Return T/F to indicate whether we should continue the main loop.
841         // We rely on the fact that multimaps sort by their keys to avoid
842         // inspecting the whole list every time we come here.
843         // Just keep inspecting and processing the list head until we hit
844         // one that hasn't expired yet.
845
846         #ifdef OS_UNIX
847         struct timeval tv;
848         gettimeofday (&tv, NULL);
849         Int64 now = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec));
850         #endif
851
852         #ifdef OS_WIN32
853         unsigned tick = GetTickCount();
854         if (tick < gLastTickCount)
855                 gTickCountTickover += 1;
856         gLastTickCount = tick;
857         Int64 now = ((Int64)gTickCountTickover << 32) + (Int64)tick;
858         #endif
859
860         while (true) {
861                 multimap<Int64,Timer_t>::iterator i = Timers.begin();
862                 if (i == Timers.end())
863                         break;
864                 if (i->first > now)
865                         break;
866                 if (EventCallback)
867                         (*EventCallback) ("", EM_TIMER_FIRED, i->second.GetBinding().c_str(), i->second.GetBinding().length());
868                 Timers.erase (i);
869         }
870         return true;
871 }
872
873
874
875 /***********************************
876 EventMachine_t::InstallOneshotTimer
877 ***********************************/
878
879 const char *EventMachine_t::InstallOneshotTimer (int milliseconds)
880 {
881         if (Timers.size() > MaxOutstandingTimers)
882                 return false;
883         // Don't use the global loop-time variable here, because we might
884         // get called before the main event machine is running.
885
886         #ifdef OS_UNIX
887         struct timeval tv;
888         gettimeofday (&tv, NULL);
889         Int64 fire_at = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec));
890         fire_at += ((Int64)milliseconds) * 1000LL;
891         #endif
892
893         #ifdef OS_WIN32
894         unsigned tick = GetTickCount();
895         if (tick < gLastTickCount)
896                 gTickCountTickover += 1;
897         gLastTickCount = tick;
898
899         Int64 fire_at = ((Int64)gTickCountTickover << 32) + (Int64)tick;
900         fire_at += (Int64)milliseconds;
901         #endif
902
903         Timer_t t;
904         multimap<Int64,Timer_t>::iterator i =
905                 Timers.insert (make_pair (fire_at, t));
906         return i->second.GetBindingChars();
907 }
908
909
910 /*******************************
911 EventMachine_t::ConnectToServer
912 *******************************/
913
const char *EventMachine_t::ConnectToServer (const char *server, int port)
{
	/* Begin a nonblocking outbound TCP connection to server:port and
	 * register it with the reactor.
	 *
	 * We want to spend no more than a few seconds waiting for a connection
	 * to a remote host. So we use a nonblocking connect.
	 * Linux disobeys the usual rules for nonblocking connects.
	 * Per Stevens (UNP p.410), you expect a nonblocking connect to select
	 * both readable and writable on error, and not to return EINPROGRESS
	 * if the connect can be fulfilled immediately. Linux violates both
	 * of these expectations.
	 * Any kind of nonblocking connect on Linux returns EINPROGRESS.
	 * The socket will then return writable when the disposition of the
	 * connect is known, but it will not also be readable in case of
	 * error! Weirdly, it will be readable in case there is data to read!!!
	 * (Which can happen with protocols like SSH and SMTP.)
	 * I suppose if you were so inclined you could consider this logical,
	 * but it's not the way Unix has historically done it.
	 * So we ignore the readable flag and read getsockopt to see if there
	 * was an error connecting. A select timeout works as expected.
	 * In regard to getsockopt: Linux does the Berkeley-style thing,
	 * not the Solaris-style, and returns zero with the error code in
	 * the error parameter.
	 * Return the binding-text of the newly-created pending connection,
	 * or NULL if there was a problem.
	 */

	// Reject missing/empty host names and port zero outright.
	if (!server || !*server || !port)
		return NULL;

	// name2address returns a pointer to static storage; it is consumed
	// immediately by socket()/connect() below, before any other call.
	int family, bind_size;
	struct sockaddr *bind_as = name2address (server, port, &family, &bind_size);
	if (!bind_as)
		return NULL;

	int sd = socket (family, SOCK_STREAM, 0);
	if (sd == INVALID_SOCKET)
		return NULL;

	/* Legacy IPv4-only resolution path, superseded by name2address above.
	sockaddr_in pin;
	unsigned long HostAddr;

	HostAddr = inet_addr (server);
	if (HostAddr == INADDR_NONE) {
		hostent *hp = gethostbyname ((char*)server); // Windows requires (char*)
		if (!hp) {
			// TODO: This gives the caller a fatal error. Not good.
			// They can respond by catching RuntimeError (blecch).
			// Possibly we need to fire an unbind event and provide
			// a status code so user code can detect the cause of the
			// failure.
			return NULL;
		}
		HostAddr = ((in_addr*)(hp->h_addr))->s_addr;
	}

	memset (&pin, 0, sizeof(pin));
	pin.sin_family = AF_INET;
	pin.sin_addr.s_addr = HostAddr;
	pin.sin_port = htons (port);

	int sd = socket (AF_INET, SOCK_STREAM, 0);
	if (sd == INVALID_SOCKET)
		return NULL;
	*/

	// From here on, ALL error returns must close the socket.
	// Set the new socket nonblocking.
	if (!SetSocketNonblocking (sd)) {
		closesocket (sd);
		return NULL;
	}
	// Disable slow-start (Nagle algorithm). Best-effort: the return
	// value is deliberately ignored.
	int one = 1;
	setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof(one));

	// `out` stays NULL on every failure path; the common close-on-failure
	// logic at the bottom keys off it.
	const char *out = NULL;

	#ifdef OS_UNIX
	//if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) {
	if (connect (sd, bind_as, bind_size) == 0) {
		// This is a connect success, which Linux appears
		// never to give when the socket is nonblocking,
		// even if the connection is intramachine or to
		// localhost.

		/* Changed this branch 08Aug06. Evidently some kernels
		 * (FreeBSD for example) will actually return success from
		 * a nonblocking connect. This is a pretty simple case,
		 * just set up the new connection and clear the pending flag.
		 * Thanks to Chris Ochs for helping track this down.
		 * This branch never gets taken on Linux or (oddly) OSX.
		 * The original behavior was to throw an unimplemented,
		 * which the user saw as a fatal exception. Very unfriendly.
		 *
		 * Tweaked 10Aug06. Even though the connect disposition is
		 * known, we still set the connect-pending flag. That way
		 * some needed initialization will happen in the ConnectionDescriptor.
		 * (To wit, the ConnectionCompleted event gets sent to the client.)
		 */
		ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
		if (!cd)
			throw std::runtime_error ("no connection allocated");
		cd->SetConnectPending (true);
		Add (cd);
		out = cd->GetBinding().c_str();
	}
	else if (errno == EINPROGRESS) {
		// Errno will generally always be EINPROGRESS, but on Linux
		// we have to look at getsockopt to be sure what really happened.
		int error;
		socklen_t len;
		len = sizeof(error);
		int o = getsockopt (sd, SOL_SOCKET, SO_ERROR, &error, &len);
		if ((o == 0) && (error == 0)) {
			// Here, there's no disposition.
			// Put the connection on the stack and wait for it to complete
			// or time out.
			ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
			if (!cd)
				throw std::runtime_error ("no connection allocated");
			cd->SetConnectPending (true);
			Add (cd);
			out = cd->GetBinding().c_str();
		}
		else {
			/* This could be connection refused or some such thing.
			 * We will come here on Linux if a localhost connection fails.
			 * Changed 16Jul06: Originally this branch was a no-op, and
			 * we'd drop down to the end of the method, close the socket,
			 * and return NULL, which would cause the caller to GET A
			 * FATAL EXCEPTION. Now we keep the socket around but schedule an
			 * immediate close on it, so the caller will get a close-event
			 * scheduled on it. This was only an issue for localhost connections
			 * to non-listening ports. We may eventually need to revise this
			 * revised behavior, in case it causes problems like making it hard
			 * for people to know that a failure occurred.
			 */
			ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
			if (!cd)
				throw std::runtime_error ("no connection allocated");
			cd->ScheduleClose (false);
			Add (cd);
			out = cd->GetBinding().c_str();
		}
	}
	else {
		// The error from connect was something other than EINPROGRESS
		// (fall through; `out` stays NULL, so the socket gets closed below).
	}
	#endif

	#ifdef OS_WIN32
	//if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) {
	if (connect (sd, bind_as, bind_size) == 0) {
		// This is a connect success, which Windows appears
		// never to give when the socket is nonblocking,
		// even if the connection is intramachine or to
		// localhost.
		throw std::runtime_error ("unimplemented");
	}
	else if (WSAGetLastError() == WSAEWOULDBLOCK) {
		// Here, there's no disposition.
		// Windows appears not to surface refused connections or
		// such stuff at this point.
		// Put the connection on the stack and wait for it to complete
		// or time out.
		ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
		if (!cd)
			throw std::runtime_error ("no connection allocated");
		cd->SetConnectPending (true);
		Add (cd);
		out = cd->GetBinding().c_str();
	}
	else {
		// The error from connect was something other than WSAEWOULDBLOCK
		// (fall through; `out` stays NULL, so the socket gets closed below).
	}

	#endif

	// Common failure path: no descriptor took ownership of sd, so close it.
	if (out == NULL)
		closesocket (sd);
	return out;
}
1096
1097 /***********************************
1098 EventMachine_t::ConnectToUnixServer
1099 ***********************************/
1100
1101 const char *EventMachine_t::ConnectToUnixServer (const char *server)
1102 {
1103         /* Connect to a Unix-domain server, which by definition is running
1104          * on the same host.
1105          * There is no meaningful implementation on Windows.
1106          * There's no need to do a nonblocking connect, since the connection
1107          * is always local and can always be fulfilled immediately.
1108          */
1109
1110         #ifdef OS_WIN32
1111         throw std::runtime_error ("unix-domain connection unavailable on this platform");
1112         return NULL;
1113         #endif
1114
1115         // The whole rest of this function is only compiled on Unix systems.
1116         #ifdef OS_UNIX
1117
1118         const char *out = NULL;
1119
1120         if (!server || !*server)
1121                 return NULL;
1122
1123         sockaddr_un pun;
1124         memset (&pun, 0, sizeof(pun));
1125         pun.sun_family = AF_LOCAL;
1126
1127         // You ordinarily expect the server name field to be at least 1024 bytes long,
1128         // but on Linux it can be MUCH shorter.
1129         if (strlen(server) >= sizeof(pun.sun_path))
1130                 throw std::runtime_error ("unix-domain server name is too long");
1131
1132
1133         strcpy (pun.sun_path, server);
1134
1135         int fd = socket (AF_LOCAL, SOCK_STREAM, 0);
1136         if (fd == INVALID_SOCKET)
1137                 return NULL;
1138
1139         // From here on, ALL error returns must close the socket.
1140         // NOTE: At this point, the socket is still a blocking socket.
1141         if (connect (fd, (struct sockaddr*)&pun, sizeof(pun)) != 0) {
1142                 closesocket (fd);
1143                 return NULL;
1144         }
1145
1146         // Set the newly-connected socket nonblocking.
1147         if (!SetSocketNonblocking (fd)) {
1148                 closesocket (fd);
1149                 return NULL;
1150         }
1151
1152         // Set up a connection descriptor and add it to the event-machine.
1153         // Observe, even though we know the connection status is connect-success,
1154         // we still set the "pending" flag, so some needed initializations take
1155         // place.
1156         ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this);
1157         if (!cd)
1158                 throw std::runtime_error ("no connection allocated");
1159         cd->SetConnectPending (true);
1160         Add (cd);
1161         out = cd->GetBinding().c_str();
1162
1163         if (out == NULL)
1164                 closesocket (fd);
1165
1166         return out;
1167         #endif
1168 }
1169
1170 /************************
1171 EventMachine_t::AttachFD
1172 ************************/
1173
1174 const char *EventMachine_t::AttachFD (int fd, bool notify_readable, bool notify_writable)
1175 {
1176         #ifdef OS_UNIX
1177         if (fcntl(fd, F_GETFL, 0) < 0)
1178                 throw std::runtime_error ("invalid file descriptor");
1179         #endif
1180
1181         #ifdef OS_WIN32
1182         // TODO: add better check for invalid file descriptors (see ioctlsocket or getsockopt)
1183         if (fd == INVALID_SOCKET)
1184                 throw std::runtime_error ("invalid file descriptor");
1185         #endif
1186
1187         {// Check for duplicate descriptors
1188                 for (size_t i = 0; i < Descriptors.size(); i++) {
1189                         EventableDescriptor *ed = Descriptors[i];
1190                         assert (ed);
1191                         if (ed->GetSocket() == fd)
1192                                 throw std::runtime_error ("adding existing descriptor");
1193                 }
1194
1195                 for (size_t i = 0; i < NewDescriptors.size(); i++) {
1196                         EventableDescriptor *ed = NewDescriptors[i];
1197                         assert (ed);
1198                         if (ed->GetSocket() == fd)
1199                                 throw std::runtime_error ("adding existing new descriptor");
1200                 }
1201         }
1202
1203         ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this);
1204         if (!cd)
1205                 throw std::runtime_error ("no connection allocated");
1206
1207         cd->SetConnectPending (true);
1208         cd->SetNotifyReadable (notify_readable);
1209         cd->SetNotifyWritable (notify_writable);
1210
1211         Add (cd);
1212
1213         const char *out = NULL;
1214         out = cd->GetBinding().c_str();
1215         if (out == NULL)
1216                 closesocket (fd);
1217         return out;
1218 }
1219
1220 /************************
1221 EventMachine_t::DetachFD
1222 ************************/
1223
int EventMachine_t::DetachFD (EventableDescriptor *ed)
{
	/* Remove a descriptor from the reactor WITHOUT closing its
	 * underlying file descriptor, which is returned to the caller.
	 * The EventableDescriptor object itself is destroyed here.
	 */
	if (!ed)
		throw std::runtime_error ("detaching bad descriptor");

	#ifdef HAVE_EPOLL
	if (bEpoll) {
		if (ed->GetSocket() != INVALID_SOCKET) {
			assert (bEpoll); // wouldn't be in this method otherwise.
			assert (epfd != -1);
			int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent());
			// ENOENT or EBADF are not errors because the socket may be already closed when we get here.
			if (e && (errno != ENOENT) && (errno != EBADF)) {
				char buf [200];
				snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno));
				throw std::runtime_error (buf);
			}
		}
	}
	#endif

	#ifdef HAVE_KQUEUE
	if (bKqueue) {
		// Unregister the read filter; kqueue write filters here are
		// one-shot (see ArmKqueueWriter), so only EVFILT_READ needs
		// an explicit EV_DELETE.
		struct kevent k;
		EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_DELETE, 0, 0, ed);
		int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
		assert (t == 0);
	}
	#endif

	{ // remove descriptor from lists
		// Compact Descriptors in place, dropping ed, then trim the tail.
		int i, j;
		int nSockets = Descriptors.size();
		for (i=0, j=0; i < nSockets; i++) {
			EventableDescriptor *ted = Descriptors[i];
			assert (ted);
			if (ted != ed)
				Descriptors [j++] = ted;
		}
		while ((size_t)j < Descriptors.size())
			Descriptors.pop_back();

		ModifiedDescriptors.erase (ed);
	}

	// Capture the fd before destroying its wrapper.
	int fd = ed->GetSocket();

	// We depend on ~EventableDescriptor not calling close() if the socket is invalid
	ed->SetSocketInvalid();
	delete ed;

	return fd;
}
1277
1278 /************
1279 name2address
1280 ************/
1281
1282 struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size)
1283 {
1284         // THIS IS NOT RE-ENTRANT OR THREADSAFE. Optimize for speed.
1285         // Check the more-common cases first.
1286         // Return NULL if no resolution.
1287
1288         static struct sockaddr_in in4;
1289         #ifndef __CYGWIN__
1290         static struct sockaddr_in6 in6;
1291         #endif
1292         struct hostent *hp;
1293
1294         if (!server || !*server)
1295                 server = "0.0.0.0";
1296
1297         memset (&in4, 0, sizeof(in4));
1298         if ( (in4.sin_addr.s_addr = inet_addr (server)) != INADDR_NONE) {
1299                 if (family)
1300                         *family = AF_INET;
1301                 if (bind_size)
1302                         *bind_size = sizeof(in4);
1303                 in4.sin_family = AF_INET;
1304                 in4.sin_port = htons (port);
1305                 return (struct sockaddr*)&in4;
1306         }
1307
1308         #if defined(OS_UNIX) && !defined(__CYGWIN__)
1309         memset (&in6, 0, sizeof(in6));
1310         if (inet_pton (AF_INET6, server, in6.sin6_addr.s6_addr) > 0) {
1311                 if (family)
1312                         *family = AF_INET6;
1313                 if (bind_size)
1314                         *bind_size = sizeof(in6);
1315                 in6.sin6_family = AF_INET6;
1316                 in6.sin6_port = htons (port);
1317                 return (struct sockaddr*)&in6;
1318         }
1319         #endif
1320
1321         #ifdef OS_WIN32
1322         // TODO, must complete this branch. Windows doesn't have inet_pton.
1323         // A possible approach is to make a getaddrinfo call with the supplied
1324         // server address, constraining the hints to ipv6 and seeing if we
1325         // get any addresses.
1326         // For the time being, Ipv6 addresses aren't supported on Windows.
1327         #endif
1328
1329         hp = gethostbyname ((char*)server); // Windows requires the cast.
1330         if (hp) {
1331                 in4.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
1332                 if (family)
1333                         *family = AF_INET;
1334                 if (bind_size)
1335                         *bind_size = sizeof(in4);
1336                 in4.sin_family = AF_INET;
1337                 in4.sin_port = htons (port);
1338                 return (struct sockaddr*)&in4;
1339         }
1340
1341         return NULL;
1342 }
1343
1344
1345 /*******************************
1346 EventMachine_t::CreateTcpServer
1347 *******************************/
1348
const char *EventMachine_t::CreateTcpServer (const char *server, int port)
{
	/* Create a TCP-acceptor (server) socket and add it to the event machine.
	 * Return the binding of the new acceptor to the caller.
	 * This binding will be referenced when the new acceptor sends events
	 * to indicate accepted connections.
	 * Returns NULL on any socket/bind/listen failure (the partially
	 * created socket is closed via the `fail` path below).
	 */


	// Resolve the listen address; name2address returns static storage,
	// consumed immediately by socket()/bind() below.
	int family, bind_size;
	struct sockaddr *bind_here = name2address (server, port, &family, &bind_size);
	if (!bind_here)
		return NULL;

	const char *output_binding = NULL;

	//struct sockaddr_in sin;

	int sd_accept = socket (family, SOCK_STREAM, 0);
	if (sd_accept == INVALID_SOCKET) {
		goto fail;
	}

	/* Legacy IPv4-only resolution path, superseded by name2address above.
	memset (&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_addr.s_addr = INADDR_ANY;
	sin.sin_port = htons (port);

	if (server && *server) {
		sin.sin_addr.s_addr = inet_addr (server);
		if (sin.sin_addr.s_addr == INADDR_NONE) {
			hostent *hp = gethostbyname ((char*)server); // Windows requires the cast.
			if (hp == NULL) {
				//__warning ("hostname not resolved: ", server);
				goto fail;
			}
			sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
		}
	}
	*/

	{ // set reuseaddr to improve performance on restarts.
		int oval = 1;
		if (setsockopt (sd_accept, SOL_SOCKET, SO_REUSEADDR, (char*)&oval, sizeof(oval)) < 0) {
			//__warning ("setsockopt failed while creating listener","");
			goto fail;
		}
	}

	{ // set CLOEXEC. Only makes sense on Unix
		// Prevents the listener fd leaking into forked/exec'd children.
		#ifdef OS_UNIX
		int cloexec = fcntl (sd_accept, F_GETFD, 0);
		assert (cloexec >= 0);
		cloexec |= FD_CLOEXEC;
		fcntl (sd_accept, F_SETFD, cloexec);
		#endif
	}


	//if (bind (sd_accept, (struct sockaddr*)&sin, sizeof(sin))) {
	if (bind (sd_accept, bind_here, bind_size)) {
		//__warning ("binding failed");
		goto fail;
	}

	// Fixed backlog of 100 pending connections.
	if (listen (sd_accept, 100)) {
		//__warning ("listen failed");
		goto fail;
	}

	{
		// Set the acceptor non-blocking.
		// THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
		if (!SetSocketNonblocking (sd_accept)) {
		//int val = fcntl (sd_accept, F_GETFL, 0);
		//if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
			goto fail;
		}
	}

	{ // Looking good.
		// The AcceptorDescriptor now owns sd_accept.
		AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
		if (!ad)
			throw std::runtime_error ("unable to allocate acceptor");
		Add (ad);
		output_binding = ad->GetBinding().c_str();
	}

	return output_binding;

	// Common failure path: release the socket (if any) and report failure.
	fail:
	if (sd_accept != INVALID_SOCKET)
		closesocket (sd_accept);
	return NULL;
}
1445
1446
1447 /**********************************
1448 EventMachine_t::OpenDatagramSocket
1449 **********************************/
1450
const char *EventMachine_t::OpenDatagramSocket (const char *address, int port)
{
	/* Create a UDP (datagram) socket bound to address:port, wrap it in
	 * a DatagramDescriptor, and add it to the event machine.
	 * An empty/NULL address binds to INADDR_ANY. Returns the binding of
	 * the new descriptor, or NULL on failure (the socket is closed via
	 * the `fail` path below). IPv4 only.
	 */
	const char *output_binding = NULL;

	int sd = socket (AF_INET, SOCK_DGRAM, 0);
	if (sd == INVALID_SOCKET)
		goto fail;
	// from here on, early returns must close the socket!


	// NOTE: sin is POD, so the gotos above/below may legally jump over
	// its declaration; it is fully initialized here before any use.
	struct sockaddr_in sin;
	memset (&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons (port);


	if (address && *address) {
		// Try a dotted-quad literal first, then fall back to DNS.
		sin.sin_addr.s_addr = inet_addr (address);
		if (sin.sin_addr.s_addr == INADDR_NONE) {
			hostent *hp = gethostbyname ((char*)address); // Windows requires the cast.
			if (hp == NULL)
				goto fail;
			sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
		}
	}
	else
		sin.sin_addr.s_addr = htonl (INADDR_ANY);


	// Set the new socket nonblocking.
	{
		if (!SetSocketNonblocking (sd))
		//int val = fcntl (sd, F_GETFL, 0);
		//if (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1)
			goto fail;
	}

	if (bind (sd, (struct sockaddr*)&sin, sizeof(sin)) != 0)
		goto fail;

	{ // Looking good.
		// The DatagramDescriptor now owns sd.
		DatagramDescriptor *ds = new DatagramDescriptor (sd, this);
		if (!ds)
			throw std::runtime_error ("unable to allocate datagram-socket");
		Add (ds);
		output_binding = ds->GetBinding().c_str();
	}

	return output_binding;

	// Common failure path: release the socket (if any) and report failure.
	fail:
	if (sd != INVALID_SOCKET)
		closesocket (sd);
	return NULL;
}
1506
1507
1508
1509 /*******************
1510 EventMachine_t::Add
1511 *******************/
1512
1513 void EventMachine_t::Add (EventableDescriptor *ed)
1514 {
1515         if (!ed)
1516                 throw std::runtime_error ("added bad descriptor");
1517         ed->SetEventCallback (EventCallback);
1518         NewDescriptors.push_back (ed);
1519 }
1520
1521
1522 /*******************************
1523 EventMachine_t::ArmKqueueWriter
1524 *******************************/
1525
1526 void EventMachine_t::ArmKqueueWriter (EventableDescriptor *ed)
1527 {
1528         #ifdef HAVE_KQUEUE
1529         if (bKqueue) {
1530                 if (!ed)
1531                         throw std::runtime_error ("added bad descriptor");
1532                 struct kevent k;
1533                 EV_SET (&k, ed->GetSocket(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, ed);
1534                 int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1535                 assert (t == 0);
1536         }
1537         #endif
1538 }
1539
1540 /*******************************
1541 EventMachine_t::ArmKqueueReader
1542 *******************************/
1543
1544 void EventMachine_t::ArmKqueueReader (EventableDescriptor *ed)
1545 {
1546         #ifdef HAVE_KQUEUE
1547         if (bKqueue) {
1548                 if (!ed)
1549                         throw std::runtime_error ("added bad descriptor");
1550                 struct kevent k;
1551                 EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed);
1552                 int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1553                 assert (t == 0);
1554         }
1555         #endif
1556 }
1557
1558 /**********************************
1559 EventMachine_t::_AddNewDescriptors
1560 **********************************/
1561
1562 void EventMachine_t::_AddNewDescriptors()
1563 {
1564         /* Avoid adding descriptors to the main descriptor list
1565          * while we're actually traversing the list.
1566          * Any descriptors that are added as a result of processing timers
1567          * or acceptors should go on a temporary queue and then added
1568          * while we're not traversing the main list.
1569          * Also, it (rarely) happens that a newly-created descriptor
1570          * is immediately scheduled to close. It might be a good
1571          * idea not to bother scheduling these for I/O but if
1572          * we do that, we might bypass some important processing.
1573          */
1574
1575         for (size_t i = 0; i < NewDescriptors.size(); i++) {
1576                 EventableDescriptor *ed = NewDescriptors[i];
1577                 if (ed == NULL)
1578                         throw std::runtime_error ("adding bad descriptor");
1579
1580                 #if HAVE_EPOLL
1581                 if (bEpoll) {
1582                         assert (epfd != -1);
1583                         int e = epoll_ctl (epfd, EPOLL_CTL_ADD, ed->GetSocket(), ed->GetEpollEvent());
1584                         if (e) {
1585                                 char buf [200];
1586                                 snprintf (buf, sizeof(buf)-1, "unable to add new descriptor: %s", strerror(errno));
1587                                 throw std::runtime_error (buf);
1588                         }
1589                 }
1590                 #endif
1591
1592                 #if HAVE_KQUEUE
1593                 /*
1594                 if (bKqueue) {
1595                         // INCOMPLETE. Some descriptors don't want to be readable.
1596                         assert (kqfd != -1);
1597                         struct kevent k;
1598                         EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed);
1599                         int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1600                         assert (t == 0);
1601                 }
1602                 */
1603                 #endif
1604
1605                 Descriptors.push_back (ed);
1606         }
1607         NewDescriptors.clear();
1608 }
1609
1610
1611 /**********************************
1612 EventMachine_t::_ModifyDescriptors
1613 **********************************/
1614
1615 void EventMachine_t::_ModifyDescriptors()
1616 {
1617         /* For implementations which don't level check every descriptor on
1618          * every pass through the machine, as select does.
1619          * If we're not selecting, then descriptors need a way to signal to the
1620          * machine that their readable or writable status has changed.
1621          * That's what the ::Modify call is for. We do it this way to avoid
1622          * modifying descriptors during the loop traversal, where it can easily
1623          * happen that an object (like a UDP socket) gets data written on it by
1624          * the application during #post_init. That would take place BEFORE the
1625          * descriptor even gets added to the epoll descriptor, so the modify
1626          * operation will crash messily.
1627          * Another really messy possibility is for a descriptor to put itself
1628          * on the Modified list, and then get deleted before we get here.
1629          * Remember, deletes happen after the I/O traversal and before the
1630          * next pass through here. So we have to make sure when we delete a
1631          * descriptor to remove it from the Modified list.
1632          */
1633
1634         #ifdef HAVE_EPOLL
1635         if (bEpoll) {
1636                 set<EventableDescriptor*>::iterator i = ModifiedDescriptors.begin();
1637                 while (i != ModifiedDescriptors.end()) {
1638                         assert (*i);
1639                         _ModifyEpollEvent (*i);
1640                         ++i;
1641                 }
1642         }
1643         #endif
1644
1645         ModifiedDescriptors.clear();
1646 }
1647
1648
1649 /**********************
1650 EventMachine_t::Modify
1651 **********************/
1652
1653 void EventMachine_t::Modify (EventableDescriptor *ed)
1654 {
1655         if (!ed)
1656                 throw std::runtime_error ("modified bad descriptor");
1657         ModifiedDescriptors.insert (ed);
1658 }
1659
1660
1661 /***********************************
1662 EventMachine_t::_OpenFileForWriting
1663 ***********************************/
1664
1665 const char *EventMachine_t::_OpenFileForWriting (const char *filename)
1666 {
1667   /*
1668          * Return the binding-text of the newly-opened file,
1669          * or NULL if there was a problem.
1670          */
1671
1672         if (!filename || !*filename)
1673                 return NULL;
1674
1675   int fd = open (filename, O_CREAT|O_TRUNC|O_WRONLY|O_NONBLOCK, 0644);
1676  
1677         FileStreamDescriptor *fsd = new FileStreamDescriptor (fd, this);
1678   if (!fsd)
1679         throw std::runtime_error ("no file-stream allocated");
1680   Add (fsd);
1681   return fsd->GetBinding().c_str();
1682
1683 }
1684
1685
1686 /**************************************
1687 EventMachine_t::CreateUnixDomainServer
1688 **************************************/
1689
const char *EventMachine_t::CreateUnixDomainServer (const char *filename)
{
	/* Create a UNIX-domain acceptor (server) socket and add it to the event machine.
	 * Return the binding of the new acceptor to the caller.
	 * This binding will be referenced when the new acceptor sends events
	 * to indicate accepted connections.
	 * THERE IS NO MEANINGFUL IMPLEMENTATION ON WINDOWS.
	 * Returns NULL on any socket/bind/listen failure (see fail: below).
	 */

	#ifdef OS_WIN32
	throw std::runtime_error ("unix-domain server unavailable on this platform");
	#endif

	// The whole rest of this function is only compiled on Unix systems.
	#ifdef OS_UNIX
	const char *output_binding = NULL;

	struct sockaddr_un s_sun;

	int sd_accept = socket (AF_LOCAL, SOCK_STREAM, 0);
	if (sd_accept == INVALID_SOCKET) {
		goto fail;
	}

	// The filename is validated after socket creation so that the single
	// fail: label can unconditionally clean up sd_accept.
	if (!filename || !*filename)
		goto fail;
	// Remove any stale socket file left by a previous run, else bind fails.
	unlink (filename);

	bzero (&s_sun, sizeof(s_sun));
	s_sun.sun_family = AF_LOCAL;
	// NOTE(review): strncpy silently truncates paths longer than sun_path.
	strncpy (s_sun.sun_path, filename, sizeof(s_sun.sun_path)-1);

	// don't bother with reuseaddr for a local socket.

	{ // set CLOEXEC. Only makes sense on Unix
		// (inner #ifdef is redundant inside the OS_UNIX section, but harmless)
		#ifdef OS_UNIX
		int cloexec = fcntl (sd_accept, F_GETFD, 0);
		assert (cloexec >= 0);
		cloexec |= FD_CLOEXEC;
		fcntl (sd_accept, F_SETFD, cloexec);
		#endif
	}

	if (bind (sd_accept, (struct sockaddr*)&s_sun, sizeof(s_sun))) {
		//__warning ("binding failed");
		goto fail;
	}

	if (listen (sd_accept, 100)) {
		//__warning ("listen failed");
		goto fail;
	}

	{
		// Set the acceptor non-blocking.
		// THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
		if (!SetSocketNonblocking (sd_accept)) {
		//int val = fcntl (sd_accept, F_GETFL, 0);
		//if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
			goto fail;
		}
	}

	{ // Looking good.
		AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
		if (!ad)
			throw std::runtime_error ("unable to allocate acceptor");
		// Ownership of sd_accept passes to the descriptor/reactor here.
		Add (ad);
		output_binding = ad->GetBinding().c_str();
	}

	return output_binding;

	fail:
	if (sd_accept != INVALID_SOCKET)
		closesocket (sd_accept);
	return NULL;
	#endif // OS_UNIX
}
1769
1770
1771 /*********************
1772 EventMachine_t::Popen
1773 *********************/
1774 #if OBSOLETE
const char *EventMachine_t::Popen (const char *cmd, const char *mode)
{
	/* Spawn cmd via popen(3) and wrap the resulting stream in a
	 * PipeDescriptor managed by this reactor.
	 * Returns the binding-text of the new descriptor, or NULL on failure.
	 * NOTE(review): this whole function is compiled only under #if OBSOLETE
	 * (see Socketpair for the live replacement). On a platform that is
	 * neither OS_WIN32 nor OS_UNIX it would fall off the end with no
	 * return value — acceptable only because the code is dead.
	 */
	#ifdef OS_WIN32
	throw std::runtime_error ("popen is currently unavailable on this platform");
	#endif

	// The whole rest of this function is only compiled on Unix systems.
	// Eventually we need this functionality (or a full-duplex equivalent) on Windows.
	#ifdef OS_UNIX
	const char *output_binding = NULL;

	FILE *fp = popen (cmd, mode);
	if (!fp)
		return NULL;

	// From here, all early returns must pclose the stream.

	// According to the pipe(2) manpage, descriptors returned from pipe have both
	// CLOEXEC and NONBLOCK clear. Do NOT set CLOEXEC. DO set nonblocking.
	if (!SetSocketNonblocking (fileno (fp))) {
		pclose (fp);
		return NULL;
	}

	{ // Looking good.
		// Ownership of fp passes to the PipeDescriptor/reactor here.
		PipeDescriptor *pd = new PipeDescriptor (fp, this);
		if (!pd)
			throw std::runtime_error ("unable to allocate pipe");
		Add (pd);
		output_binding = pd->GetBinding().c_str();
	}

	return output_binding;
	#endif
}
1810 #endif // OBSOLETE
1811
1812 /**************************
1813 EventMachine_t::Socketpair
1814 **************************/
1815
1816 const char *EventMachine_t::Socketpair (char * const*cmd_strings)
1817 {
1818         #ifdef OS_WIN32
1819         throw std::runtime_error ("socketpair is currently unavailable on this platform");
1820         #endif
1821
1822         // The whole rest of this function is only compiled on Unix systems.
1823         // Eventually we need this functionality (or a full-duplex equivalent) on Windows.
1824         #ifdef OS_UNIX
1825         // Make sure the incoming array of command strings is sane.
1826         if (!cmd_strings)
1827                 return NULL;
1828         int j;
1829         for (j=0; j < 100 && cmd_strings[j]; j++)
1830                 ;
1831         if ((j==0) || (j==100))
1832                 return NULL;
1833
1834         const char *output_binding = NULL;
1835
1836         int sv[2];
1837         if (socketpair (AF_LOCAL, SOCK_STREAM, 0, sv) < 0)
1838                 return NULL;
1839         // from here, all early returns must close the pair of sockets.
1840
1841         // Set the parent side of the socketpair nonblocking.
1842         // We don't care about the child side, and most child processes will expect their
1843         // stdout to be blocking. Thanks to Duane Johnson and Bill Kelly for pointing this out.
1844         // Obviously DON'T set CLOEXEC.
1845         if (!SetSocketNonblocking (sv[0])) {
1846                 close (sv[0]);
1847                 close (sv[1]);
1848                 return NULL;
1849         }
1850
1851         pid_t f = fork();
1852         if (f > 0) {
1853                 close (sv[1]);
1854                 PipeDescriptor *pd = new PipeDescriptor (sv[0], f, this);
1855                 if (!pd)
1856                         throw std::runtime_error ("unable to allocate pipe");
1857                 Add (pd);
1858                 output_binding = pd->GetBinding().c_str();
1859         }
1860         else if (f == 0) {
1861                 close (sv[0]);
1862                 dup2 (sv[1], STDIN_FILENO);
1863                 close (sv[1]);
1864                 dup2 (STDIN_FILENO, STDOUT_FILENO);
1865                 execvp (cmd_strings[0], cmd_strings+1);
1866                 exit (-1); // end the child process if the exec doesn't work.
1867         }
1868         else
1869                 throw std::runtime_error ("no fork");
1870
1871         return output_binding;
1872         #endif
1873 }
1874
1875
1876 /****************************
1877 EventMachine_t::OpenKeyboard
1878 ****************************/
1879
1880 const char *EventMachine_t::OpenKeyboard()
1881 {
1882         KeyboardDescriptor *kd = new KeyboardDescriptor (this);
1883         if (!kd)
1884                 throw std::runtime_error ("no keyboard-object allocated");
1885         Add (kd);
1886         return kd->GetBinding().c_str();
1887 }
1888
1889
1890
1891
1892
1893 //#endif // OS_UNIX
1894
Note: See TracBrowser for help on using the browser.