root/trunk/ext/em.cpp

Revision 798, 50.9 kB (checked in by francis, 8 months ago)

Fixed a variable redefinition that was not valid C++ under Windows VC6.

  • Property svn:keywords set to Id
1 /*****************************************************************************
2
3 $Id$
4
5 File:     em.cpp
6 Date:     06Apr06
7
8 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
9 Gmail: blackhedd
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of either: 1) the GNU General Public License
13 as published by the Free Software Foundation; either version 2 of the
14 License, or (at your option) any later version; or 2) Ruby's License.
15
16 See the file COPYING for complete licensing information.
17
18 *****************************************************************************/
19
20 // THIS ENTIRE FILE WILL EVENTUALLY BE FOR UNIX BUILDS ONLY.
21 //#ifdef OS_UNIX
22
23
24 #include "project.h"
25
26 // Keep a global variable floating around
27 // with the current loop time as set by the Event Machine.
28 // This avoids the need for frequent expensive calls to time(NULL);
29 time_t gCurrentLoopTime;
30
31 #ifdef OS_WIN32
32 unsigned gTickCountTickover;
33 unsigned gLastTickCount;
34 #endif
35
36
37 /* The maximum number of outstanding timers was once a const enum defined in em.h.
38  * Now we define it here so that users can change its value if necessary.
39  */
40 static int MaxOutstandingTimers = 1000;
41
42
43 /* Internal helper to convert strings to internet addresses. IPv6-aware.
44  * Not reentrant or threadsafe, optimized for speed.
45  */
46 static struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size);
47
48
49 /***************************************
50 STATIC EventMachine_t::SetMaxTimerCount
51 ***************************************/
52
53 void EventMachine_t::SetMaxTimerCount (int count)
54 {
55         /* Allow a user to increase the maximum number of outstanding timers.
56          * If this gets "too high" (a metric that is of course platform dependent),
57          * bad things will happen like performance problems and possible overuse
58          * of memory.
59          * The actual timer mechanism is very efficient, so it's hard to know what
60          * the practical max is, but 100,000 shouldn't be too problematic.
61          */
62         if (count < 100)
63                 count = 100;
64         MaxOutstandingTimers = count;
65 }
66
67
68
69 /******************************
70 EventMachine_t::EventMachine_t
71 ******************************/
72
73 EventMachine_t::EventMachine_t (void (*event_callback)(const char*, int, const char*, int)):
74         EventCallback (event_callback),
75         NextHeartbeatTime (0),
76         LoopBreakerReader (-1),
77         LoopBreakerWriter (-1),
78         bEpoll (false),
79         bKqueue (false),
80         epfd (-1)
81 {
82         // Default time-slice is just under one hundred milliseconds.
83         Quantum.tv_sec = 0;
84         Quantum.tv_usec = 90000;
85
86         gTerminateSignalReceived = false;
87         // Make sure the current loop time is sane, in case we do any initializations of
88         // objects before we start running.
89         gCurrentLoopTime = time(NULL);
90
91         /* We initialize the network library here (only on Windows of course)
92          * and initialize "loop breakers." Our destructor also does some network-level
93          * cleanup. There's thus an implicit assumption that any given instance of EventMachine_t
94          * will only call ::Run once. Is that a good assumption? Should we move some of these
95          * inits and de-inits into ::Run?
96          */
97         #ifdef OS_WIN32
98         WSADATA w;
99         WSAStartup (MAKEWORD (1, 1), &w);
100         #endif
101
102         _InitializeLoopBreaker();
103 }
104
105
106 /*******************************
107 EventMachine_t::~EventMachine_t
108 *******************************/
109
110 EventMachine_t::~EventMachine_t()
111 {
112         // Run down descriptors
113         size_t i;
114         for (i = 0; i < NewDescriptors.size(); i++)
115                 delete NewDescriptors[i];
116         for (i = 0; i < Descriptors.size(); i++)
117                 delete Descriptors[i];
118
119         close (LoopBreakerReader);
120         close (LoopBreakerWriter);
121
122         if (epfd != -1)
123                 close (epfd);
124         if (kqfd != -1)
125                 close (kqfd);
126 }
127
128
129 /*************************
130 EventMachine_t::_UseEpoll
131 *************************/
132
133 void EventMachine_t::_UseEpoll()
134 {
135         /* Temporary.
136          * Use an internal flag to switch in epoll-based functionality until we determine
137          * how it should be integrated properly and the extent of the required changes.
138          * A permanent solution needs to allow the integration of additional technologies,
139          * like kqueue and Solaris's events.
140          */
141
142         #ifdef HAVE_EPOLL
143         bEpoll = true;
144         #endif
145 }
146
147 /**************************
148 EventMachine_t::_UseKqueue
149 **************************/
150
151 void EventMachine_t::_UseKqueue()
152 {
153         /* Temporary.
154          * See comments under _UseEpoll.
155          */
156
157         #ifdef HAVE_KQUEUE
158         bKqueue = true;
159         #endif
160 }
161
162
163 /****************************
164 EventMachine_t::ScheduleHalt
165 ****************************/
166
167 void EventMachine_t::ScheduleHalt()
168 {
169   /* This is how we stop the machine.
170    * This can be called by clients. Signal handlers will probably
171    * set the global flag.
172    * For now this means there can only be one EventMachine ever running at a time.
173    *
174    * IMPORTANT: keep this light, fast, and async-safe. Don't do anything frisky in here,
175    * because it may be called from signal handlers invoked from code that we don't
176    * control. At this writing (20Sep06), EM does NOT install any signal handlers of
177    * its own.
178    *
179    * We need a FAQ. And one of the questions is: how do I stop EM when Ctrl-C happens?
180    * The answer is to call evma_stop_machine, which calls here, from a SIGINT handler.
181    */
182         gTerminateSignalReceived = true;
183 }
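
/* A minimal sketch of the Ctrl-C case described in the comment above (illustrative
 * only; it relies on the C-level wrapper evma_stop_machine() named there, which is
 * defined outside this file):
 *
 *     static void sigint_handler (int)
 *     {
 *         evma_stop_machine();    // ends up calling EventMachine_t::ScheduleHalt()
 *     }
 *
 *     // in program setup:
 *     // signal (SIGINT, sigint_handler);
 */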
184
185
186
187 /*******************************
188 EventMachine_t::SetTimerQuantum
189 *******************************/
190
191 void EventMachine_t::SetTimerQuantum (int interval)
192 {
193         /* We get a timer-quantum expressed in milliseconds.
194          * Don't set a quantum smaller than 5 or larger than 2500.
195          */
196
197         if ((interval < 5) || (interval > 2500))
198                 throw std::runtime_error ("invalid timer-quantum");
199
200         Quantum.tv_sec = interval / 1000;
201         Quantum.tv_usec = (interval % 1000) * 1000;
202 }
203
204
205 /*************************************
206 (STATIC) EventMachine_t::SetuidString
207 *************************************/
208
209 void EventMachine_t::SetuidString (const char *username)
210 {
211     /* This method takes a caller-supplied username and tries to setuid
212      * to that user. There is no meaningful implementation (and no error)
213      * on Windows. On Unix, a failure to setuid the caller-supplied string
214      * causes a fatal abort, because presumably the program is calling here
215      * in order to fulfill a security requirement. If we fail silently,
216      * the user may continue to run with too much privilege.
217      *
218      * TODO, we need to decide on and document a way of generating C++ level errors
219      * that can be wrapped in documented Ruby exceptions, so users can catch
220      * and handle them. And distinguish it from errors that we WON'T let the Ruby
221      * user catch (like security-violations and resource-overallocation).
222      * A setuid failure here would be in the latter category.
223      */
224
225     #ifdef OS_UNIX
226     if (!username || !*username)
227         throw std::runtime_error ("setuid_string failed: no username specified");
228
229     struct passwd *p = getpwnam (username);
230     if (!p)
231         throw std::runtime_error ("setuid_string failed: unknown username");
232
233     if (setuid (p->pw_uid) != 0)
234         throw std::runtime_error ("setuid_string failed: no setuid");
235
236     // Success.
237     #endif
238 }
239
240
241 /****************************************
242 (STATIC) EventMachine_t::SetRlimitNofile
243 ****************************************/
244
245 int EventMachine_t::SetRlimitNofile (int nofiles)
246 {
247         #ifdef OS_UNIX
248         struct rlimit rlim;
249         getrlimit (RLIMIT_NOFILE, &rlim);
250         if (nofiles >= 0) {
251                 rlim.rlim_cur = nofiles;
252                 if (nofiles > rlim.rlim_max)
253                         rlim.rlim_max = nofiles;
254                 setrlimit (RLIMIT_NOFILE, &rlim);
255                 // ignore the error return, for now at least.
256                 // TODO, emit an error message someday when we have proper debug levels.
257         }
258         getrlimit (RLIMIT_NOFILE, &rlim);
259         return rlim.rlim_cur;
260         #endif
261
262         #ifdef OS_WIN32
263         // No meaningful implementation on Windows.
264         return 0;
265         #endif
266 }
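
/* Illustrative usage sketch (Unix; on Windows the call is a no-op returning 0):
 *
 *     int current = EventMachine_t::SetRlimitNofile (-1);    // negative argument: query only
 *     int applied = EventMachine_t::SetRlimitNofile (4096);  // attempt to raise the soft limit;
 *                                                            // returns whatever getrlimit reports afterward
 */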
267
268
269 /*********************************
270 EventMachine_t::SignalLoopBreaker
271 *********************************/
272
273 void EventMachine_t::SignalLoopBreaker()
274 {
275         #ifdef OS_UNIX
276         write (LoopBreakerWriter, "", 1);
277         #endif
278         #ifdef OS_WIN32
279         sendto (LoopBreakerReader, "", 0, 0, (struct sockaddr*)&(LoopBreakerTarget), sizeof(LoopBreakerTarget));
280         #endif
281 }
282
283
284 /**************************************
285 EventMachine_t::_InitializeLoopBreaker
286 **************************************/
287
288 void EventMachine_t::_InitializeLoopBreaker()
289 {
290         /* A "loop-breaker" is a socket-descriptor that we can write to in order
291          * to break the main select loop. Primarily useful for things running on
292          * threads other than the main EM thread, so they can trigger processing
293          * of events that arise exogenously to the EM.
294          * Keep the loop-breaker pipe out of the main descriptor set, otherwise
295          * its events will get passed on to user code.
296          */
297
298         #ifdef OS_UNIX
299         int fd[2];
300         if (pipe (fd))
301                 throw std::runtime_error ("no loop breaker");
302
303         LoopBreakerWriter = fd[1];
304         LoopBreakerReader = fd[0];
305         #endif
306
307         #ifdef OS_WIN32
308         int sd = socket (AF_INET, SOCK_DGRAM, 0);
309         if (sd == INVALID_SOCKET)
310                 throw std::runtime_error ("no loop breaker socket");
311         SetSocketNonblocking (sd);
312
313         memset (&LoopBreakerTarget, 0, sizeof(LoopBreakerTarget));
314         LoopBreakerTarget.sin_family = AF_INET;
315         LoopBreakerTarget.sin_addr.s_addr = inet_addr ("127.0.0.1");
316
317         srand ((int)time(NULL));
318         int i;
319         for (i=0; i < 100; i++) {
320                 int r = (rand() % 10000) + 20000;
321                 LoopBreakerTarget.sin_port = htons (r);
322                 if (bind (sd, (struct sockaddr*)&LoopBreakerTarget, sizeof(LoopBreakerTarget)) == 0)
323                         break;
324         }
325
326         if (i == 100)
327                 throw std::runtime_error ("no loop breaker");
328         LoopBreakerReader = sd;
329         #endif
330 }
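
/* A minimal usage sketch of the loop-breaker (illustrative; 'em' is an assumed
 * pointer to a running EventMachine_t):
 *
 *     void wake_reactor (EventMachine_t *em)
 *     {
 *         // Intended to be called from a thread other than the one running em->Run().
 *         em->SignalLoopBreaker();
 *     }
 *
 * The one-byte write (or zero-length UDP send on Windows) makes the reactor's wait
 * return; _ReadLoopBreaker then drains the descriptor and dispatches an
 * EM_LOOPBREAK_SIGNAL event to user code.
 */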
331
332
333 /*******************
334 EventMachine_t::Run
335 *******************/
336
337 void EventMachine_t::Run()
338 {
339         #ifdef OS_WIN32
340         HookControlC (true);
341         #endif
342
343         #ifdef HAVE_EPOLL
344         if (bEpoll) {
345                 epfd = epoll_create (MaxEpollDescriptors);
346                 if (epfd == -1) {
347                         char buf[200];
348                         snprintf (buf, sizeof(buf)-1, "unable to create epoll descriptor: %s", strerror(errno));
349                         throw std::runtime_error (buf);
350                 }
351                 int cloexec = fcntl (epfd, F_GETFD, 0);
352                 assert (cloexec >= 0);
353                 cloexec |= FD_CLOEXEC;
354                 fcntl (epfd, F_SETFD, cloexec);
355
356                 assert (LoopBreakerReader >= 0);
357                 LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this);
358                 assert (ld);
359                 Add (ld);
360         }
361         #endif
362
363         #ifdef HAVE_KQUEUE
364         if (bKqueue) {
365                 kqfd = kqueue();
366                 if (kqfd == -1) {
367                         char buf[200];
368                         snprintf (buf, sizeof(buf)-1, "unable to create kqueue descriptor: %s", strerror(errno));
369                         throw std::runtime_error (buf);
370                 }
371                 // cloexec not needed. By definition, kqueues are not carried across forks.
372
373                 assert (LoopBreakerReader >= 0);
374                 LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this);
375                 assert (ld);
376                 Add (ld);
377         }
378         #endif
379
380         while (true) {
381                 gCurrentLoopTime = time(NULL);
382                 if (!_RunTimers())
383                         break;
384
385                 /* _Add must precede _Modify because the same descriptor might
386                  * be on both lists during the same pass through the machine,
387                  * and to modify a descriptor before adding it would fail.
388                  */
389                 _AddNewDescriptors();
390                 _ModifyDescriptors();
391
392                 if (!_RunOnce())
393                         break;
394                 if (gTerminateSignalReceived)
395                         break;
396         }
397
398         #ifdef OS_WIN32
399         HookControlC (false);
400         #endif
401 }
402
403
404 /************************
405 EventMachine_t::_RunOnce
406 ************************/
407
408 bool EventMachine_t::_RunOnce()
409 {
410         if (bEpoll)
411                 return _RunEpollOnce();
412         else if (bKqueue)
413                 return _RunKqueueOnce();
414         else
415                 return _RunSelectOnce();
416 }
417
418
419
420 /*****************************
421 EventMachine_t::_RunEpollOnce
422 *****************************/
423
424 bool EventMachine_t::_RunEpollOnce()
425 {
426         #ifdef HAVE_EPOLL
427         assert (epfd != -1);
428         struct epoll_event ev [MaxEpollDescriptors];
429         int s = epoll_wait (epfd, ev, MaxEpollDescriptors, 50);
430         if (s > 0) {
431                 for (int i=0; i < s; i++) {
432                         EventableDescriptor *ed = (EventableDescriptor*) ev[i].data.ptr;
433
434                         if (ev[i].events & (EPOLLERR | EPOLLHUP))
435                                 ed->ScheduleClose (false);
436                         if (ev[i].events & EPOLLIN)
437                                 ed->Read();
438                         if (ev[i].events & EPOLLOUT) {
439                                 ed->Write();
440                                 epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent());
441                                 // Ignoring return value
442                         }
443                 }
444         }
445         else if (s < 0) {
446                 // epoll_wait can fail on error in a handful of ways.
447                 // If this happens, then wait for a little while to avoid busy-looping.
448                 // If the error was EINTR, we probably caught SIGCHLD or something,
449                 // so keep the wait short.
450                 timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000};
451                 EmSelect (0, NULL, NULL, NULL, &tv);
452         }
453
454         { // cleanup dying sockets
455                 // vector::pop_back works in constant time.
456                 // TODO, rip this out and only delete the descriptors we know have died,
457                 // rather than traversing the whole list.
458                 //  Modified 05Jan08 per suggestions by Chris Heath. It's possible that
459                 //  an EventableDescriptor will have a descriptor value of -1. That will
460                 //  happen if EventableDescriptor::Close was called on it. In that case,
461                 //  don't call epoll_ctl to remove the socket's filters from the epoll set.
462                 //  According to the epoll docs, this happens automatically when the
463                 //  descriptor is closed anyway. This is different from the case where
464                 //  the socket has already been closed but the descriptor in the ED object
465                 //  hasn't yet been set to INVALID_SOCKET.
466                 int i, j;
467                 int nSockets = Descriptors.size();
468                 for (i=0, j=0; i < nSockets; i++) {
469                         EventableDescriptor *ed = Descriptors[i];
470                         assert (ed);
471                         if (ed->ShouldDelete()) {
472                                 if (ed->GetSocket() != INVALID_SOCKET) {
473                                         assert (bEpoll); // wouldn't be in this method otherwise.
474                                         assert (epfd != -1);
475                                         int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent());
476                                         // ENOENT or EBADF are not errors because the socket may be already closed when we get here.
477                                         if (e && (errno != ENOENT) && (errno != EBADF)) {
478                                                 char buf [200];
479                                                 snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno));
480                                                 throw std::runtime_error (buf);
481                                         }
482                                 }
483
484                                 ModifiedDescriptors.erase (ed);
485                                 delete ed;
486                         }
487                         else
488                                 Descriptors [j++] = ed;
489                 }
490                 while ((size_t)j < Descriptors.size())
491                         Descriptors.pop_back();
492
493         }
494
495         // TODO, heartbeats.
496 // Added 14Sep07; its absence was noted by Brian Candler. But the comment was here, indicating
497 // that this got thought about and not done when the epoll support was originally written. Was there a reason
498         // not to do it, or was it an oversight? Certainly, running a heartbeat on 50,000 connections every
499         // two seconds can get to be a real bear, especially if all we're doing is timing out dead ones.
500         // Maybe there's a better way to do this. (Or maybe it's not that expensive after all.)
501         //
502         { // dispatch heartbeats
503                 if (gCurrentLoopTime >= NextHeartbeatTime) {
504                         NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;
505
506                         for (int i=0; i < Descriptors.size(); i++) {
507                                 EventableDescriptor *ed = Descriptors[i];
508                                 assert (ed);
509                                 ed->Heartbeat();
510                         }
511                 }
512         }
513
514         timeval tv = {0,0};
515         EmSelect (0, NULL, NULL, NULL, &tv);
516
517         return true;
518         #else
519         throw std::runtime_error ("epoll is not implemented on this platform");
520         #endif
521 }
522
523
524 /******************************
525 EventMachine_t::_RunKqueueOnce
526 ******************************/
527
528 bool EventMachine_t::_RunKqueueOnce()
529 {
530         #ifdef HAVE_KQUEUE
531         assert (kqfd != -1);
532         const int maxKevents = 2000;
533         struct kevent Karray [maxKevents];
534         struct timespec ts = {0, 10000000}; // Too frequent. Use blocking_region
535
536         int k = kevent (kqfd, NULL, 0, Karray, maxKevents, &ts);
537         struct kevent *ke = Karray;
538         while (k > 0) {
539                 EventableDescriptor *ed = (EventableDescriptor*) (ke->udata);
540                 assert (ed);
541
542                 if (ke->filter == EVFILT_READ)
543                         ed->Read();
544                 else if (ke->filter == EVFILT_WRITE)
545                         ed->Write();
546                 else
547                         cerr << "Discarding unknown kqueue event " << ke->filter << endl;
548
549                 --k;
550                 ++ke;
551         }
552
553         { // cleanup dying sockets
554                 // vector::pop_back works in constant time.
555                 // TODO, rip this out and only delete the descriptors we know have died,
556                 // rather than traversing the whole list.
557                 // In kqueue, closing a descriptor automatically removes its event filters.
558
559                 int i, j;
560                 int nSockets = Descriptors.size();
561                 for (i=0, j=0; i < nSockets; i++) {
562                         EventableDescriptor *ed = Descriptors[i];
563                         assert (ed);
564                         if (ed->ShouldDelete()) {
565                                 ModifiedDescriptors.erase (ed);
566                                 delete ed;
567                         }
568                         else
569                                 Descriptors [j++] = ed;
570                 }
571                 while ((size_t)j < Descriptors.size())
572                         Descriptors.pop_back();
573
574         }
575
576         { // dispatch heartbeats
577                 if (gCurrentLoopTime >= NextHeartbeatTime) {
578                         NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;
579
580                         for (int i=0; i < Descriptors.size(); i++) {
581                                 EventableDescriptor *ed = Descriptors[i];
582                                 assert (ed);
583                                 ed->Heartbeat();
584                         }
585                 }
586         }
587
588
589         // TODO, replace this with rb_thread_blocking_region for 1.9 builds.
590         timeval tv = {0,0};
591         EmSelect (0, NULL, NULL, NULL, &tv);
592
593         return true;
594         #else
595         throw std::runtime_error ("kqueue is not implemented on this platform");
596         #endif
597 }
598
599
600 /*********************************
601 EventMachine_t::_ModifyEpollEvent
602 *********************************/
603
604 void EventMachine_t::_ModifyEpollEvent (EventableDescriptor *ed)
605 {
606         #ifdef HAVE_EPOLL
607         if (bEpoll) {
608                 assert (epfd != -1);
609                 assert (ed);
610                 int e = epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent());
611                 if (e) {
612                         char buf [200];
613                         snprintf (buf, sizeof(buf)-1, "unable to modify epoll event: %s", strerror(errno));
614                         throw std::runtime_error (buf);
615                 }
616         }
617         #endif
618 }
619
620
621
622 /**************************
623 SelectData_t::SelectData_t
624 **************************/
625
626 SelectData_t::SelectData_t()
627 {
628         maxsocket = 0;
629         FD_ZERO (&fdreads);
630         FD_ZERO (&fdwrites);
631 }
632
633
634 #ifdef BUILD_FOR_RUBY
635 /*****************
636 _SelectDataSelect
637 *****************/
638
639 #ifdef HAVE_TBR
640 static VALUE _SelectDataSelect (void *v)
641 {
642         SelectData_t *sd = (SelectData_t*)v;
643         sd->nSockets = select (sd->maxsocket+1, &(sd->fdreads), &(sd->fdwrites), NULL, &(sd->tv));
644         return Qnil;
645 }
646 #endif
647
648 /*********************
649 SelectData_t::_Select
650 *********************/
651
652 int SelectData_t::_Select()
653 {
654         #ifdef HAVE_TBR
655         rb_thread_blocking_region (_SelectDataSelect, (void*)this, RB_UBF_DFL, 0);
656         return nSockets;
657         #endif
658
659         #ifndef HAVE_TBR
660         return EmSelect (maxsocket+1, &fdreads, &fdwrites, NULL, &tv);
661         #endif
662 }
663 #endif
664
665
666
667 /******************************
668 EventMachine_t::_RunSelectOnce
669 ******************************/
670
671 bool EventMachine_t::_RunSelectOnce()
672 {
673         // Crank the event machine once.
674         // If there are no descriptors to process, then sleep
675         // for a few hundred mills to avoid busy-looping.
676         // Return T/F to indicate whether we should continue.
677         // This is based on a select loop. Alternately provide epoll
678         // if we know we're running on a 2.6 kernel.
679         // epoll will be effective if we provide it as an alternative,
680         // however it has the same problem interoperating with Ruby
681         // threads that select does.
682
683         //cerr << "X";
684
685         /* This protection is now obsolete, because we will ALWAYS
686          * have at least one descriptor (the loop-breaker) to read.
687          */
688         /*
689         if (Descriptors.size() == 0) {
690                 #ifdef OS_UNIX
691                 timeval tv = {0, 200 * 1000};
692                 EmSelect (0, NULL, NULL, NULL, &tv);
693                 return true;
694                 #endif
695                 #ifdef OS_WIN32
696                 Sleep (200);
697                 return true;
698                 #endif
699         }
700         */
701
702         SelectData_t SelectData;
703         /*
704         fd_set fdreads, fdwrites;
705         FD_ZERO (&fdreads);
706         FD_ZERO (&fdwrites);
707
708         int maxsocket = 0;
709         */
710
711         // Always read the loop-breaker reader.
712         // Changed 23Aug06, provisionally implemented for Windows with a UDP socket
713         // running on localhost with a randomly-chosen port. (*Puke*)
714         // Windows has a version of the Unix pipe() library function, but it doesn't
715         // give you back descriptors that are selectable.
716         FD_SET (LoopBreakerReader, &(SelectData.fdreads));
717         if (SelectData.maxsocket < LoopBreakerReader)
718                 SelectData.maxsocket = LoopBreakerReader;
719
720         // prepare the sockets for reading and writing
721         size_t i;
722         for (i = 0; i < Descriptors.size(); i++) {
723                 EventableDescriptor *ed = Descriptors[i];
724                 assert (ed);
725                 int sd = ed->GetSocket();
726                 assert (sd != INVALID_SOCKET);
727
728                 if (ed->SelectForRead())
729                         FD_SET (sd, &(SelectData.fdreads));
730                 if (ed->SelectForWrite())
731                         FD_SET (sd, &(SelectData.fdwrites));
732
733                 if (SelectData.maxsocket < sd)
734                         SelectData.maxsocket = sd;
735         }
736
737
738         { // read and write the sockets
739                 //timeval tv = {1, 0}; // Solaris fails if the microseconds member is >= 1000000.
740                 //timeval tv = Quantum;
741                 SelectData.tv = Quantum;
742                 int s = SelectData._Select();
743                 //rb_thread_blocking_region(xxx,(void*)&SelectData,RB_UBF_DFL,0);
744                 //int s = EmSelect (SelectData.maxsocket+1, &(SelectData.fdreads), &(SelectData.fdwrites), NULL, &(SelectData.tv));
745                 //int s = SelectData.nSockets;
746                 if (s > 0) {
747                         /* Changed 01Jun07. We used to handle the Loop-breaker right here.
748                          * Now we do it AFTER all the regular descriptors. There's an
749                          * incredibly important and subtle reason for this. Code on
750                          * loop breakers is sometimes used to cause the reactor core to
751                          * cycle (for example, to allow outbound network buffers to drain).
752                          * If a loop-breaker handler reschedules itself (say, after determining
753                          * that the write buffers are still too full), then it will execute
754                          * IMMEDIATELY if _ReadLoopBreaker is done here instead of after
755                          * the other descriptors are processed. That defeats the whole purpose.
756                          */
757                         for (i=0; i < Descriptors.size(); i++) {
758                                 EventableDescriptor *ed = Descriptors[i];
759                                 assert (ed);
760                                 int sd = ed->GetSocket();
761                                 assert (sd != INVALID_SOCKET);
762
763                                 if (FD_ISSET (sd, &(SelectData.fdwrites)))
764                                         ed->Write();
765                                 if (FD_ISSET (sd, &(SelectData.fdreads)))
766                                         ed->Read();
767                         }
768
769                         if (FD_ISSET (LoopBreakerReader, &(SelectData.fdreads)))
770                                 _ReadLoopBreaker();
771                 }
772                 else if (s < 0) {
773                         // select can fail on error in a handful of ways.
774                         // If this happens, then wait for a little while to avoid busy-looping.
775                         // If the error was EINTR, we probably caught SIGCHLD or something,
776                         // so keep the wait short.
777                         timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000};
778                         EmSelect (0, NULL, NULL, NULL, &tv);
779                 }
780         }
781
782
783         { // dispatch heartbeats
784                 if (gCurrentLoopTime >= NextHeartbeatTime) {
785                         NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;
786
787                         for (i=0; i < Descriptors.size(); i++) {
788                                 EventableDescriptor *ed = Descriptors[i];
789                                 assert (ed);
790                                 ed->Heartbeat();
791                         }
792                 }
793         }
794
795         { // cleanup dying sockets
796                 // vector::pop_back works in constant time.
797                 int i, j;
798                 int nSockets = Descriptors.size();
799                 for (i=0, j=0; i < nSockets; i++) {
800                         EventableDescriptor *ed = Descriptors[i];
801                         assert (ed);
802                         if (ed->ShouldDelete())
803                                 delete ed;
804                         else
805                                 Descriptors [j++] = ed;
806                 }
807                 while ((size_t)j < Descriptors.size())
808                         Descriptors.pop_back();
809
810         }
811
812         return true;
813 }
814
815
816 /********************************
817 EventMachine_t::_ReadLoopBreaker
818 ********************************/
819
820 void EventMachine_t::_ReadLoopBreaker()
821 {
822         /* The loop breaker has selected readable.
823          * Read it ONCE (it may block if we try to read it twice)
824          * and send a loop-break event back to user code.
825          */
826         char buffer [1024];
827         read (LoopBreakerReader, buffer, sizeof(buffer));
828         if (EventCallback)
829                 (*EventCallback)("", EM_LOOPBREAK_SIGNAL, "", 0);
830 }
831
832
833 /**************************
834 EventMachine_t::_RunTimers
835 **************************/
836
837 bool EventMachine_t::_RunTimers()
838 {
839         // These are caller-defined timer handlers.
840         // Return T/F to indicate whether we should continue the main loop.
841         // We rely on the fact that multimaps sort by their keys to avoid
842         // inspecting the whole list every time we come here.
843         // Just keep inspecting and processing the list head until we hit
844         // one that hasn't expired yet.
845
846         #ifdef OS_UNIX
847         struct timeval tv;
848         gettimeofday (&tv, NULL);
849         Int64 now = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec));
850         #endif
851
852         #ifdef OS_WIN32
853         unsigned tick = GetTickCount();
854         if (tick < gLastTickCount)
855                 gTickCountTickover += 1;
856         gLastTickCount = tick;
857         Int64 now = ((Int64)gTickCountTickover << 32) + (Int64)tick;
858         #endif
859
860         while (true) {
861                 multimap<Int64,Timer_t>::iterator i = Timers.begin();
862                 if (i == Timers.end())
863                         break;
864                 if (i->first > now)
865                         break;
866                 if (EventCallback)
867                         (*EventCallback) ("", EM_TIMER_FIRED, i->second.GetBinding().c_str(), i->second.GetBinding().length());
868                 Timers.erase (i);
869         }
870         return true;
871 }
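
/* The ordering relied on above, in isolation (illustrative sketch):
 *
 *     std::multimap<Int64, Timer_t> timers;   // key = absolute fire time (usec on Unix, ticks on Windows)
 *     // timers.begin() is always the earliest deadline, so the loop above can stop
 *     // at the first entry whose key is still in the future; duplicate fire times
 *     // are allowed, which is why a multimap is used rather than a map.
 */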
872
873
874
875 /***********************************
876 EventMachine_t::InstallOneshotTimer
877 ***********************************/
878
879 const char *EventMachine_t::InstallOneshotTimer (int milliseconds)
880 {
881         if (Timers.size() > (size_t)MaxOutstandingTimers)
882                 return NULL;
883         // Don't use the global loop-time variable here, because we might
884         // get called before the main event machine is running.
885
886         #ifdef OS_UNIX
887         struct timeval tv;
888         gettimeofday (&tv, NULL);
889         Int64 fire_at = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec));
890         fire_at += ((Int64)milliseconds) * 1000LL;
891         #endif
892
893         #ifdef OS_WIN32
894         unsigned tick = GetTickCount();
895         if (tick < gLastTickCount)
896                 gTickCountTickover += 1;
897         gLastTickCount = tick;
898
899         Int64 fire_at = ((Int64)gTickCountTickover << 32) + (Int64)tick;
900         fire_at += (Int64)milliseconds;
901         #endif
902
903         Timer_t t;
904         multimap<Int64,Timer_t>::iterator i =
905                 Timers.insert (make_pair (fire_at, t));
906         return i->second.GetBindingChars();
907 }
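
/* Illustrative usage (a sketch; 'em' is an assumed EventMachine_t instance):
 *
 *     const char *binding = em->InstallOneshotTimer (1000);   // fire in roughly one second
 *     if (!binding)
 *         ; // too many outstanding timers (see MaxOutstandingTimers)
 *
 * When the timer expires, _RunTimers invokes the event callback with EM_TIMER_FIRED
 * and this same binding text, which is how the caller matches the event to the
 * timer it scheduled.
 */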
908
909
910 /*******************************
911 EventMachine_t::ConnectToServer
912 *******************************/
913
914 const char *EventMachine_t::ConnectToServer (const char *server, int port)
915 {
916         /* We want to spend no more than a few seconds waiting for a connection
917          * to a remote host. So we use a nonblocking connect.
918          * Linux disobeys the usual rules for nonblocking connects.
919          * Per Stevens (UNP p.410), you expect a nonblocking connect to select
920          * both readable and writable on error, and not to return EINPROGRESS
921          * if the connect can be fulfilled immediately. Linux violates both
922          * of these expectations.
923          * Any kind of nonblocking connect on Linux returns EINPROGRESS.
924          * The socket will then return writable when the disposition of the
925          * connect is known, but it will not also be readable in case of
926          * error! Weirdly, it will be readable in case there is data to read!!!
927          * (Which can happen with protocols like SSH and SMTP.)
928          * I suppose if you were so inclined you could consider this logical,
929          * but it's not the way Unix has historically done it.
930          * So we ignore the readable flag and read getsockopt to see if there
931          * was an error connecting. A select timeout works as expected.
932          * In regard to getsockopt: Linux does the Berkeley-style thing,
933          * not the Solaris-style, and returns zero with the error code in
934          * the error parameter.
935          * Return the binding-text of the newly-created pending connection,
936          * or NULL if there was a problem.
937          */
938
939         if (!server || !*server || !port)
940                 return NULL;
941
942         int family, bind_size;
943         struct sockaddr *bind_as = name2address (server, port, &family, &bind_size);
944         if (!bind_as)
945                 return NULL;
946
947         int sd = socket (family, SOCK_STREAM, 0);
948         if (sd == INVALID_SOCKET)
949                 return NULL;
950
951         /*
952         sockaddr_in pin;
953         unsigned long HostAddr;
954
955         HostAddr = inet_addr (server);
956         if (HostAddr == INADDR_NONE) {
957                 hostent *hp = gethostbyname ((char*)server); // Windows requires (char*)
958                 if (!hp) {
959                         // TODO: This gives the caller a fatal error. Not good.
960                         // They can respond by catching RuntimeError (blecch).
961                         // Possibly we need to fire an unbind event and provide
962                         // a status code so user code can detect the cause of the
963                         // failure.
964                         return NULL;
965                 }
966                 HostAddr = ((in_addr*)(hp->h_addr))->s_addr;
967         }
968
969         memset (&pin, 0, sizeof(pin));
970         pin.sin_family = AF_INET;
971         pin.sin_addr.s_addr = HostAddr;
972         pin.sin_port = htons (port);
973
974         int sd = socket (AF_INET, SOCK_STREAM, 0);
975         if (sd == INVALID_SOCKET)
976                 return NULL;
977         */
978
979         // From here on, ALL error returns must close the socket.
980         // Set the new socket nonblocking.
981         if (!SetSocketNonblocking (sd)) {
982                 closesocket (sd);
983                 return NULL;
984         }
985         // Disable Nagle's algorithm (set TCP_NODELAY).
986         int one = 1;
987         setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof(one));
988
989         const char *out = NULL;
990
991         #ifdef OS_UNIX
992         //if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) {
993         if (connect (sd, bind_as, bind_size) == 0) {
994                 // This is a connect success, which Linux appears
995                 // never to give when the socket is nonblocking,
996                 // even if the connection is intramachine or to
997                 // localhost.
998
999                 /* Changed this branch 08Aug06. Evidently some kernels
1000                  * (FreeBSD for example) will actually return success from
1001                  * a nonblocking connect. This is a pretty simple case,
1002                  * just set up the new connection and clear the pending flag.
1003                  * Thanks to Chris Ochs for helping track this down.
1004                  * This branch never gets taken on Linux or (oddly) OSX.
1005                  * The original behavior was to throw an unimplemented,
1006                  * which the user saw as a fatal exception. Very unfriendly.
1007                  *
1008                  * Tweaked 10Aug06. Even though the connect disposition is
1009                  * known, we still set the connect-pending flag. That way
1010                  * some needed initialization will happen in the ConnectionDescriptor.
1011                  * (To wit, the ConnectionCompleted event gets sent to the client.)
1012                  */
1013                 ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
1014                 if (!cd)
1015                         throw std::runtime_error ("no connection allocated");
1016                 cd->SetConnectPending (true);
1017                 Add (cd);
1018                 out = cd->GetBinding().c_str();
1019         }
1020         else if (errno == EINPROGRESS) {
1021                 // Errno will generally always be EINPROGRESS, but on Linux
1022                 // we have to look at getsockopt to be sure what really happened.
1023                 int error;
1024                 socklen_t len;
1025                 len = sizeof(error);
1026                 int o = getsockopt (sd, SOL_SOCKET, SO_ERROR, &error, &len);
1027                 if ((o == 0) && (error == 0)) {
1028                         // Here, there's no disposition.
1029                         // Put the connection on the stack and wait for it to complete
1030                         // or time out.
1031                         ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
1032                         if (!cd)
1033                                 throw std::runtime_error ("no connection allocated");
1034                         cd->SetConnectPending (true);
1035                         Add (cd);
1036                         out = cd->GetBinding().c_str();
1037                 }
1038                 else {
1039                         /* This could be connection refused or some such thing.
1040                          * We will come here on Linux if a localhost connection fails.
1041                          * Changed 16Jul06: Originally this branch was a no-op, and
1042                          * we'd drop down to the end of the method, close the socket,
1043                          * and return NULL, which would cause the caller to GET A
1044                          * FATAL EXCEPTION. Now we keep the socket around but schedule an
1045                          * immediate close on it, so the caller will get a close-event
1046                          * scheduled on it. This was only an issue for localhost connections
1047                          * to non-listening ports. We may eventually need to revise this
1048                          * revised behavior, in case it causes problems like making it hard
1049                          * for people to know that a failure occurred.
1050                          */
1051                         ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
1052                         if (!cd)
1053                                 throw std::runtime_error ("no connection allocated");
1054                         cd->ScheduleClose (false);
1055                         Add (cd);
1056                         out = cd->GetBinding().c_str();
1057                 }
1058         }
1059         else {
1060                 // The error from connect was something other than EINPROGRESS.
1061         }
1062         #endif
1063
1064         #ifdef OS_WIN32
1065         //if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) {
1066         if (connect (sd, bind_as, bind_size) == 0) {
1067                 // This is a connect success, which Windows appears
1068                 // never to give when the socket is nonblocking,
1069                 // even if the connection is intramachine or to
1070                 // localhost.
1071                 throw std::runtime_error ("unimplemented");
1072         }
1073         else if (WSAGetLastError() == WSAEWOULDBLOCK) {
1074                 // Here, there's no disposition.
1075                 // Windows appears not to surface refused connections or
1076                 // such stuff at this point.
1077                 // Put the connection on the stack and wait for it to complete
1078                 // or time out.
1079                 ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
1080                 if (!cd)
1081                         throw std::runtime_error ("no connection allocated");
1082                 cd->SetConnectPending (true);
1083                 Add (cd);
1084                 out = cd->GetBinding().c_str();
1085         }
1086         else {
1087                 // The error from connect was something other than WSAEWOULDBLOCK.
1088         }
1089
1090         #endif
1091
1092         if (out == NULL)
1093                 closesocket (sd);
1094         return out;
1095 }
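
/* Illustrative call (a sketch; 'em' is an assumed EventMachine_t instance):
 *
 *     const char *binding = em->ConnectToServer ("127.0.0.1", 8080);
 *     if (!binding)
 *         ; // immediate failure: bad address, no socket, or couldn't go nonblocking
 *
 * A non-NULL binding only means a pending connection was queued; success, refusal,
 * or timeout is reported later through events on that binding.
 */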
1096
1097 /***********************************
1098 EventMachine_t::ConnectToUnixServer
1099 ***********************************/
1100
1101 const char *EventMachine_t::ConnectToUnixServer (const char *server)
1102 {
1103         /* Connect to a Unix-domain server, which by definition is running
1104          * on the same host.
1105          * There is no meaningful implementation on Windows.
1106          * There's no need to do a nonblocking connect, since the connection
1107          * is always local and can always be fulfilled immediately.
1108          */
1109
1110         #ifdef OS_WIN32
1111         throw std::runtime_error ("unix-domain connection unavailable on this platform");
1112         return NULL;
1113         #endif
1114
1115         // The whole rest of this function is only compiled on Unix systems.
1116         #ifdef OS_UNIX
1117
1118         const char *out = NULL;
1119
1120         if (!server || !*server)
1121                 return NULL;
1122
1123         sockaddr_un pun;
1124         memset (&pun, 0, sizeof(pun));
1125         pun.sun_family = AF_LOCAL;
1126
1127         // You ordinarily expect the server name field to be at least 1024 bytes long,
1128         // but on Linux it can be MUCH shorter.
1129         if (strlen(server) >= sizeof(pun.sun_path))
1130                 throw std::runtime_error ("unix-domain server name is too long");
1131
1132
1133         strcpy (pun.sun_path, server);
1134
1135         int fd = socket (AF_LOCAL, SOCK_STREAM, 0);
1136         if (fd == INVALID_SOCKET)
1137                 return NULL;
1138
1139         // From here on, ALL error returns must close the socket.
1140         // NOTE: At this point, the socket is still a blocking socket.
1141         if (connect (fd, (struct sockaddr*)&pun, sizeof(pun)) != 0) {
1142                 closesocket (fd);
1143                 return NULL;
1144         }
1145
1146         // Set the newly-connected socket nonblocking.
1147         if (!SetSocketNonblocking (fd)) {
1148                 closesocket (fd);
1149                 return NULL;
1150         }
1151
1152         // Set up a connection descriptor and add it to the event-machine.
1153         // Observe, even though we know the connection status is connect-success,
1154         // we still set the "pending" flag, so some needed initializations take
1155         // place.
1156         ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this);
1157         if (!cd)
1158                 throw std::runtime_error ("no connection allocated");
1159         cd->SetConnectPending (true);
1160         Add (cd);
1161         out = cd->GetBinding().c_str();
1162
1163         if (out == NULL)
1164                 closesocket (fd);
1165
1166         return out;
1167         #endif
1168 }
1169
1170 /************************
1171 EventMachine_t::AttachFD
1172 ************************/
1173
1174 const char *EventMachine_t::AttachFD (int fd, bool notify_readable, bool notify_writable)
1175 {
1176         #ifdef OS_UNIX
1177         if (fcntl(fd, F_GETFL, 0) < 0)
1178                 throw std::runtime_error ("invalid file descriptor");
1179         #endif
1180
1181         #ifdef OS_WIN32
1182         // TODO: add better check for invalid file descriptors (see ioctlsocket or getsockopt)
1183         if (fd == INVALID_SOCKET)
1184                 throw std::runtime_error ("invalid file descriptor");
1185         #endif
1186
1187         {// Check for duplicate descriptors
1188                 size_t i;
1189                 for (i = 0; i < Descriptors.size(); i++) {
1190                         EventableDescriptor *ed = Descriptors[i];
1191                         assert (ed);
1192                         if (ed->GetSocket() == fd)
1193                                 throw std::runtime_error ("adding existing descriptor");
1194                 }
1195
1196                 for (i = 0; i < NewDescriptors.size(); i++) {
1197                         EventableDescriptor *ed = NewDescriptors[i];
1198                         assert (ed);
1199                         if (ed->GetSocket() == fd)
1200                                 throw std::runtime_error ("adding existing new descriptor");
1201                 }
1202         }
1203
1204         ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this);
1205         if (!cd)
1206                 throw std::runtime_error ("no connection allocated");
1207
1208         cd->SetConnectPending (true);
1209         cd->SetNotifyReadable (notify_readable);
1210         cd->SetNotifyWritable (notify_writable);
1211
1212         Add (cd);
1213
1214         const char *out = NULL;
1215         out = cd->GetBinding().c_str();
1216         if (out == NULL)
1217                 closesocket (fd);
1218         return out;
1219 }
1220
1221 /************************
1222 EventMachine_t::DetachFD
1223 ************************/
1224
1225 int EventMachine_t::DetachFD (EventableDescriptor *ed)
1226 {
1227         if (!ed)
1228                 throw std::runtime_error ("detaching bad descriptor");
1229
1230         #ifdef HAVE_EPOLL
1231         if (bEpoll) {
1232                 if (ed->GetSocket() != INVALID_SOCKET) {
1233                         assert (bEpoll); // we are inside the bEpoll branch.
1234                         assert (epfd != -1);
1235                         int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent());
1236                         // ENOENT or EBADF are not errors because the socket may be already closed when we get here.
1237                         if (e && (errno != ENOENT) && (errno != EBADF)) {
1238                                 char buf [200];
1239                                 snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno));
1240                                 throw std::runtime_error (buf);
1241                         }
1242                 }
1243         }
1244         #endif
1245
1246         #ifdef HAVE_KQUEUE
1247         if (bKqueue) {
1248                 struct kevent k;
1249                 EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_DELETE, 0, 0, ed);
1250                 int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1251                 assert (t == 0);
1252         }
1253         #endif
1254
1255         { // remove descriptor from lists
1256                 int i, j;
1257                 int nSockets = Descriptors.size();
1258                 for (i=0, j=0; i < nSockets; i++) {
1259                         EventableDescriptor *ted = Descriptors[i];
1260                         assert (ted);
1261                         if (ted != ed)
1262                                 Descriptors [j++] = ted;
1263                 }
1264                 while ((size_t)j < Descriptors.size())
1265                         Descriptors.pop_back();
1266
1267                 ModifiedDescriptors.erase (ed);
1268         }
1269
1270         int fd = ed->GetSocket();
1271
1272         // We depend on ~EventableDescriptor not calling close() if the socket is invalid
1273         ed->SetSocketInvalid();
1274         delete ed;
1275
1276         return fd;
1277 }
1278
1279 /************
1280 name2address
1281 ************/
1282
1283 struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size)
1284 {
1285         // THIS IS NOT RE-ENTRANT OR THREADSAFE. Optimize for speed.
1286         // Check the more-common cases first.
1287         // Return NULL if no resolution.
1288
1289         static struct sockaddr_in in4;
1290         #ifndef __CYGWIN__
1291         static struct sockaddr_in6 in6;
1292         #endif
1293         struct hostent *hp;
1294
1295         if (!server || !*server)
1296                 server = "0.0.0.0";
1297
1298         memset (&in4, 0, sizeof(in4));
1299         if ( (in4.sin_addr.s_addr = inet_addr (server)) != INADDR_NONE) {
1300                 if (family)
1301                         *family = AF_INET;
1302                 if (bind_size)
1303                         *bind_size = sizeof(in4);
1304                 in4.sin_family = AF_INET;
1305                 in4.sin_port = htons (port);
1306                 return (struct sockaddr*)&in4;
1307         }
1308
1309         #if defined(OS_UNIX) && !defined(__CYGWIN__)
1310         memset (&in6, 0, sizeof(in6));
1311         if (inet_pton (AF_INET6, server, in6.sin6_addr.s6_addr) > 0) {
1312                 if (family)
1313                         *family = AF_INET6;
1314                 if (bind_size)
1315                         *bind_size = sizeof(in6);
1316                 in6.sin6_family = AF_INET6;
1317                 in6.sin6_port = htons (port);
1318                 return (struct sockaddr*)&in6;
1319         }
1320         #endif
1321
1322         #ifdef OS_WIN32
1323         // TODO, must complete this branch. Windows doesn't have inet_pton.
1324         // A possible approach is to make a getaddrinfo call with the supplied
1325         // server address, constraining the hints to ipv6 and seeing if we
1326         // get any addresses.
1327         // For the time being, IPv6 addresses aren't supported on Windows.
1328         #endif
1329
1330         hp = gethostbyname ((char*)server); // Windows requires the cast.
1331         if (hp) {
1332                 in4.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
1333                 if (family)
1334                         *family = AF_INET;
1335                 if (bind_size)
1336                         *bind_size = sizeof(in4);
1337                 in4.sin_family = AF_INET;
1338                 in4.sin_port = htons (port);
1339                 return (struct sockaddr*)&in4;
1340         }
1341
1342         return NULL;
1343 }
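
/* Illustrative calls (a sketch; remember the result points to static storage and
 * is overwritten by the next call):
 *
 *     int family, size;
 *     struct sockaddr *sa;
 *     sa = name2address ("192.168.1.10", 80, &family, &size);   // IPv4 dotted quad
 *     sa = name2address ("::1", 80, &family, &size);            // IPv6 literal (Unix, non-Cygwin)
 *     sa = name2address ("localhost", 80, &family, &size);      // falls back to gethostbyname
 *     sa = name2address (NULL, 80, &family, &size);             // treated as "0.0.0.0"
 */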
1344
1345
1346 /*******************************
1347 EventMachine_t::CreateTcpServer
1348 *******************************/
1349
1350 const char *EventMachine_t::CreateTcpServer (const char *server, int port)
1351 {
1352         /* Create a TCP-acceptor (server) socket and add it to the event machine.
1353          * Return the binding of the new acceptor to the caller.
1354          * This binding will be referenced when the new acceptor sends events
1355          * to indicate accepted connections.
1356          */
1357
1358
1359         int family, bind_size;
1360         struct sockaddr *bind_here = name2address (server, port, &family, &bind_size);
1361         if (!bind_here)
1362                 return NULL;
1363
1364         const char *output_binding = NULL;
1365
1366         //struct sockaddr_in sin;
1367
1368         int sd_accept = socket (family, SOCK_STREAM, 0);
1369         if (sd_accept == INVALID_SOCKET) {
1370                 goto fail;
1371         }
1372
1373         /*
1374         memset (&sin, 0, sizeof(sin));
1375         sin.sin_family = AF_INET;
1376         sin.sin_addr.s_addr = INADDR_ANY;
1377         sin.sin_port = htons (port);
1378
1379         if (server && *server) {
1380                 sin.sin_addr.s_addr = inet_addr (server);
1381                 if (sin.sin_addr.s_addr == INADDR_NONE) {
1382                         hostent *hp = gethostbyname ((char*)server); // Windows requires the cast.
1383                         if (hp == NULL) {
1384                                 //__warning ("hostname not resolved: ", server);
1385                                 goto fail;
1386                         }
1387                         sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
1388                 }
1389         }
1390         */
1391
1392         { // set reuseaddr to improve performance on restarts.
1393                 int oval = 1;
1394                 if (setsockopt (sd_accept, SOL_SOCKET, SO_REUSEADDR, (char*)&oval, sizeof(oval)) < 0) {
1395                         //__warning ("setsockopt failed while creating listener","");
1396                         goto fail;
1397                 }
1398         }
1399
1400         { // set CLOEXEC. Only makes sense on Unix
1401                 #ifdef OS_UNIX
1402                 int cloexec = fcntl (sd_accept, F_GETFD, 0);
1403                 assert (cloexec >= 0);
1404                 cloexec |= FD_CLOEXEC;
1405                 fcntl (sd_accept, F_SETFD, cloexec);
1406                 #endif
1407         }
1408
1409
1410         //if (bind (sd_accept, (struct sockaddr*)&sin, sizeof(sin))) {
1411         if (bind (sd_accept, bind_here, bind_size)) {
1412                 //__warning ("binding failed");
1413                 goto fail;
1414         }
1415
1416         if (listen (sd_accept, 100)) {
1417                 //__warning ("listen failed");
1418                 goto fail;
1419         }
1420
1421         {
1422                 // Set the acceptor non-blocking.
1423                 // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
1424                 if (!SetSocketNonblocking (sd_accept)) {
1425                 //int val = fcntl (sd_accept, F_GETFL, 0);
1426                 //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
1427                         goto fail;
1428                 }
1429         }
1430
1431         { // Looking good.
1432                 AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
1433                 if (!ad)
1434                         throw std::runtime_error ("unable to allocate acceptor");
1435                 Add (ad);
1436                 output_binding = ad->GetBinding().c_str();
1437         }
1438
1439         return output_binding;
1440
1441         fail:
1442         if (sd_accept != INVALID_SOCKET)
1443                 closesocket (sd_accept);
1444         return NULL;
1445 }
1446
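/* [Illustrative sketch; not part of the original source.]
 * CreateTcpServer above follows the standard POSIX acceptor recipe:
 * socket -> SO_REUSEADDR -> FD_CLOEXEC -> bind -> listen -> O_NONBLOCK.
 * A condensed, stand-alone version of that recipe (POSIX only, error
 * handling reduced to early returns, function name hypothetical):
 */
#if 0
static int example_make_acceptor (const struct sockaddr *addr, socklen_t addrlen)
{
	int sd = socket (addr->sa_family, SOCK_STREAM, 0);
	if (sd < 0)
		return -1;
	int on = 1;
	setsockopt (sd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on)); // survive fast restarts
	fcntl (sd, F_SETFD, fcntl (sd, F_GETFD, 0) | FD_CLOEXEC);          // don't leak into child processes
	if (bind (sd, addr, addrlen) || listen (sd, 100)) {
		close (sd);
		return -1;
	}
	fcntl (sd, F_SETFL, fcntl (sd, F_GETFL, 0) | O_NONBLOCK);          // required by the event loop
	return sd;
}
#endif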
1447
1448 /**********************************
1449 EventMachine_t::OpenDatagramSocket
1450 **********************************/
1451
1452 const char *EventMachine_t::OpenDatagramSocket (const char *address, int port)
1453 {
1454         const char *output_binding = NULL;
1455
1456         int sd = socket (AF_INET, SOCK_DGRAM, 0);
1457         if (sd == INVALID_SOCKET)
1458                 goto fail;
1459         // from here on, early returns must close the socket!
1460
1461
1462         struct sockaddr_in sin;
1463         memset (&sin, 0, sizeof(sin));
1464         sin.sin_family = AF_INET;
1465         sin.sin_port = htons (port);
1466
1467
1468         if (address && *address) {
1469                 sin.sin_addr.s_addr = inet_addr (address);
1470                 if (sin.sin_addr.s_addr == INADDR_NONE) {
1471                         hostent *hp = gethostbyname ((char*)address); // Windows requires the cast.
1472                         if (hp == NULL)
1473                                 goto fail;
1474                         sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
1475                 }
1476         }
1477         else
1478                 sin.sin_addr.s_addr = htonl (INADDR_ANY);
1479
1480
1481         // Set the new socket nonblocking.
1482         {
1483                 if (!SetSocketNonblocking (sd))
1484                 //int val = fcntl (sd, F_GETFL, 0);
1485                 //if (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1)
1486                         goto fail;
1487         }
1488
1489         if (bind (sd, (struct sockaddr*)&sin, sizeof(sin)) != 0)
1490                 goto fail;
1491
1492         { // Looking good.
1493                 DatagramDescriptor *ds = new DatagramDescriptor (sd, this);
1494                 if (!ds)
1495                         throw std::runtime_error ("unable to allocate datagram-socket");
1496                 Add (ds);
1497                 output_binding = ds->GetBinding().c_str();
1498         }
1499
1500         return output_binding;
1501
1502         fail:
1503         if (sd != INVALID_SOCKET)
1504                 closesocket (sd);
1505         return NULL;
1506 }
1507
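/* [Illustrative sketch; not part of the original source.]
 * A datagram socket bound as in OpenDatagramSocket above is serviced with
 * recvfrom()/sendto(), which carry the peer address with each packet.
 * A minimal stand-alone receive (POSIX only, function name hypothetical):
 */
#if 0
static ssize_t example_recv_datagram (int sd, char *buf, size_t len, struct sockaddr_in *peer)
{
	socklen_t peerlen = sizeof(*peer);
	// Because the socket was set nonblocking above, this returns -1 with
	// errno == EAGAIN when no datagram is waiting, instead of blocking.
	return recvfrom (sd, buf, len, 0, (struct sockaddr*)peer, &peerlen);
}
#endif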
1508
1509
1510 /*******************
1511 EventMachine_t::Add
1512 *******************/
1513
1514 void EventMachine_t::Add (EventableDescriptor *ed)
1515 {
1516         if (!ed)
1517                 throw std::runtime_error ("added bad descriptor");
1518         ed->SetEventCallback (EventCallback);
1519         NewDescriptors.push_back (ed);
1520 }
1521
1522
1523 /*******************************
1524 EventMachine_t::ArmKqueueWriter
1525 *******************************/
1526
1527 void EventMachine_t::ArmKqueueWriter (EventableDescriptor *ed)
1528 {
1529         #ifdef HAVE_KQUEUE
1530         if (bKqueue) {
1531                 if (!ed)
1532                         throw std::runtime_error ("added bad descriptor");
1533                 struct kevent k;
1534                 EV_SET (&k, ed->GetSocket(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, ed);
1535                 int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1536                 assert (t == 0);
1537         }
1538         #endif
1539 }
1540
1541 /*******************************
1542 EventMachine_t::ArmKqueueReader
1543 *******************************/
1544
1545 void EventMachine_t::ArmKqueueReader (EventableDescriptor *ed)
1546 {
1547         #ifdef HAVE_KQUEUE
1548         if (bKqueue) {
1549                 if (!ed)
1550                         throw std::runtime_error ("added bad descriptor");
1551                 struct kevent k;
1552                 EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed);
1553                 int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1554                 assert (t == 0);
1555         }
1556         #endif
1557 }
1558
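/* [Illustrative sketch; not part of the original source.]
 * The Arm* calls above only register filters; the armed events come back
 * from a later kevent() call, roughly as below. The udata slot carries the
 * EventableDescriptor* passed to EV_SET. This is not the machine's actual
 * run loop (which lives elsewhere); it only illustrates the kqueue side of
 * the contract: writes are one-shot, reads stay armed.
 */
#if 0
static void example_drain_kqueue (int kqfd)
{
	struct kevent events [64];
	struct timespec ts = {0, 0}; // poll without blocking
	int n = kevent (kqfd, NULL, 0, events, 64, &ts);
	for (int i = 0; i < n; i++) {
		EventableDescriptor *ed = (EventableDescriptor*) events[i].udata;
		if (events[i].filter == EVFILT_READ) {
			// readable; EVFILT_READ remains armed until explicitly deleted.
		}
		else if (events[i].filter == EVFILT_WRITE) {
			// writable; EV_ONESHOT means ArmKqueueWriter must be called again
			// before the next write notification.
		}
		(void) ed;
	}
}
#endif
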
1559 /**********************************
1560 EventMachine_t::_AddNewDescriptors
1561 **********************************/
1562
1563 void EventMachine_t::_AddNewDescriptors()
1564 {
1565         /* Avoid adding descriptors to the main descriptor list
1566          * while we're actually traversing the list.
1567          * Any descriptors that are added as a result of processing timers
1568 	 * or acceptors should go on a temporary queue and then be added
1569 	 * while we're not traversing the main list.
1570 	 * Also, it (rarely) happens that a newly-created descriptor
1571 	 * is immediately scheduled to close. It might be a good
1572 	 * idea not to bother scheduling these for I/O, but if
1573 	 * we did that, we might bypass some important processing.
1574          */
1575
1576         for (size_t i = 0; i < NewDescriptors.size(); i++) {
1577                 EventableDescriptor *ed = NewDescriptors[i];
1578                 if (ed == NULL)
1579                         throw std::runtime_error ("adding bad descriptor");
1580
1581                 #if HAVE_EPOLL
1582                 if (bEpoll) {
1583                         assert (epfd != -1);
1584                         int e = epoll_ctl (epfd, EPOLL_CTL_ADD, ed->GetSocket(), ed->GetEpollEvent());
1585                         if (e) {
1586                                 char buf [200];
1587                                 snprintf (buf, sizeof(buf)-1, "unable to add new descriptor: %s", strerror(errno));
1588                                 throw std::runtime_error (buf);
1589                         }
1590                 }
1591                 #endif
1592
1593                 #if HAVE_KQUEUE
1594                 /*
1595                 if (bKqueue) {
1596                         // INCOMPLETE. Some descriptors don't want to be readable.
1597                         assert (kqfd != -1);
1598                         struct kevent k;
1599                         EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed);
1600                         int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1601                         assert (t == 0);
1602                 }
1603                 */
1604                 #endif
1605
1606                 Descriptors.push_back (ed);
1607         }
1608         NewDescriptors.clear();
1609 }
1610
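/* [Illustrative sketch; not part of the original source.]
 * The deferred-add discipline above is the general "never mutate a
 * container while iterating it" pattern. In miniature, with plain
 * std::vector<int> standing in for the real descriptor lists (<vector>
 * assumed available through project.h):
 */
#if 0
static void example_deferred_add (std::vector<int> &live, std::vector<int> &pending)
{
	for (size_t i = 0; i < live.size(); i++) {
		// Work done here may generate new entries; they are pushed onto
		// `pending`, never onto `live`, while this traversal is running.
	}
	// Only after the traversal finishes do the new entries migrate over.
	for (size_t i = 0; i < pending.size(); i++)
		live.push_back (pending[i]);
	pending.clear();
}
#endif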
1611
1612 /**********************************
1613 EventMachine_t::_ModifyDescriptors
1614 **********************************/
1615
1616 void EventMachine_t::_ModifyDescriptors()
1617 {
1618 	/* This is for implementations which, unlike select, don't level-check
1619 	 * every descriptor on every pass through the machine.
1620          * If we're not selecting, then descriptors need a way to signal to the
1621          * machine that their readable or writable status has changed.
1622          * That's what the ::Modify call is for. We do it this way to avoid
1623          * modifying descriptors during the loop traversal, where it can easily
1624          * happen that an object (like a UDP socket) gets data written on it by
1625          * the application during #post_init. That would take place BEFORE the
1626          * descriptor even gets added to the epoll descriptor, so the modify
1627          * operation will crash messily.
1628          * Another really messy possibility is for a descriptor to put itself
1629          * on the Modified list, and then get deleted before we get here.
1630          * Remember, deletes happen after the I/O traversal and before the
1631          * next pass through here. So we have to make sure when we delete a
1632          * descriptor to remove it from the Modified list.
1633          */
1634
1635         #ifdef HAVE_EPOLL
1636         if (bEpoll) {
1637                 set<EventableDescriptor*>::iterator i = ModifiedDescriptors.begin();
1638                 while (i != ModifiedDescriptors.end()) {
1639                         assert (*i);
1640                         _ModifyEpollEvent (*i);
1641                         ++i;
1642                 }
1643         }
1644         #endif
1645
1646         ModifiedDescriptors.clear();
1647 }
1648
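/* [Illustrative sketch; not part of the original source.]
 * _ModifyEpollEvent is defined elsewhere (not shown here). At the syscall
 * level, a "modify" ultimately comes down to EPOLL_CTL_MOD with a fresh
 * event mask, roughly as below. The mask values and function name are
 * placeholders, not the machine's real policy.
 */
#if 0
static void example_epoll_modify (int epfd, int fd, void *ed, bool want_write)
{
	struct epoll_event ev;
	ev.events = EPOLLIN | (want_write ? EPOLLOUT : 0);
	ev.data.ptr = ed; // handed back verbatim by epoll_wait
	if (epoll_ctl (epfd, EPOLL_CTL_MOD, fd, &ev) != 0) {
		// EPOLL_CTL_MOD fails unless the fd was first registered with
		// EPOLL_CTL_ADD -- one reason modifications are deferred until
		// after new descriptors have been registered.
	}
}
#endif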
1649
1650 /**********************
1651 EventMachine_t::Modify
1652 **********************/
1653
1654 void EventMachine_t::Modify (EventableDescriptor *ed)
1655 {
1656         if (!ed)
1657                 throw std::runtime_error ("modified bad descriptor");
1658         ModifiedDescriptors.insert (ed);
1659 }
1660
1661
1662 /***********************************
1663 EventMachine_t::_OpenFileForWriting
1664 ***********************************/
1665
1666 const char *EventMachine_t::_OpenFileForWriting (const char *filename)
1667 {
1668 	/*
1669 	 * Return the binding-text of the newly-opened file,
1670 	 * or NULL if there was a problem.
1671 	 */
1672 	if (!filename || !*filename)
1673 		return NULL;
1674
1675 	int fd = open (filename, O_CREAT|O_TRUNC|O_WRONLY|O_NONBLOCK, 0644);
1676 	if (fd < 0)
1677 		return NULL;
1678
1679 	FileStreamDescriptor *fsd = new FileStreamDescriptor (fd, this);
1680 	if (!fsd)
1681 		throw std::runtime_error ("no file-stream allocated");
1682 	Add (fsd);
1683 	return fsd->GetBinding().c_str();
1684 }
1685
1686
1687 /**************************************
1688 EventMachine_t::CreateUnixDomainServer
1689 **************************************/
1690
1691 const char *EventMachine_t::CreateUnixDomainServer (const char *filename)
1692 {
1693         /* Create a UNIX-domain acceptor (server) socket and add it to the event machine.
1694          * Return the binding of the new acceptor to the caller.
1695          * This binding will be referenced when the new acceptor sends events
1696          * to indicate accepted connections.
1697          * THERE IS NO MEANINGFUL IMPLEMENTATION ON WINDOWS.
1698          */
1699
1700         #ifdef OS_WIN32
1701         throw std::runtime_error ("unix-domain server unavailable on this platform");
1702         #endif
1703
1704         // The whole rest of this function is only compiled on Unix systems.
1705         #ifdef OS_UNIX
1706         const char *output_binding = NULL;
1707
1708         struct sockaddr_un s_sun;
1709
1710         int sd_accept = socket (AF_LOCAL, SOCK_STREAM, 0);
1711         if (sd_accept == INVALID_SOCKET) {
1712                 goto fail;
1713         }
1714
1715         if (!filename || !*filename)
1716                 goto fail;
1717         unlink (filename);
1718
1719         bzero (&s_sun, sizeof(s_sun));
1720         s_sun.sun_family = AF_LOCAL;
1721         strncpy (s_sun.sun_path, filename, sizeof(s_sun.sun_path)-1);
1722
1723         // don't bother with reuseaddr for a local socket.
1724
1725         { // set CLOEXEC. Only makes sense on Unix
1726                 #ifdef OS_UNIX
1727                 int cloexec = fcntl (sd_accept, F_GETFD, 0);
1728                 assert (cloexec >= 0);
1729                 cloexec |= FD_CLOEXEC;
1730                 fcntl (sd_accept, F_SETFD, cloexec);
1731                 #endif
1732         }
1733
1734         if (bind (sd_accept, (struct sockaddr*)&s_sun, sizeof(s_sun))) {
1735                 //__warning ("binding failed");
1736                 goto fail;
1737         }
1738
1739         if (listen (sd_accept, 100)) {
1740                 //__warning ("listen failed");
1741                 goto fail;
1742         }
1743
1744         {
1745                 // Set the acceptor non-blocking.
1746                 // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
1747                 if (!SetSocketNonblocking (sd_accept)) {
1748                 //int val = fcntl (sd_accept, F_GETFL, 0);
1749                 //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
1750                         goto fail;
1751                 }
1752         }
1753
1754         { // Looking good.
1755                 AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
1756                 if (!ad)
1757                         throw std::runtime_error ("unable to allocate acceptor");
1758                 Add (ad);
1759                 output_binding = ad->GetBinding().c_str();
1760         }
1761
1762         return output_binding;
1763
1764         fail:
1765         if (sd_accept != INVALID_SOCKET)
1766                 closesocket (sd_accept);
1767         return NULL;
1768         #endif // OS_UNIX
1769 }
1770
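/* [Illustrative sketch; not part of the original source.]
 * The client side of the UNIX-domain acceptor above, for reference: the
 * same sockaddr_un/sun_path setup, but with connect() instead of
 * bind()/listen(). POSIX only; function name hypothetical.
 */
#if 0
static int example_unix_client (const char *filename)
{
	int sd = socket (AF_LOCAL, SOCK_STREAM, 0);
	if (sd < 0)
		return -1;
	struct sockaddr_un s_un;
	memset (&s_un, 0, sizeof(s_un));
	s_un.sun_family = AF_LOCAL;
	strncpy (s_un.sun_path, filename, sizeof(s_un.sun_path)-1);
	if (connect (sd, (struct sockaddr*)&s_un, sizeof(s_un))) {
		close (sd);
		return -1;
	}
	return sd; // a caller would typically also set this nonblocking
}
#endif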
1771
1772 /*********************
1773 EventMachine_t::Popen
1774 *********************/
1775 #if OBSOLETE
1776 const char *EventMachine_t::Popen (const char *cmd, const char *mode)
1777 {
1778         #ifdef OS_WIN32
1779         throw std::runtime_error ("popen is currently unavailable on this platform");
1780         #endif
1781
1782         // The whole rest of this function is only compiled on Unix systems.
1783         // Eventually we need this functionality (or a full-duplex equivalent) on Windows.
1784         #ifdef OS_UNIX
1785         const char *output_binding = NULL;
1786
1787         FILE *fp = popen (cmd, mode);
1788         if (!fp)
1789                 return NULL;
1790
1791         // From here, all early returns must pclose the stream.
1792
1793         // According to the pipe(2) manpage, descriptors returned from pipe have both
1794         // CLOEXEC and NONBLOCK clear. Do NOT set CLOEXEC. DO set nonblocking.
1795         if (!SetSocketNonblocking (fileno (fp))) {
1796                 pclose (fp);
1797                 return NULL;
1798         }
1799
1800         { // Looking good.
1801                 PipeDescriptor *pd = new PipeDescriptor (fp, this);
1802                 if (!pd)
1803                         throw std::runtime_error ("unable to allocate pipe");
1804                 Add (pd);
1805                 output_binding = pd->GetBinding().c_str();
1806         }
1807
1808         return output_binding;
1809         #endif
1810 }
1811 #endif // OBSOLETE
1812
1813 /**************************
1814 EventMachine_t::Socketpair
1815 **************************/
1816
1817 const char *EventMachine_t::Socketpair (char * const*cmd_strings)
1818 {
1819         #ifdef OS_WIN32
1820         throw std::runtime_error ("socketpair is currently unavailable on this platform");
1821         #endif
1822
1823         // The whole rest of this function is only compiled on Unix systems.
1824         // Eventually we need this functionality (or a full-duplex equivalent) on Windows.
1825         #ifdef OS_UNIX
1826         // Make sure the incoming array of command strings is sane.
1827         if (!cmd_strings)
1828                 return NULL;
1829         int j;
1830         for (j=0; j < 100 && cmd_strings[j]; j++)
1831                 ;
1832         if ((j==0) || (j==100))
1833                 return NULL;
1834
1835         const char *output_binding = NULL;
1836
1837         int sv[2];
1838         if (socketpair (AF_LOCAL, SOCK_STREAM, 0, sv) < 0)
1839                 return NULL;
1840         // from here, all early returns must close the pair of sockets.
1841
1842         // Set the parent side of the socketpair nonblocking.
1843         // We don't care about the child side, and most child processes will expect their
1844         // stdout to be blocking. Thanks to Duane Johnson and Bill Kelly for pointing this out.
1845         // Obviously DON'T set CLOEXEC.
1846         if (!SetSocketNonblocking (sv[0])) {
1847                 close (sv[0]);
1848                 close (sv[1]);
1849                 return NULL;
1850         }
1851
1852         pid_t f = fork();
1853         if (f > 0) {
1854                 close (sv[1]);
1855                 PipeDescriptor *pd = new PipeDescriptor (sv[0], f, this);
1856                 if (!pd)
1857                         throw std::runtime_error ("unable to allocate pipe");
1858                 Add (pd);
1859                 output_binding = pd->GetBinding().c_str();
1860         }
1861         else if (f == 0) {
1862                 close (sv[0]);
1863                 dup2 (sv[1], STDIN_FILENO);
1864                 close (sv[1]);
1865                 dup2 (STDIN_FILENO, STDOUT_FILENO);
1866                 execvp (cmd_strings[0], cmd_strings+1);
1867                 exit (-1); // end the child process if the exec doesn't work.
1868         }
1869         else
1870                 throw std::runtime_error ("no fork");
1871
1872         return output_binding;
1873         #endif
1874 }
1875
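/* [Illustrative sketch; not part of the original source.]
 * Because the parent's end of the socketpair above is nonblocking, a plain
 * read() on it can legitimately return -1 with errno == EAGAIN before the
 * child has produced any output. A hand-rolled consumer (outside the event
 * machine) has to allow for that, roughly like this:
 */
#if 0
static ssize_t example_read_child_output (int fd, char *buf, size_t len)
{
	ssize_t n = read (fd, buf, len);
	if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
		return 0; // nothing yet; retry after select()/poll() reports readable
	return n;     // > 0: bytes read, 0: child closed its end, < 0: real error
}
#endif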
1876
1877 /****************************
1878 EventMachine_t::OpenKeyboard
1879 ****************************/
1880
1881 const char *EventMachine_t::OpenKeyboard()
1882 {
1883         KeyboardDescriptor *kd = new KeyboardDescriptor (this);
1884         if (!kd)
1885                 throw std::runtime_error ("no keyboard-object allocated");
1886         Add (kd);
1887         return kd->GetBinding().c_str();
1888 }
1889
1890
1891
1892
1893
1894 //#endif // OS_UNIX
1895