root/trunk/ext/em.cpp

Revision 785, 50.8 kB (checked in by francis, 9 months ago)

Applied a patch from Aman Gupta (tmm1), with contributions from
Riham Aldakkak, that adds file-descriptor attach/detach and two new
event notifications to support file descriptors not generated
internally by the reactor.

  • Property svn:keywords set to Id
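
For context, a minimal C++ sketch of how the new attach/detach entry points might be driven directly against the reactor core listed below. The signatures (the EventMachine_t constructor, AttachFD and Run) are taken from this file; the sketch assumes project.h declares them as public members, as the extension's C wrapper layer does, and is illustrative only:

    #include "project.h"   // assumed to declare EventMachine_t, as in em.cpp below
    #include <unistd.h>
    #include <cstdio>

    // Events from the reactor arrive here, tagged by each descriptor's binding string.
    static void my_callback (const char *binding, int event, const char *data, int length)
    {
            printf ("event %d on binding %s (%d bytes)\n", event, binding ? binding : "", length);
    }

    int main()
    {
            int fds[2];
            if (pipe (fds))
                    return 1;

            EventMachine_t em (my_callback);

            // Hand the reactor the read end of a pipe it did not create, asking for
            // readable notifications but not writable ones.
            const char *binding = em.AttachFD (fds[0], true, false);
            if (!binding)
                    return 1;

            write (fds[1], "x", 1);   // exogenous data for the attached descriptor
            em.Run();                 // runs until ScheduleHalt() stops the loop
            return 0;
    }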
1 /*****************************************************************************
2
3 $Id$
4
5 File:     em.cpp
6 Date:     06Apr06
7
8 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
9 Gmail: blackhedd
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of either: 1) the GNU General Public License
13 as published by the Free Software Foundation; either version 2 of the
14 License, or (at your option) any later version; or 2) Ruby's License.
15
16 See the file COPYING for complete licensing information.
17
18 *****************************************************************************/
19
20 // THIS ENTIRE FILE WILL EVENTUALLY BE FOR UNIX BUILDS ONLY.
21 //#ifdef OS_UNIX
22
23
24 #include "project.h"
25
26 // Keep a global variable floating around
27 // with the current loop time as set by the Event Machine.
28 // This avoids the need for frequent expensive calls to time(NULL);
29 time_t gCurrentLoopTime;
30
31 #ifdef OS_WIN32
32 unsigned gTickCountTickover;
33 unsigned gLastTickCount;
34 #endif
35
36
37 /* The maximum number of outstanding timers was once a const enum defined in em.h.
38  * Now we define it here so that users can change its value if necessary.
39  */
40 static int MaxOutstandingTimers = 1000;
41
42
43 /* Internal helper to convert strings to internet addresses. IPv6-aware.
44  * Not reentrant or threadsafe, optimized for speed.
45  */
46 static struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size);
47
48
49 /***************************************
50 STATIC EventMachine_t::SetMaxTimerCount
51 ***************************************/
52
53 void EventMachine_t::SetMaxTimerCount (int count)
54 {
55         /* Allow a user to increase the maximum number of outstanding timers.
56          * If this gets "too high" (a metric that is of course platform dependent),
57          * bad things will happen like performance problems and possible overuse
58          * of memory.
59          * The actual timer mechanism is very efficient, so it's hard to know what
60          * the practical max is, but 100,000 shouldn't be too problematic.
61          */
62         if (count < 100)
63                 count = 100;
64         MaxOutstandingTimers = count;
65 }
66
67
68
69 /******************************
70 EventMachine_t::EventMachine_t
71 ******************************/
72
73 EventMachine_t::EventMachine_t (void (*event_callback)(const char*, int, const char*, int)):
74         EventCallback (event_callback),
75         NextHeartbeatTime (0),
76         LoopBreakerReader (-1),
77         LoopBreakerWriter (-1),
78         bEpoll (false),
79         bKqueue (false),
80         epfd (-1)
81 {
82         // Default time-slice is just smaller than one hundred mills.
83         Quantum.tv_sec = 0;
84         Quantum.tv_usec = 90000;
85
86         gTerminateSignalReceived = false;
87         // Make sure the current loop time is sane, in case we do any initializations of
88         // objects before we start running.
89         gCurrentLoopTime = time(NULL);
90
91         /* We initialize the network library here (only on Windows of course)
92          * and initialize "loop breakers." Our destructor also does some network-level
93          * cleanup. There's thus an implicit assumption that any given instance of EventMachine_t
94          * will only call ::Run once. Is that a good assumption? Should we move some of these
95          * inits and de-inits into ::Run?
96          */
97         #ifdef OS_WIN32
98         WSADATA w;
99         WSAStartup (MAKEWORD (1, 1), &w);
100         #endif
101
102         _InitializeLoopBreaker();
103 }
104
105
106 /*******************************
107 EventMachine_t::~EventMachine_t
108 *******************************/
109
110 EventMachine_t::~EventMachine_t()
111 {
112         // Run down descriptors
113         size_t i;
114         for (i = 0; i < NewDescriptors.size(); i++)
115                 delete NewDescriptors[i];
116         for (i = 0; i < Descriptors.size(); i++)
117                 delete Descriptors[i];
118
119         close (LoopBreakerReader);
120         close (LoopBreakerWriter);
121
122         if (epfd != -1)
123                 close (epfd);
124         if (kqfd != -1)
125                 close (kqfd);
126 }
127
128
129 /*************************
130 EventMachine_t::_UseEpoll
131 *************************/
132
133 void EventMachine_t::_UseEpoll()
134 {
135         /* Temporary.
136          * Use an internal flag to switch in epoll-based functionality until we determine
137          * how it should be integrated properly and the extent of the required changes.
138          * A permanent solution needs to allow the integration of additional technologies,
139          * like kqueue and Solaris's events.
140          */
141
142         #ifdef HAVE_EPOLL
143         bEpoll = true;
144         #endif
145 }
146
147 /**************************
148 EventMachine_t::_UseKqueue
149 **************************/
150
151 void EventMachine_t::_UseKqueue()
152 {
153         /* Temporary.
154          * See comments under _UseEpoll.
155          */
156
157         #ifdef HAVE_KQUEUE
158         bKqueue = true;
159         #endif
160 }
161
162
163 /****************************
164 EventMachine_t::ScheduleHalt
165 ****************************/
166
167 void EventMachine_t::ScheduleHalt()
168 {
169   /* This is how we stop the machine.
170    * This can be called by clients. Signal handlers will probably
171    * set the global flag.
172    * For now this means there can only be one EventMachine ever running at a time.
173    *
174    * IMPORTANT: keep this light, fast, and async-safe. Don't do anything frisky in here,
175    * because it may be called from signal handlers invoked from code that we don't
176    * control. At this writing (20Sep06), EM does NOT install any signal handlers of
177    * its own.
178    *
179    * We need a FAQ. And one of the questions is: how do I stop EM when Ctrl-C happens?
180    * The answer is to call evma_stop_machine, which calls here, from a SIGINT handler.
181    */
182         gTerminateSignalReceived = true;
183 }
184
185
186
187 /*******************************
188 EventMachine_t::SetTimerQuantum
189 *******************************/
190
191 void EventMachine_t::SetTimerQuantum (int interval)
192 {
193         /* We get a timer-quantum expressed in milliseconds.
194          * Don't set a quantum smaller than 5 or larger than 2500.
195          */
196
197         if ((interval < 5) || (interval > 2500))
198                 throw std::runtime_error ("invalid timer-quantum");
199
200         Quantum.tv_sec = interval / 1000;
201         Quantum.tv_usec = (interval % 1000) * 1000;
202 }
203
204
205 /*************************************
206 (STATIC) EventMachine_t::SetuidString
207 *************************************/
208
209 void EventMachine_t::SetuidString (const char *username)
210 {
211     /* This method takes a caller-supplied username and tries to setuid
212      * to that user. There is no meaningful implementation (and no error)
213      * on Windows. On Unix, a failure to setuid the caller-supplied string
214      * causes a fatal abort, because presumably the program is calling here
215      * in order to fulfill a security requirement. If we fail silently,
216      * the user may continue to run with too much privilege.
217      *
218      * TODO, we need to decide on and document a way of generating C++ level errors
219      * that can be wrapped in documented Ruby exceptions, so users can catch
220      * and handle them. And distinguish them from errors that we WON'T let the Ruby
221      * user catch (like security-violations and resource-overallocation).
222      * A setuid failure here would be in the latter category.
223      */
224
225     #ifdef OS_UNIX
226     if (!username || !*username)
227         throw std::runtime_error ("setuid_string failed: no username specified");
228
229     struct passwd *p = getpwnam (username);
230     if (!p)
231         throw std::runtime_error ("setuid_string failed: unknown username");
232
233     if (setuid (p->pw_uid) != 0)
234         throw std::runtime_error ("setuid_string failed: no setuid");
235
236     // Success.
237     #endif
238 }
239
240
241 /****************************************
242 (STATIC) EventMachine_t::SetRlimitNofile
243 ****************************************/
244
245 int EventMachine_t::SetRlimitNofile (int nofiles)
246 {
247         #ifdef OS_UNIX
248         struct rlimit rlim;
249         getrlimit (RLIMIT_NOFILE, &rlim);
250         if (nofiles >= 0) {
251                 rlim.rlim_cur = nofiles;
252                 if (nofiles > rlim.rlim_max)
253                         rlim.rlim_max = nofiles;
254                 setrlimit (RLIMIT_NOFILE, &rlim);
255                 // ignore the error return, for now at least.
256                 // TODO, emit an error message someday when we have proper debug levels.
257         }
258         getrlimit (RLIMIT_NOFILE, &rlim);
259         return rlim.rlim_cur;
260         #endif
261
262         #ifdef OS_WIN32
263         // No meaningful implementation on Windows.
264         return 0;
265         #endif
266 }
267
268
269 /*********************************
270 EventMachine_t::SignalLoopBreaker
271 *********************************/
272
273 void EventMachine_t::SignalLoopBreaker()
274 {
275         #ifdef OS_UNIX
276         write (LoopBreakerWriter, "", 1);
277         #endif
278         #ifdef OS_WIN32
279         sendto (LoopBreakerReader, "", 0, 0, (struct sockaddr*)&(LoopBreakerTarget), sizeof(LoopBreakerTarget));
280         #endif
281 }
282
283
284 /**************************************
285 EventMachine_t::_InitializeLoopBreaker
286 **************************************/
287
288 void EventMachine_t::_InitializeLoopBreaker()
289 {
290         /* A "loop-breaker" is a socket-descriptor that we can write to in order
291          * to break the main select loop. Primarily useful for things running on
292          * threads other than the main EM thread, so they can trigger processing
293          * of events that arise exogenously to the EM.
294          * Keep the loop-breaker pipe out of the main descriptor set, otherwise
295          * its events will get passed on to user code.
296          */
297
298         #ifdef OS_UNIX
299         int fd[2];
300         if (pipe (fd))
301                 throw std::runtime_error ("no loop breaker");
302
303         LoopBreakerWriter = fd[1];
304         LoopBreakerReader = fd[0];
305         #endif
306
307         #ifdef OS_WIN32
308         int sd = socket (AF_INET, SOCK_DGRAM, 0);
309         if (sd == INVALID_SOCKET)
310                 throw std::runtime_error ("no loop breaker socket");
311         SetSocketNonblocking (sd);
312
313         memset (&LoopBreakerTarget, 0, sizeof(LoopBreakerTarget));
314         LoopBreakerTarget.sin_family = AF_INET;
315         LoopBreakerTarget.sin_addr.s_addr = inet_addr ("127.0.0.1");
316
317         srand ((int)time(NULL));
318         int i;
319         for (i=0; i < 100; i++) {
320                 int r = (rand() % 10000) + 20000;
321                 LoopBreakerTarget.sin_port = htons (r);
322                 if (bind (sd, (struct sockaddr*)&LoopBreakerTarget, sizeof(LoopBreakerTarget)) == 0)
323                         break;
324         }
325
326         if (i == 100)
327                 throw std::runtime_error ("no loop breaker");
328         LoopBreakerReader = sd;
329         #endif
330 }
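
The loop-breaker is an instance of the classic self-pipe trick. A minimal standalone sketch of the same idea, independent of EventMachine and using only standard pipe/select/pthread calls:

    #include <pthread.h>
    #include <unistd.h>
    #include <sys/select.h>
    #include <cstdio>

    static int breaker_fds[2];   // [0] = read end watched by select, [1] = write end

    static void *other_thread (void *)
    {
            sleep (1);
            write (breaker_fds[1], "", 1);   // a single byte is enough to wake the loop
            return NULL;
    }

    int main()
    {
            if (pipe (breaker_fds))
                    return 1;

            pthread_t t;
            pthread_create (&t, NULL, other_thread, NULL);

            fd_set readers;
            FD_ZERO (&readers);
            FD_SET (breaker_fds[0], &readers);

            // Blocks until the other thread writes to the pipe. A real reactor would
            // put its regular descriptors in the same fd_set, as _RunSelectOnce does.
            select (breaker_fds[0] + 1, &readers, NULL, NULL, NULL);

            char buf [16];
            read (breaker_fds[0], buf, sizeof(buf));   // drain so the next pass can block again
            printf ("loop broken\n");
            pthread_join (t, NULL);
            return 0;
    }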
331
332
333 /*******************
334 EventMachine_t::Run
335 *******************/
336
337 void EventMachine_t::Run()
338 {
339         #ifdef OS_WIN32
340         HookControlC (true);
341         #endif
342
343         #ifdef HAVE_EPOLL
344         if (bEpoll) {
345                 epfd = epoll_create (MaxEpollDescriptors);
346                 if (epfd == -1) {
347                         char buf[200];
348                         snprintf (buf, sizeof(buf)-1, "unable to create epoll descriptor: %s", strerror(errno));
349                         throw std::runtime_error (buf);
350                 }
351                 int cloexec = fcntl (epfd, F_GETFD, 0);
352                 assert (cloexec >= 0);
353                 cloexec |= FD_CLOEXEC;
354                 fcntl (epfd, F_SETFD, cloexec);
355
356                 assert (LoopBreakerReader >= 0);
357                 LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this);
358                 assert (ld);
359                 Add (ld);
360         }
361         #endif
362
363         #ifdef HAVE_KQUEUE
364         if (bKqueue) {
365                 kqfd = kqueue();
366                 if (kqfd == -1) {
367                         char buf[200];
368                         snprintf (buf, sizeof(buf)-1, "unable to create kqueue descriptor: %s", strerror(errno));
369                         throw std::runtime_error (buf);
370                 }
371                 // cloexec not needed. By definition, kqueues are not carried across forks.
372
373                 assert (LoopBreakerReader >= 0);
374                 LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this);
375                 assert (ld);
376                 Add (ld);
377         }
378         #endif
379
380         while (true) {
381                 gCurrentLoopTime = time(NULL);
382                 if (!_RunTimers())
383                         break;
384
385                 /* _Add must precede _Modify because the same descriptor might
386                  * be on both lists during the same pass through the machine,
387                  * and to modify a descriptor before adding it would fail.
388                  */
389                 _AddNewDescriptors();
390                 _ModifyDescriptors();
391
392                 if (!_RunOnce())
393                         break;
394                 if (gTerminateSignalReceived)
395                         break;
396         }
397
398         #ifdef OS_WIN32
399         HookControlC (false);
400         #endif
401 }
402
403
404 /************************
405 EventMachine_t::_RunOnce
406 ************************/
407
408 bool EventMachine_t::_RunOnce()
409 {
410         if (bEpoll)
411                 return _RunEpollOnce();
412         else if (bKqueue)
413                 return _RunKqueueOnce();
414         else
415                 return _RunSelectOnce();
416 }
417
418
419
420 /*****************************
421 EventMachine_t::_RunEpollOnce
422 *****************************/
423
424 bool EventMachine_t::_RunEpollOnce()
425 {
426         #ifdef HAVE_EPOLL
427         assert (epfd != -1);
428         struct epoll_event ev [MaxEpollDescriptors];
429         int s = epoll_wait (epfd, ev, MaxEpollDescriptors, 50);
430         if (s > 0) {
431                 for (int i=0; i < s; i++) {
432                         EventableDescriptor *ed = (EventableDescriptor*) ev[i].data.ptr;
433
434                         if (ev[i].events & (EPOLLERR | EPOLLHUP))
435                                 ed->ScheduleClose (false);
436                         if (ev[i].events & EPOLLIN)
437                                 ed->Read();
438                         if (ev[i].events & EPOLLOUT) {
439                                 ed->Write();
440                                 epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent());
441                                 // Ignoring return value
442                         }
443                 }
444         }
445         else if (s < 0) {
446                 // epoll_wait can fail on error in a handful of ways.
447                 // If this happens, then wait for a little while to avoid busy-looping.
448                 // If the error was EINTR, we probably caught SIGCHLD or something,
449                 // so keep the wait short.
450                 timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000};
451                 EmSelect (0, NULL, NULL, NULL, &tv);
452         }
453
454         { // cleanup dying sockets
455                 // vector::pop_back works in constant time.
456                 // TODO, rip this out and only delete the descriptors we know have died,
457                 // rather than traversing the whole list.
458                 //  Modified 05Jan08 per suggestions by Chris Heath. It's possible that
459                 //  an EventableDescriptor will have a descriptor value of -1. That will
460                 //  happen if EventableDescriptor::Close was called on it. In that case,
461                 //  don't call epoll_ctl to remove the socket's filters from the epoll set.
462                 //  According to the epoll docs, this happens automatically when the
463                 //  descriptor is closed anyway. This is different from the case where
464                 //  the socket has already been closed but the descriptor in the ED object
465                 //  hasn't yet been set to INVALID_SOCKET.
466                 int i, j;
467                 int nSockets = Descriptors.size();
468                 for (i=0, j=0; i < nSockets; i++) {
469                         EventableDescriptor *ed = Descriptors[i];
470                         assert (ed);
471                         if (ed->ShouldDelete()) {
472                                 if (ed->GetSocket() != INVALID_SOCKET) {
473                                         assert (bEpoll); // wouldn't be in this method otherwise.
474                                         assert (epfd != -1);
475                                         int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent());
476                                         // ENOENT or EBADF are not errors because the socket may be already closed when we get here.
477                                         if (e && (errno != ENOENT) && (errno != EBADF)) {
478                                                 char buf [200];
479                                                 snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno));
480                                                 throw std::runtime_error (buf);
481                                         }
482                                 }
483
484                                 ModifiedDescriptors.erase (ed);
485                                 delete ed;
486                         }
487                         else
488                                 Descriptors [j++] = ed;
489                 }
490                 while ((size_t)j < Descriptors.size())
491                         Descriptors.pop_back();
492
493         }
494
495         // TODO, heartbeats.
496 // Added 14Sep07; its absence was noted by Brian Candler. But the comment was already here, indicating
497 // that this was thought about and not done when EPOLL was originally written. Was there a reason
498         // not to do it, or was it an oversight? Certainly, running a heartbeat on 50,000 connections every
499         // two seconds can get to be a real bear, especially if all we're doing is timing out dead ones.
500         // Maybe there's a better way to do this. (Or maybe it's not that expensive after all.)
501         //
502         { // dispatch heartbeats
503                 if (gCurrentLoopTime >= NextHeartbeatTime) {
504                         NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;
505
506                         for (int i=0; i < Descriptors.size(); i++) {
507                                 EventableDescriptor *ed = Descriptors[i];
508                                 assert (ed);
509                                 ed->Heartbeat();
510                         }
511                 }
512         }
513
514         timeval tv = {0,0};
515         EmSelect (0, NULL, NULL, NULL, &tv);
516
517         return true;
518         #else
519         throw std::runtime_error ("epoll is not implemented on this platform");
520         #endif
521 }
522
523
524 /******************************
525 EventMachine_t::_RunKqueueOnce
526 ******************************/
527
528 bool EventMachine_t::_RunKqueueOnce()
529 {
530         #ifdef HAVE_KQUEUE
531         assert (kqfd != -1);
532         const int maxKevents = 2000;
533         struct kevent Karray [maxKevents];
534         struct timespec ts = {0, 10000000}; // Too frequent. Use blocking_region
535
536         int k = kevent (kqfd, NULL, 0, Karray, maxKevents, &ts);
537         struct kevent *ke = Karray;
538         while (k > 0) {
539                 EventableDescriptor *ed = (EventableDescriptor*) (ke->udata);
540                 assert (ed);
541
542                 if (ke->filter == EVFILT_READ)
543                         ed->Read();
544                 else if (ke->filter == EVFILT_WRITE)
545                         ed->Write();
546                 else
547                         cerr << "Discarding unknown kqueue event " << ke->filter << endl;
548
549                 --k;
550                 ++ke;
551         }
552
553         { // cleanup dying sockets
554                 // vector::pop_back works in constant time.
555                 // TODO, rip this out and only delete the descriptors we know have died,
556                 // rather than traversing the whole list.
557                 // In kqueue, closing a descriptor automatically removes its event filters.
558
559                 int i, j;
560                 int nSockets = Descriptors.size();
561                 for (i=0, j=0; i < nSockets; i++) {
562                         EventableDescriptor *ed = Descriptors[i];
563                         assert (ed);
564                         if (ed->ShouldDelete()) {
565                                 ModifiedDescriptors.erase (ed);
566                                 delete ed;
567                         }
568                         else
569                                 Descriptors [j++] = ed;
570                 }
571                 while ((size_t)j < Descriptors.size())
572                         Descriptors.pop_back();
573
574         }
575
576         { // dispatch heartbeats
577                 if (gCurrentLoopTime >= NextHeartbeatTime) {
578                         NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;
579
580                         for (int i=0; i < Descriptors.size(); i++) {
581                                 EventableDescriptor *ed = Descriptors[i];
582                                 assert (ed);
583                                 ed->Heartbeat();
584                         }
585                 }
586         }
587
588
589         // TODO, replace this with rb_thread_blocking_region for 1.9 builds.
590         timeval tv = {0,0};
591         EmSelect (0, NULL, NULL, NULL, &tv);
592
593         return true;
594         #else
595         throw std::runtime_error ("kqueue is not implemented on this platform");
596         #endif
597 }
598
599
600 /*********************************
601 EventMachine_t::_ModifyEpollEvent
602 *********************************/
603
604 void EventMachine_t::_ModifyEpollEvent (EventableDescriptor *ed)
605 {
606         #ifdef HAVE_EPOLL
607         if (bEpoll) {
608                 assert (epfd != -1);
609                 assert (ed);
610                 int e = epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent());
611                 if (e) {
612                         char buf [200];
613                         snprintf (buf, sizeof(buf)-1, "unable to modify epoll event: %s", strerror(errno));
614                         throw std::runtime_error (buf);
615                 }
616         }
617         #endif
618 }
619
620
621
622 /**************************
623 SelectData_t::SelectData_t
624 **************************/
625
626 SelectData_t::SelectData_t()
627 {
628         maxsocket = 0;
629         FD_ZERO (&fdreads);
630         FD_ZERO (&fdwrites);
631 }
632
633
634 /*****************
635 _SelectDataSelect
636 *****************/
637
638 #ifdef HAVE_TBR
639 static VALUE _SelectDataSelect (void *v)
640 {
641         SelectData_t *sd = (SelectData_t*)v;
642         sd->nSockets = select (sd->maxsocket+1, &(sd->fdreads), &(sd->fdwrites), NULL, &(sd->tv));
643         return Qnil;
644 }
645 #endif
646
647 /*********************
648 SelectData_t::_Select
649 *********************/
650
651 int SelectData_t::_Select()
652 {
653         #ifdef HAVE_TBR
654         rb_thread_blocking_region (_SelectDataSelect, (void*)this, RB_UBF_DFL, 0);
655         return nSockets;
656         #endif
657
658         #ifndef HAVE_TBR
659         return EmSelect (maxsocket+1, &fdreads, &fdwrites, NULL, &tv);
660         #endif
661 }
662
663
664
665 /******************************
666 EventMachine_t::_RunSelectOnce
667 ******************************/
668
669 bool EventMachine_t::_RunSelectOnce()
670 {
671         // Crank the event machine once.
672         // If there are no descriptors to process, then sleep
673         // for a few hundred mills to avoid busy-looping.
674         // Return T/F to indicate whether we should continue.
675         // This is based on a select loop. Alternately provide epoll
676         // if we know we're running on a 2.6 kernel.
677         // epoll will be effective if we provide it as an alternative,
678         // however it has the same problem interoperating with Ruby
679         // threads that select does.
680
681         //cerr << "X";
682
683         /* This protection is now obsolete, because we will ALWAYS
684          * have at least one descriptor (the loop-breaker) to read.
685          */
686         /*
687         if (Descriptors.size() == 0) {
688                 #ifdef OS_UNIX
689                 timeval tv = {0, 200 * 1000};
690                 EmSelect (0, NULL, NULL, NULL, &tv);
691                 return true;
692                 #endif
693                 #ifdef OS_WIN32
694                 Sleep (200);
695                 return true;
696                 #endif
697         }
698         */
699
700         SelectData_t SelectData;
701         /*
702         fd_set fdreads, fdwrites;
703         FD_ZERO (&fdreads);
704         FD_ZERO (&fdwrites);
705
706         int maxsocket = 0;
707         */
708
709         // Always read the loop-breaker reader.
710         // Changed 23Aug06, provisionally implemented for Windows with a UDP socket
711         // running on localhost with a randomly-chosen port. (*Puke*)
712         // Windows has a version of the Unix pipe() library function, but it doesn't
713         // give you back descriptors that are selectable.
714         FD_SET (LoopBreakerReader, &(SelectData.fdreads));
715         if (SelectData.maxsocket < LoopBreakerReader)
716                 SelectData.maxsocket = LoopBreakerReader;
717
718         // prepare the sockets for reading and writing
719         size_t i;
720         for (i = 0; i < Descriptors.size(); i++) {
721                 EventableDescriptor *ed = Descriptors[i];
722                 assert (ed);
723                 int sd = ed->GetSocket();
724                 assert (sd != INVALID_SOCKET);
725
726                 if (ed->SelectForRead())
727                         FD_SET (sd, &(SelectData.fdreads));
728                 if (ed->SelectForWrite())
729                         FD_SET (sd, &(SelectData.fdwrites));
730
731                 if (SelectData.maxsocket < sd)
732                         SelectData.maxsocket = sd;
733         }
734
735
736         { // read and write the sockets
737                 //timeval tv = {1, 0}; // Solaris fails if the microseconds member is >= 1000000.
738                 //timeval tv = Quantum;
739                 SelectData.tv = Quantum;
740                 int s = SelectData._Select();
741                 //rb_thread_blocking_region(xxx,(void*)&SelectData,RB_UBF_DFL,0);
742                 //int s = EmSelect (SelectData.maxsocket+1, &(SelectData.fdreads), &(SelectData.fdwrites), NULL, &(SelectData.tv));
743                 //int s = SelectData.nSockets;
744                 if (s > 0) {
745                         /* Changed 01Jun07. We used to handle the Loop-breaker right here.
746                          * Now we do it AFTER all the regular descriptors. There's an
747                          * incredibly important and subtle reason for this. Code on
748                          * loop breakers is sometimes used to cause the reactor core to
749                          * cycle (for example, to allow outbound network buffers to drain).
750                          * If a loop-breaker handler reschedules itself (say, after determining
751                          * that the write buffers are still too full), then it will execute
752                          * IMMEDIATELY if _ReadLoopBreaker is done here instead of after
753                          * the other descriptors are processed. That defeats the whole purpose.
754                          */
755                         for (i=0; i < Descriptors.size(); i++) {
756                                 EventableDescriptor *ed = Descriptors[i];
757                                 assert (ed);
758                                 int sd = ed->GetSocket();
759                                 assert (sd != INVALID_SOCKET);
760
761                                 if (FD_ISSET (sd, &(SelectData.fdwrites)))
762                                         ed->Write();
763                                 if (FD_ISSET (sd, &(SelectData.fdreads)))
764                                         ed->Read();
765                         }
766
767                         if (FD_ISSET (LoopBreakerReader, &(SelectData.fdreads)))
768                                 _ReadLoopBreaker();
769                 }
770                 else if (s < 0) {
771                         // select can fail on error in a handful of ways.
772                         // If this happens, then wait for a little while to avoid busy-looping.
773                         // If the error was EINTR, we probably caught SIGCHLD or something,
774                         // so keep the wait short.
775                         timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000};
776                         EmSelect (0, NULL, NULL, NULL, &tv);
777                 }
778         }
779
780
781         { // dispatch heartbeats
782                 if (gCurrentLoopTime >= NextHeartbeatTime) {
783                         NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;
784
785                         for (i=0; i < Descriptors.size(); i++) {
786                                 EventableDescriptor *ed = Descriptors[i];
787                                 assert (ed);
788                                 ed->Heartbeat();
789                         }
790                 }
791         }
792
793         { // cleanup dying sockets
794                 // vector::pop_back works in constant time.
795                 int i, j;
796                 int nSockets = Descriptors.size();
797                 for (i=0, j=0; i < nSockets; i++) {
798                         EventableDescriptor *ed = Descriptors[i];
799                         assert (ed);
800                         if (ed->ShouldDelete())
801                                 delete ed;
802                         else
803                                 Descriptors [j++] = ed;
804                 }
805                 while ((size_t)j < Descriptors.size())
806                         Descriptors.pop_back();
807
808         }
809
810         return true;
811 }
812
813
814 /********************************
815 EventMachine_t::_ReadLoopBreaker
816 ********************************/
817
818 void EventMachine_t::_ReadLoopBreaker()
819 {
820         /* The loop breaker has selected readable.
821          * Read it ONCE (it may block if we try to read it twice)
822          * and send a loop-break event back to user code.
823          */
824         char buffer [1024];
825         read (LoopBreakerReader, buffer, sizeof(buffer));
826         if (EventCallback)
827                 (*EventCallback)("", EM_LOOPBREAK_SIGNAL, "", 0);
828 }
829
830
831 /**************************
832 EventMachine_t::_RunTimers
833 **************************/
834
835 bool EventMachine_t::_RunTimers()
836 {
837         // These are caller-defined timer handlers.
838         // Return T/F to indicate whether we should continue the main loop.
839         // We rely on the fact that multimaps sort by their keys to avoid
840         // inspecting the whole list every time we come here.
841         // Just keep inspecting and processing the list head until we hit
842         // one that hasn't expired yet.
843
844         #ifdef OS_UNIX
845         struct timeval tv;
846         gettimeofday (&tv, NULL);
847         Int64 now = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec));
848         #endif
849
850         #ifdef OS_WIN32
851         unsigned tick = GetTickCount();
852         if (tick < gLastTickCount)
853                 gTickCountTickover += 1;
854         gLastTickCount = tick;
855         Int64 now = ((Int64)gTickCountTickover << 32) + (Int64)tick;
856         #endif
857
858         while (true) {
859                 multimap<Int64,Timer_t>::iterator i = Timers.begin();
860                 if (i == Timers.end())
861                         break;
862                 if (i->first > now)
863                         break;
864                 if (EventCallback)
865                         (*EventCallback) ("", EM_TIMER_FIRED, i->second.GetBinding().c_str(), i->second.GetBinding().length());
866                 Timers.erase (i);
867         }
868         return true;
869 }
870
871
872
873 /***********************************
874 EventMachine_t::InstallOneshotTimer
875 ***********************************/
876
877 const char *EventMachine_t::InstallOneshotTimer (int milliseconds)
878 {
879         if (Timers.size() > MaxOutstandingTimers)
880                 return NULL;
881         // Don't use the global loop-time variable here, because we might
882         // get called before the main event machine is running.
883
884         #ifdef OS_UNIX
885         struct timeval tv;
886         gettimeofday (&tv, NULL);
887         Int64 fire_at = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec));
888         fire_at += ((Int64)milliseconds) * 1000LL;
889         #endif
890
891         #ifdef OS_WIN32
892         unsigned tick = GetTickCount();
893         if (tick < gLastTickCount)
894                 gTickCountTickover += 1;
895         gLastTickCount = tick;
896
897         Int64 fire_at = ((Int64)gTickCountTickover << 32) + (Int64)tick;
898         fire_at += (Int64)milliseconds;
899         #endif
900
901         Timer_t t;
902         multimap<Int64,Timer_t>::iterator i =
903                 Timers.insert (make_pair (fire_at, t));
904         return i->second.GetBindingChars();
905 }
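
Taken together with _RunTimers above, this is a priority queue built on std::multimap's sorted keys: insert by absolute fire time, then pop only expired entries off the head. A standalone sketch of the pattern, assuming nothing beyond the standard library and gettimeofday:

    #include <map>
    #include <string>
    #include <sys/time.h>
    #include <unistd.h>
    #include <cstdio>

    typedef long long Usec;

    static Usec now_usec()
    {
            struct timeval tv;
            gettimeofday (&tv, NULL);
            return ((Usec)tv.tv_sec) * 1000000LL + (Usec)tv.tv_usec;
    }

    int main()
    {
            std::multimap<Usec, std::string> timers;   // keyed by absolute fire time

            timers.insert (std::make_pair (now_usec() + 20000, std::string ("b")));
            timers.insert (std::make_pair (now_usec() + 10000, std::string ("a")));

            while (!timers.empty()) {
                    std::multimap<Usec, std::string>::iterator i = timers.begin();
                    // Only the head needs checking: if it hasn't expired, nothing after it has.
                    if (i->first > now_usec()) {
                            usleep (1000);   // a real reactor would run its I/O pass here instead
                            continue;
                    }
                    printf ("timer %s fired\n", i->second.c_str());
                    timers.erase (i);
            }
            return 0;
    }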
906
907
908 /*******************************
909 EventMachine_t::ConnectToServer
910 *******************************/
911
912 const char *EventMachine_t::ConnectToServer (const char *server, int port)
913 {
914         /* We want to spend no more than a few seconds waiting for a connection
915          * to a remote host. So we use a nonblocking connect.
916          * Linux disobeys the usual rules for nonblocking connects.
917          * Per Stevens (UNP p.410), you expect a nonblocking connect to select
918          * both readable and writable on error, and not to return EINPROGRESS
919          * if the connect can be fulfilled immediately. Linux violates both
920          * of these expectations.
921          * Any kind of nonblocking connect on Linux returns EINPROGRESS.
922          * The socket will then return writable when the disposition of the
923          * connect is known, but it will not also be readable in case of
924          * error! Weirdly, it will be readable in case there is data to read!!!
925          * (Which can happen with protocols like SSH and SMTP.)
926          * I suppose if you were so inclined you could consider this logical,
927          * but it's not the way Unix has historically done it.
928          * So we ignore the readable flag and read getsockopt to see if there
929          * was an error connecting. A select timeout works as expected.
930          * In regard to getsockopt: Linux does the Berkeley-style thing,
931          * not the Solaris-style, and returns zero with the error code in
932          * the error parameter.
933          * Return the binding-text of the newly-created pending connection,
934          * or NULL if there was a problem.
935          */
936
937         if (!server || !*server || !port)
938                 return NULL;
939
940         int family, bind_size;
941         struct sockaddr *bind_as = name2address (server, port, &family, &bind_size);
942         if (!bind_as)
943                 return NULL;
944
945         int sd = socket (family, SOCK_STREAM, 0);
946         if (sd == INVALID_SOCKET)
947                 return NULL;
948
949         /*
950         sockaddr_in pin;
951         unsigned long HostAddr;
952
953         HostAddr = inet_addr (server);
954         if (HostAddr == INADDR_NONE) {
955                 hostent *hp = gethostbyname ((char*)server); // Windows requires (char*)
956                 if (!hp) {
957                         // TODO: This gives the caller a fatal error. Not good.
958                         // They can respond by catching RuntimeError (blecch).
959                         // Possibly we need to fire an unbind event and provide
960                         // a status code so user code can detect the cause of the
961                         // failure.
962                         return NULL;
963                 }
964                 HostAddr = ((in_addr*)(hp->h_addr))->s_addr;
965         }
966
967         memset (&pin, 0, sizeof(pin));
968         pin.sin_family = AF_INET;
969         pin.sin_addr.s_addr = HostAddr;
970         pin.sin_port = htons (port);
971
972         int sd = socket (AF_INET, SOCK_STREAM, 0);
973         if (sd == INVALID_SOCKET)
974                 return NULL;
975         */
976
977         // From here on, ALL error returns must close the socket.
978         // Set the new socket nonblocking.
979         if (!SetSocketNonblocking (sd)) {
980                 closesocket (sd);
981                 return NULL;
982         }
983         // Disable slow-start (Nagle algorithm).
984         int one = 1;
985         setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof(one));
986
987         const char *out = NULL;
988
989         #ifdef OS_UNIX
990         //if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) {
991         if (connect (sd, bind_as, bind_size) == 0) {
992                 // This is a connect success, which Linux appears
993                 // never to give when the socket is nonblocking,
994                 // even if the connection is intramachine or to
995                 // localhost.
996
997                 /* Changed this branch 08Aug06. Evidently some kernels
998                  * (FreeBSD for example) will actually return success from
999                  * a nonblocking connect. This is a pretty simple case,
1000                  * just set up the new connection and clear the pending flag.
1001                  * Thanks to Chris Ochs for helping track this down.
1002                  * This branch never gets taken on Linux or (oddly) OSX.
1003                  * The original behavior was to throw an unimplemented,
1004                  * which the user saw as a fatal exception. Very unfriendly.
1005                  *
1006                  * Tweaked 10Aug06. Even though the connect disposition is
1007                  * known, we still set the connect-pending flag. That way
1008                  * some needed initialization will happen in the ConnectionDescriptor.
1009                  * (To wit, the ConnectionCompleted event gets sent to the client.)
1010                  */
1011                 ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
1012                 if (!cd)
1013                         throw std::runtime_error ("no connection allocated");
1014                 cd->SetConnectPending (true);
1015                 Add (cd);
1016                 out = cd->GetBinding().c_str();
1017         }
1018         else if (errno == EINPROGRESS) {
1019                 // Errno will generally always be EINPROGRESS, but on Linux
1020                 // we have to look at getsockopt to be sure what really happened.
1021                 int error;
1022                 socklen_t len;
1023                 len = sizeof(error);
1024                 int o = getsockopt (sd, SOL_SOCKET, SO_ERROR, &error, &len);
1025                 if ((o == 0) && (error == 0)) {
1026                         // Here, there's no disposition.
1027                         // Put the connection on the stack and wait for it to complete
1028                         // or time out.
1029                         ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
1030                         if (!cd)
1031                                 throw std::runtime_error ("no connection allocated");
1032                         cd->SetConnectPending (true);
1033                         Add (cd);
1034                         out = cd->GetBinding().c_str();
1035                 }
1036                 else {
1037                         /* This could be connection refused or some such thing.
1038                          * We will come here on Linux if a localhost connection fails.
1039                          * Changed 16Jul06: Originally this branch was a no-op, and
1040                          * we'd drop down to the end of the method, close the socket,
1041                          * and return NULL, which would cause the caller to GET A
1042                          * FATAL EXCEPTION. Now we keep the socket around but schedule an
1043                          * immediate close on it, so the caller will get a close-event
1044                          * scheduled on it. This was only an issue for localhost connections
1045                          * to non-listening ports. We may eventually need to revise this
1046                          * revised behavior, in case it causes problems like making it hard
1047                          * for people to know that a failure occurred.
1048                          */
1049                         ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
1050                         if (!cd)
1051                                 throw std::runtime_error ("no connection allocated");
1052                         cd->ScheduleClose (false);
1053                         Add (cd);
1054                         out = cd->GetBinding().c_str();
1055                 }
1056         }
1057         else {
1058                 // The error from connect was something other than EINPROGRESS.
1059         }
1060         #endif
1061
1062         #ifdef OS_WIN32
1063         //if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) {
1064         if (connect (sd, bind_as, bind_size) == 0) {
1065                 // This is a connect success, which Windows appears
1066                 // never to give when the socket is nonblocking,
1067                 // even if the connection is intramachine or to
1068                 // localhost.
1069                 throw std::runtime_error ("unimplemented");
1070         }
1071         else if (WSAGetLastError() == WSAEWOULDBLOCK) {
1072                 // Here, there's no disposition.
1073                 // Windows appears not to surface refused connections or
1074                 // such stuff at this point.
1075                 // Put the connection on the stack and wait for it to complete
1076                 // or time out.
1077                 ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
1078                 if (!cd)
1079                         throw std::runtime_error ("no connection allocated");
1080                 cd->SetConnectPending (true);
1081                 Add (cd);
1082                 out = cd->GetBinding().c_str();
1083         }
1084         else {
1085                 // The error from connect was something other than WSAEWOULDBLOCK.
1086         }
1087
1088         #endif
1089
1090         if (out == NULL)
1091                 closesocket (sd);
1092         return out;
1093 }
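
The SO_ERROR check described in the comment at the top of ConnectToServer can be seen in isolation below: a minimal sketch of a bounded nonblocking connect (a hypothetical nonblocking_connect helper) that trusts getsockopt rather than the readable flag. It uses only the standard sockets API and is illustrative, not a drop-in for the method above:

    #include <sys/socket.h>
    #include <sys/select.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <cerrno>
    #include <cstdio>
    #include <cstring>

    // Returns 0 on success, otherwise an errno value describing the failure.
    static int nonblocking_connect (const char *ip, int port)
    {
            int sd = socket (AF_INET, SOCK_STREAM, 0);
            if (sd < 0)
                    return errno;
            fcntl (sd, F_SETFL, fcntl (sd, F_GETFL, 0) | O_NONBLOCK);

            sockaddr_in sin;
            memset (&sin, 0, sizeof(sin));
            sin.sin_family = AF_INET;
            sin.sin_port = htons (port);
            sin.sin_addr.s_addr = inet_addr (ip);

            if (connect (sd, (sockaddr*)&sin, sizeof(sin)) == 0) {
                    close (sd);          // immediate success (rare on Linux, possible elsewhere)
                    return 0;
            }
            if (errno != EINPROGRESS) {
                    int e = errno;
                    close (sd);
                    return e;            // hard failure before the handshake even started
            }

            fd_set writers;
            FD_ZERO (&writers);
            FD_SET (sd, &writers);
            timeval tv = {5, 0};         // bounded wait, as the comment above recommends
            if (select (sd + 1, NULL, &writers, NULL, &tv) <= 0) {
                    close (sd);
                    return ETIMEDOUT;
            }

            // Writable only says the connect is resolved; SO_ERROR says how it resolved.
            int error = 0;
            socklen_t len = sizeof(error);
            getsockopt (sd, SOL_SOCKET, SO_ERROR, &error, &len);
            close (sd);
            return error;
    }

    int main()
    {
            int e = nonblocking_connect ("127.0.0.1", 80);
            printf ("connect: %s\n", e ? strerror (e) : "ok");
            return 0;
    }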
1094
1095 /***********************************
1096 EventMachine_t::ConnectToUnixServer
1097 ***********************************/
1098
1099 const char *EventMachine_t::ConnectToUnixServer (const char *server)
1100 {
1101         /* Connect to a Unix-domain server, which by definition is running
1102          * on the same host.
1103          * There is no meaningful implementation on Windows.
1104          * There's no need to do a nonblocking connect, since the connection
1105          * is always local and can always be fulfilled immediately.
1106          */
1107
1108         #ifdef OS_WIN32
1109         throw std::runtime_error ("unix-domain connection unavailable on this platform");
1110         return NULL;
1111         #endif
1112
1113         // The whole rest of this function is only compiled on Unix systems.
1114         #ifdef OS_UNIX
1115
1116         const char *out = NULL;
1117
1118         if (!server || !*server)
1119                 return NULL;
1120
1121         sockaddr_un pun;
1122         memset (&pun, 0, sizeof(pun));
1123         pun.sun_family = AF_LOCAL;
1124
1125         // You ordinarily expect the server name field to be at least 1024 bytes long,
1126         // but on Linux it can be MUCH shorter.
1127         if (strlen(server) >= sizeof(pun.sun_path))
1128                 throw std::runtime_error ("unix-domain server name is too long");
1129
1130
1131         strcpy (pun.sun_path, server);
1132
1133         int fd = socket (AF_LOCAL, SOCK_STREAM, 0);
1134         if (fd == INVALID_SOCKET)
1135                 return NULL;
1136
1137         // From here on, ALL error returns must close the socket.
1138         // NOTE: At this point, the socket is still a blocking socket.
1139         if (connect (fd, (struct sockaddr*)&pun, sizeof(pun)) != 0) {
1140                 closesocket (fd);
1141                 return NULL;
1142         }
1143
1144         // Set the newly-connected socket nonblocking.
1145         if (!SetSocketNonblocking (fd)) {
1146                 closesocket (fd);
1147                 return NULL;
1148         }
1149
1150         // Set up a connection descriptor and add it to the event-machine.
1151         // Observe, even though we know the connection status is connect-success,
1152         // we still set the "pending" flag, so some needed initializations take
1153         // place.
1154         ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this);
1155         if (!cd)
1156                 throw std::runtime_error ("no connection allocated");
1157         cd->SetConnectPending (true);
1158         Add (cd);
1159         out = cd->GetBinding().c_str();
1160
1161         if (out == NULL)
1162                 closesocket (fd);
1163
1164         return out;
1165         #endif
1166 }
1167
1168 /************************
1169 EventMachine_t::AttachFD
1170 ************************/
1171
1172 const char *EventMachine_t::AttachFD (int fd, bool notify_readable, bool notify_writable)
1173 {
1174         #ifdef OS_UNIX
1175         if (fcntl(fd, F_GETFL, 0) < 0)
1176                 throw std::runtime_error ("invalid file descriptor");
1177         #endif
1178
1179         #ifdef OS_WIN32
1180         // TODO: add better check for invalid file descriptors (see ioctlsocket or getsockopt)
1181         if (fd == INVALID_SOCKET)
1182                 throw std::runtime_error ("invalid file descriptor");
1183         #endif
1184
1185         {// Check for duplicate descriptors
1186                 for (size_t i = 0; i < Descriptors.size(); i++) {
1187                         EventableDescriptor *ed = Descriptors[i];
1188                         assert (ed);
1189                         if (ed->GetSocket() == fd)
1190                                 throw std::runtime_error ("adding existing descriptor");
1191                 }
1192
1193                 for (size_t i = 0; i < NewDescriptors.size(); i++) {
1194                         EventableDescriptor *ed = NewDescriptors[i];
1195                         assert (ed);
1196                         if (ed->GetSocket() == fd)
1197                                 throw std::runtime_error ("adding existing new descriptor");
1198                 }
1199         }
1200
1201         ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this);
1202         if (!cd)
1203                 throw std::runtime_error ("no connection allocated");
1204
1205         cd->SetConnectPending (true);
1206         cd->SetNotifyReadable (notify_readable);
1207         cd->SetNotifyWritable (notify_writable);
1208
1209         Add (cd);
1210
1211         const char *out = NULL;
1212         out = cd->GetBinding().c_str();
1213         if (out == NULL)
1214                 closesocket (fd);
1215         return out;
1216 }
1217
1218 /************************
1219 EventMachine_t::DetachFD
1220 ************************/
1221
1222 int EventMachine_t::DetachFD (EventableDescriptor *ed)
1223 {
1224         if (!ed)
1225                 throw std::runtime_error ("detaching bad descriptor");
1226
1227         #ifdef HAVE_EPOLL
1228         if (bEpoll) {
1229                 if (ed->GetSocket() != INVALID_SOCKET) {
1230                         assert (bEpoll); // wouldn't be in this method otherwise.
1231                         assert (epfd != -1);
1232                         int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent());
1233                         // ENOENT or EBADF are not errors because the socket may be already closed when we get here.
1234                         if (e && (errno != ENOENT) && (errno != EBADF)) {
1235                                 char buf [200];
1236                                 snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno));
1237                                 throw std::runtime_error (buf);
1238                         }
1239                 }
1240         }
1241         #endif
1242
1243         #ifdef HAVE_KQUEUE
1244         if (bKqueue) {
1245                 struct kevent k;
1246                 EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_DELETE, 0, 0, ed);
1247                 int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1248                 assert (t == 0);
1249         }
1250         #endif
1251
1252         { // remove descriptor from lists
1253                 int i, j;
1254                 int nSockets = Descriptors.size();
1255                 for (i=0, j=0; i < nSockets; i++) {
1256                         EventableDescriptor *ted = Descriptors[i];
1257                         assert (ted);
1258                         if (ted != ed)
1259                                 Descriptors [j++] = ted;
1260                 }
1261                 while ((size_t)j < Descriptors.size())
1262                         Descriptors.pop_back();
1263
1264                 ModifiedDescriptors.erase (ed);
1265         }
1266
1267         int fd = ed->GetSocket();
1268
1269         // We depend on ~EventableDescriptor not calling close() if the socket is invalid
1270         ed->SetSocketInvalid();
1271         delete ed;
1272
1273         return fd;
1274 }
1275
1276 /************
1277 name2address
1278 ************/
1279
1280 struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size)
1281 {
1282         // THIS IS NOT RE-ENTRANT OR THREADSAFE. Optimize for speed.
1283         // Check the more-common cases first.
1284         // Return NULL if no resolution.
1285
1286         static struct sockaddr_in in4;
1287         static struct sockaddr_in6 in6;
1288         struct hostent *hp;
1289
1290         if (!server || !*server)
1291                 server = "0.0.0.0";
1292
1293         memset (&in4, 0, sizeof(in4));
1294         if ( (in4.sin_addr.s_addr = inet_addr (server)) != INADDR_NONE) {
1295                 if (family)
1296                         *family = AF_INET;
1297                 if (bind_size)
1298                         *bind_size = sizeof(in4);
1299                 in4.sin_family = AF_INET;
1300                 in4.sin_port = htons (port);
1301                 return (struct sockaddr*)&in4;
1302         }
1303
1304         #ifdef OS_UNIX
1305         memset (&in6, 0, sizeof(in6));
1306         if (inet_pton (AF_INET6, server, in6.sin6_addr.s6_addr) > 0) {
1307                 if (family)
1308                         *family = AF_INET6;
1309                 if (bind_size)
1310                         *bind_size = sizeof(in6);
1311                 in6.sin6_family = AF_INET6;
1312                 in6.sin6_port = htons (port);
1313                 return (struct sockaddr*)&in6;
1314         }
1315         #endif
1316
1317         #ifdef OS_WIN32
1318         // TODO, must complete this branch. Windows doesn't have inet_pton.
1319         // A possible approach is to make a getaddrinfo call with the supplied
1320         // server address, constraining the hints to ipv6 and seeing if we
1321         // get any addresses.
1322         // For the time being, IPv6 addresses aren't supported on Windows.
1323         #endif
1324
1325         hp = gethostbyname ((char*)server); // Windows requires the cast.
1326         if (hp) {
1327                 in4.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
1328                 if (family)
1329                         *family = AF_INET;
1330                 if (bind_size)
1331                         *bind_size = sizeof(in4);
1332                 in4.sin_family = AF_INET;
1333                 in4.sin_port = htons (port);
1334                 return (struct sockaddr*)&in4;
1335         }
1336
1337         return NULL;
1338 }
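
As the Windows TODO above suggests, getaddrinfo can cover both address families without the static buffers used here. A minimal sketch of that approach (a hypothetical resolve_address helper with the same family/bind_size outputs as name2address), assuming only the standard getaddrinfo interface:

    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netdb.h>
    #include <cstdio>
    #include <cstring>

    // Resolve server/port into a caller-owned sockaddr_storage.
    // Returns true and fills family/bind_size on success.
    static bool resolve_address (const char *server, int port,
                                 struct sockaddr_storage *out, int *family, int *bind_size)
    {
            char portstr [16];
            snprintf (portstr, sizeof(portstr), "%d", port);

            struct addrinfo hints, *res = NULL;
            memset (&hints, 0, sizeof(hints));
            hints.ai_family = AF_UNSPEC;        // accept IPv4 and IPv6 alike
            hints.ai_socktype = SOCK_STREAM;

            if (getaddrinfo ((server && *server) ? server : "0.0.0.0", portstr, &hints, &res) != 0 || !res)
                    return false;

            memcpy (out, res->ai_addr, res->ai_addrlen);
            if (family)
                    *family = res->ai_family;
            if (bind_size)
                    *bind_size = (int)res->ai_addrlen;
            freeaddrinfo (res);
            return true;
    }

    int main()
    {
            struct sockaddr_storage ss;
            int family = 0, size = 0;
            printf ("resolved: %s\n", resolve_address ("localhost", 80, &ss, &family, &size) ? "yes" : "no");
            return 0;
    }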
1339
1340
1341 /*******************************
1342 EventMachine_t::CreateTcpServer
1343 *******************************/
1344
1345 const char *EventMachine_t::CreateTcpServer (const char *server, int port)
1346 {
1347         /* Create a TCP-acceptor (server) socket and add it to the event machine.
1348          * Return the binding of the new acceptor to the caller.
1349          * This binding will be referenced when the new acceptor sends events
1350          * to indicate accepted connections.
1351          */
1352
1353
1354         int family, bind_size;
1355         struct sockaddr *bind_here = name2address (server, port, &family, &bind_size);
1356         if (!bind_here)
1357                 return NULL;
1358
1359         const char *output_binding = NULL;
1360
1361         //struct sockaddr_in sin;
1362
1363         int sd_accept = socket (family, SOCK_STREAM, 0);
1364         if (sd_accept == INVALID_SOCKET) {
1365                 goto fail;
1366         }
1367
1368         /*
1369         memset (&sin, 0, sizeof(sin));
1370         sin.sin_family = AF_INET;
1371         sin.sin_addr.s_addr = INADDR_ANY;
1372         sin.sin_port = htons (port);
1373
1374         if (server && *server) {
1375                 sin.sin_addr.s_addr = inet_addr (server);
1376                 if (sin.sin_addr.s_addr == INADDR_NONE) {
1377                         hostent *hp = gethostbyname ((char*)server); // Windows requires the cast.
1378                         if (hp == NULL) {
1379                                 //__warning ("hostname not resolved: ", server);
1380                                 goto fail;
1381                         }
1382                         sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
1383                 }
1384         }
1385         */
1386
1387         { // set reuseaddr to improve performance on restarts.
1388                 int oval = 1;
1389                 if (setsockopt (sd_accept, SOL_SOCKET, SO_REUSEADDR, (char*)&oval, sizeof(oval)) < 0) {
1390                         //__warning ("setsockopt failed while creating listener","");
1391                         goto fail;
1392                 }
1393         }
1394
1395         { // set CLOEXEC. Only makes sense on Unix
1396                 #ifdef OS_UNIX
1397                 int cloexec = fcntl (sd_accept, F_GETFD, 0);
1398                 assert (cloexec >= 0);
1399                 cloexec |= FD_CLOEXEC;
1400                 fcntl (sd_accept, F_SETFD, cloexec);
1401                 #endif
1402         }
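        // FD_CLOEXEC (set just above) keeps the listening socket from leaking
        // into child processes across exec(), for example the children forked
        // by Socketpair() later in this file.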
1403
1404
1405         //if (bind (sd_accept, (struct sockaddr*)&sin, sizeof(sin))) {
1406         if (bind (sd_accept, bind_here, bind_size)) {
1407                 //__warning ("binding failed");
1408                 goto fail;
1409         }
1410
1411         if (listen (sd_accept, 100)) {
1412                 //__warning ("listen failed");
1413                 goto fail;
1414         }
1415
1416         {
1417                 // Set the acceptor non-blocking.
1418                 // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
1419                 if (!SetSocketNonblocking (sd_accept)) {
1420                 //int val = fcntl (sd_accept, F_GETFL, 0);
1421                 //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
1422                         goto fail;
1423                 }
1424         }
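        // SetSocketNonblocking is defined elsewhere in this tree; judging from the
        // commented-out lines above, it presumably wraps fcntl (fd, F_SETFL, O_NONBLOCK)
        // on Unix (and the Winsock equivalent, ioctlsocket with FIONBIO, on Windows),
        // returning false on failure.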
1425
1426         { // Looking good.
1427                 AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
1428                 if (!ad)
1429                         throw std::runtime_error ("unable to allocate acceptor");
1430                 Add (ad);
1431                 output_binding = ad->GetBinding().c_str();
1432         }
1433
1434         return output_binding;
1435
1436         fail:
1437         if (sd_accept != INVALID_SOCKET)
1438                 closesocket (sd_accept);
1439         return NULL;
1440 }
1441
1442
1443 /**********************************
1444 EventMachine_t::OpenDatagramSocket
1445 **********************************/
1446
1447 const char *EventMachine_t::OpenDatagramSocket (const char *address, int port)
1448 {
1449         const char *output_binding = NULL;
1450
1451         int sd = socket (AF_INET, SOCK_DGRAM, 0);
1452         if (sd == INVALID_SOCKET)
1453                 goto fail;
1454         // from here on, early returns must close the socket!
1455
1456
1457         struct sockaddr_in sin;
1458         memset (&sin, 0, sizeof(sin));
1459         sin.sin_family = AF_INET;
1460         sin.sin_port = htons (port);
1461
1462
1463         if (address && *address) {
1464                 sin.sin_addr.s_addr = inet_addr (address);
1465                 if (sin.sin_addr.s_addr == INADDR_NONE) {
1466                         hostent *hp = gethostbyname ((char*)address); // Windows requires the cast.
1467                         if (hp == NULL)
1468                                 goto fail;
1469                         sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
1470                 }
1471         }
1472         else
1473                 sin.sin_addr.s_addr = htonl (INADDR_ANY);
1474
1475
1476         // Set the new socket nonblocking.
1477         {
1478                 if (!SetSocketNonblocking (sd))
1479                 //int val = fcntl (sd, F_GETFL, 0);
1480                 //if (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1)
1481                         goto fail;
1482         }
1483
1484         if (bind (sd, (struct sockaddr*)&sin, sizeof(sin)) != 0)
1485                 goto fail;
1486
1487         { // Looking good.
1488                 DatagramDescriptor *ds = new DatagramDescriptor (sd, this);
1489                 if (!ds)
1490                         throw std::runtime_error ("unable to allocate datagram-socket");
1491                 Add (ds);
1492                 output_binding = ds->GetBinding().c_str();
1493         }
1494
1495         return output_binding;
1496
1497         fail:
1498         if (sd != INVALID_SOCKET)
1499                 closesocket (sd);
1500         return NULL;
1501 }
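        // Unlike CreateTcpServer above, this datagram path resolves the address
        // inline with inet_addr()/gethostbyname() rather than name2address(), so
        // it is effectively IPv4-only.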
1502
1503
1504
1505 /*******************
1506 EventMachine_t::Add
1507 *******************/
1508
1509 void EventMachine_t::Add (EventableDescriptor *ed)
1510 {
1511         if (!ed)
1512                 throw std::runtime_error ("added bad descriptor");
1513         ed->SetEventCallback (EventCallback);
1514         NewDescriptors.push_back (ed);
1515 }
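        // Descriptors queued here are not watched immediately; _AddNewDescriptors()
        // below splices them into the main Descriptors list (and registers them with
        // epoll, when epoll is in use) between passes over that list.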
1516
1517
1518 /*******************************
1519 EventMachine_t::ArmKqueueWriter
1520 *******************************/
1521
1522 void EventMachine_t::ArmKqueueWriter (EventableDescriptor *ed)
1523 {
1524         #ifdef HAVE_KQUEUE
1525         if (bKqueue) {
1526                 if (!ed)
1527                         throw std::runtime_error ("added bad descriptor");
1528                 struct kevent k;
1529                 EV_SET (&k, ed->GetSocket(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, ed);
1530                 int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1531                 assert (t == 0);
1532         }
1533         #endif
1534 }
1535
1536 /*******************************
1537 EventMachine_t::ArmKqueueReader
1538 *******************************/
1539
1540 void EventMachine_t::ArmKqueueReader (EventableDescriptor *ed)
1541 {
1542         #ifdef HAVE_KQUEUE
1543         if (bKqueue) {
1544                 if (!ed)
1545                         throw std::runtime_error ("added bad descriptor");
1546                 struct kevent k;
1547                 EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed);
1548                 int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1549                 assert (t == 0);
1550         }
1551         #endif
1552 }
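        // The two Arm* calls above differ deliberately: the write filter is added with
        // EV_ONESHOT, so kqueue drops it after delivering one event and it must be
        // re-armed whenever another write notification is wanted, while the read
        // filter stays installed once added.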
1553
1554 /**********************************
1555 EventMachine_t::_AddNewDescriptors
1556 **********************************/
1557
1558 void EventMachine_t::_AddNewDescriptors()
1559 {
1560         /* Avoid adding descriptors to the main descriptor list
1561          * while we're actually traversing the list.
1562          * Any descriptors that are added as a result of processing timers
1563          * or acceptors should go on a temporary queue and then be added
1564          * while we're not traversing the main list.
1565          * Also, it (rarely) happens that a newly-created descriptor
1566          * is immediately scheduled to close. It might be a good
1567          * idea not to bother scheduling these for I/O but if
1568          * we do that, we might bypass some important processing.
1569          */
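        // In outline, the loop below registers each queued descriptor with the epoll
        // fd (when epoll is in use) and appends it to Descriptors. kqueue registration
        // happens elsewhere, via the ArmKqueue* calls above, since the in-loop kqueue
        // block is commented out as incomplete.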
1570
1571         for (size_t i = 0; i < NewDescriptors.size(); i++) {
1572                 EventableDescriptor *ed = NewDescriptors[i];
1573                 if (ed == NULL)
1574                         throw std::runtime_error ("adding bad descriptor");
1575
1576                 #ifdef HAVE_EPOLL
1577                 if (bEpoll) {
1578                         assert (epfd != -1);
1579                         int e = epoll_ctl (epfd, EPOLL_CTL_ADD, ed->GetSocket(), ed->GetEpollEvent());
1580                         if (e) {
1581                                 char buf [200];
1582                                 snprintf (buf, sizeof(buf)-1, "unable to add new descriptor: %s", strerror(errno));
1583                                 throw std::runtime_error (buf);
1584                         }
1585                 }
1586                 #endif
1587
1588                 #ifdef HAVE_KQUEUE
1589                 /*
1590                 if (bKqueue) {
1591                         // INCOMPLETE. Some descriptors don't want to be readable.
1592                         assert (kqfd != -1);
1593                         struct kevent k;
1594                         EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed);
1595                         int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
1596                         assert (t == 0);
1597                 }
1598                 */
1599                 #endif
1600
1601                 Descriptors.push_back (ed);
1602         }
1603         NewDescriptors.clear();
1604 }
1605
1606
1607 /**********************************
1608 EventMachine_t::_ModifyDescriptors
1609 **********************************/
1610
1611 void EventMachine_t::_ModifyDescriptors()
1612 {
1613         /* This is for implementations which, unlike select, don't level-check
1614          * every descriptor on every pass through the machine.
1615          * If we're not selecting, then descriptors need a way to signal to the
1616          * machine that their readable or writable status has changed.
1617          * That's what the ::Modify call is for. We do it this way to avoid
1618          * modifying descriptors during the loop traversal, where it can easily
1619          * happen that an object (like a UDP socket) gets data written on it by
1620          * the application during #post_init. That would take place BEFORE the
1621          * descriptor even gets added to the epoll descriptor, so the modify
1622          * operation will crash messily.
1623          * Another really messy possibility is for a descriptor to put itself
1624          * on the Modified list, and then get deleted before we get here.
1625          * Remember, deletes happen after the I/O traversal and before the
1626          * next pass through here. So we have to make sure when we delete a
1627          * descriptor to remove it from the Modified list.
1628          */
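        /* Sketch of the intended flow. The call sites live in the descriptor classes
         * rather than in this file, so the member name used below is assumed:
         *
         *   // inside some EventableDescriptor, when its read/write interest changes:
         *   MyEventMachine->Modify (this);   // queues it on ModifiedDescriptors
         *
         * On the next pass, _ModifyDescriptors() applies _ModifyEpollEvent() to each
         * queued descriptor (when epoll is in use) and then clears the set.
         */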
1629
1630         #ifdef HAVE_EPOLL
1631         if (bEpoll) {
1632                 set<EventableDescriptor*>::iterator i = ModifiedDescriptors.begin();
1633                 while (i != ModifiedDescriptors.end()) {
1634                         assert (*i);
1635                         _ModifyEpollEvent (*i);
1636                         ++i;
1637                 }
1638         }
1639         #endif
1640
1641         ModifiedDescriptors.clear();
1642 }
1643
1644
1645 /**********************
1646 EventMachine_t::Modify
1647 **********************/
1648
1649 void EventMachine_t::Modify (EventableDescriptor *ed)
1650 {
1651         if (!ed)
1652                 throw std::runtime_error ("modified bad descriptor");
1653         ModifiedDescriptors.insert (ed);
1654 }
1655
1656
1657 /***********************************
1658 EventMachine_t::_OpenFileForWriting
1659 ***********************************/
1660
1661 const char *EventMachine_t::_OpenFileForWriting (const char *filename)
1662 {
1663         /* Return the binding-text of the newly-opened file,
1664          * or NULL if there was a problem.
1665          */
1666
1667         if (!filename || !*filename)
1668                 return NULL;
1669
1670         int fd = open (filename, O_CREAT|O_TRUNC|O_WRONLY|O_NONBLOCK, 0644);
1671         if (fd < 0)
1672                 return NULL;
1673
1674         FileStreamDescriptor *fsd = new FileStreamDescriptor (fd, this);
1675         if (!fsd)
1676                 throw std::runtime_error ("no file-stream allocated");
1677         Add (fsd);
1678         return fsd->GetBinding().c_str();
1679 }
1680
1681
1682 /**************************************
1683 EventMachine_t::CreateUnixDomainServer
1684 **************************************/
1685
1686 const char *EventMachine_t::CreateUnixDomainServer (const char *filename)
1687 {
1688         /* Create a UNIX-domain acceptor (server) socket and add it to the event machine.
1689          * Return the binding of the new acceptor to the caller.
1690          * This binding will be referenced when the new acceptor sends events
1691          * to indicate accepted connections.
1692          * THERE IS NO MEANINGFUL IMPLEMENTATION ON WINDOWS.
1693          */
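        /* Illustrative usage sketch ("machine" stands for the caller's EventMachine_t
         * instance; the path is arbitrary):
         *
         *   const char *binding = machine->CreateUnixDomainServer ("/tmp/em.sock");
         *
         * Note that any existing socket file at that path is unlinked first, and a
         * path longer than sizeof(sun_path)-1 is silently truncated by the strncpy
         * below.
         */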
1694
1695         #ifdef OS_WIN32
1696         throw std::runtime_error ("unix-domain server unavailable on this platform");
1697         #endif
1698
1699         // The whole rest of this function is only compiled on Unix systems.
1700         #ifdef OS_UNIX
1701         const char *output_binding = NULL;
1702
1703         struct sockaddr_un s_sun;
1704
1705         int sd_accept = socket (AF_LOCAL, SOCK_STREAM, 0);
1706         if (sd_accept == INVALID_SOCKET) {
1707                 goto fail;
1708         }
1709
1710         if (!filename || !*filename)
1711                 goto fail;
1712         unlink (filename);
1713
1714         bzero (&s_sun, sizeof(s_sun));
1715         s_sun.sun_family = AF_LOCAL;
1716         strncpy (s_sun.sun_path, filename, sizeof(s_sun.sun_path)-1);
1717
1718         // don't bother with reuseaddr for a local socket.
1719
1720         { // set CLOEXEC. Only makes sense on Unix
1721                 #ifdef OS_UNIX
1722                 int cloexec = fcntl (sd_accept, F_GETFD, 0);
1723                 assert (cloexec >= 0);
1724                 cloexec |= FD_CLOEXEC;
1725                 fcntl (sd_accept, F_SETFD, cloexec);
1726                 #endif
1727         }
1728
1729         if (bind (sd_accept, (struct sockaddr*)&s_sun, sizeof(s_sun))) {
1730                 //__warning ("binding failed");
1731                 goto fail;
1732         }
1733
1734         if (listen (sd_accept, 100)) {
1735                 //__warning ("listen failed");
1736                 goto fail;
1737         }
1738
1739         {
1740                 // Set the acceptor non-blocking.
1741                 // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
1742                 if (!SetSocketNonblocking (sd_accept)) {
1743                 //int val = fcntl (sd_accept, F_GETFL, 0);
1744                 //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
1745                         goto fail;
1746                 }
1747         }
1748
1749         { // Looking good.
1750                 AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
1751                 if (!ad)
1752                         throw std::runtime_error ("unable to allocate acceptor");
1753                 Add (ad);
1754                 output_binding = ad->GetBinding().c_str();
1755         }
1756
1757         return output_binding;
1758
1759         fail:
1760         if (sd_accept != INVALID_SOCKET)
1761                 closesocket (sd_accept);
1762         return NULL;
1763         #endif // OS_UNIX
1764 }
1765
1766
1767 /*********************
1768 EventMachine_t::Popen
1769 *********************/
1770 #if OBSOLETE
1771 const char *EventMachine_t::Popen (const char *cmd, const char *mode)
1772 {
1773         #ifdef OS_WIN32
1774         throw std::runtime_error ("popen is currently unavailable on this platform");
1775         #endif
1776
1777         // The whole rest of this function is only compiled on Unix systems.
1778         // Eventually we need this functionality (or a full-duplex equivalent) on Windows.
1779         #ifdef OS_UNIX
1780         const char *output_binding = NULL;
1781
1782         FILE *fp = popen (cmd, mode);
1783         if (!fp)
1784                 return NULL;
1785
1786         // From here, all early returns must pclose the stream.
1787
1788         // According to the pipe(2) manpage, descriptors returned from pipe have both
1789         // CLOEXEC and NONBLOCK clear. Do NOT set CLOEXEC. DO set nonblocking.
1790         if (!SetSocketNonblocking (fileno (fp))) {
1791                 pclose (fp);
1792                 return NULL;
1793         }
1794
1795         { // Looking good.
1796                 PipeDescriptor *pd = new PipeDescriptor (fp, this);
1797                 if (!pd)
1798                         throw std::runtime_error ("unable to allocate pipe");
1799                 Add (pd);
1800                 output_binding = pd->GetBinding().c_str();
1801         }
1802
1803         return output_binding;
1804         #endif
1805 }
1806 #endif // OBSOLETE
1807
1808 /**************************
1809 EventMachine_t::Socketpair
1810 **************************/
1811
1812 const char *EventMachine_t::Socketpair (char * const*cmd_strings)
1813 {
1814         #ifdef OS_WIN32
1815         throw std::runtime_error ("socketpair is currently unavailable on this platform");
1816         #endif
1817
1818         // The whole rest of this function is only compiled on Unix systems.
1819         // Eventually we need this functionality (or a full-duplex equivalent) on Windows.
1820         #ifdef OS_UNIX
1821         // Make sure the incoming array of command strings is sane.
1822         if (!cmd_strings)
1823                 return NULL;
1824         int j;
1825         for (j=0; j < 100 && cmd_strings[j]; j++)
1826                 ;
1827         if ((j==0) || (j==100))
1828                 return NULL;
1829
1830         const char *output_binding = NULL;
1831
1832         int sv[2];
1833         if (socketpair (AF_LOCAL, SOCK_STREAM, 0, sv) < 0)
1834                 return NULL;
1835         // from here, all early returns must close the pair of sockets.
1836
1837         // Set the parent side of the socketpair nonblocking.
1838         // We don't care about the child side, and most child processes will expect their
1839         // stdout to be blocking. Thanks to Duane Johnson and Bill Kelly for pointing this out.
1840         // Obviously DON'T set CLOEXEC.
1841         if (!SetSocketNonblocking (sv[0])) {
1842                 close (sv[0]);
1843                 close (sv[1]);
1844                 return NULL;
1845         }
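        // What follows: fork(). The parent keeps sv[0], wrapped in a PipeDescriptor,
        // while the child dup2()s sv[1] onto both stdin and stdout and then execs
        // cmd_strings[0], so the child's standard I/O flows over the socketpair.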
1846
1847         pid_t f = fork();
1848         if (f > 0) {
1849                 close (sv[1]);
1850                 PipeDescriptor *pd = new PipeDescriptor (sv[0], f, this);
1851                 if (!pd)
1852                         throw std::runtime_error ("unable to allocate pipe");
1853                 Add (pd);
1854                 output_binding = pd->GetBinding().c_str();
1855         }
1856         else if (f == 0) {
1857                 close (sv[0]);
1858                 dup2 (sv[1], STDIN_FILENO);
1859                 close (sv[1]);
1860                 dup2 (STDIN_FILENO, STDOUT_FILENO);
1861                 execvp (cmd_strings[0], cmd_strings+1);
1862                 exit (-1); // end the child process if the exec doesn't work.
1863         }
1864         else {
1865                 // fork() failed: per the note above, close both ends of the
1866                 // socketpair before reporting the error.
1867                 close (sv[0]);
1868                 close (sv[1]);
1869                 throw std::runtime_error ("no fork");
1870         }
1866
1867         return output_binding;
1868         #endif
1869 }
1870
1871
1872 /****************************
1873 EventMachine_t::OpenKeyboard
1874 ****************************/
1875
1876 const char *EventMachine_t::OpenKeyboard()
1877 {
1878         KeyboardDescriptor *kd = new KeyboardDescriptor (this);
1879         if (!kd)
1880                 throw std::runtime_error ("no keyboard-object allocated");
1881         Add (kd);
1882         return kd->GetBinding().c_str();
1883 }
1884
1885
1886
1887
1888
1889 //#endif // OS_UNIX
1890