root/trunk/lib/pr_eventmachine.rb

Revision 788, 24.5 kB (checked in by raggi, 8 months ago)

Merge of branches/raggi
Most notable work and patches by Aman Gupta, Roger Pack, and James Tucker.
Patches / Tickets also submitted by: Jeremy Evans, aanand, darix, mmmurf,
danielaquino, macournoyer.

  • Moved docs into docs/ dir
  • Major refactor of rakefile, added generic rakefile helpers in tasks
  • Added example CPP build rakefile in tasks/cpp.rake
  • Moved rake tests out to tasks/tests.rake
  • Added svn ignores where appropriate
  • Fixed jruby build on older java platforms
  • Gem now builds from Rakefile rather than directly via extconf
  • Gem unified for jruby, C++ and pure ruby.
  • Correction for pure C++ build, removing ruby dependency
  • Fix for CYGWIN builds on ipv6
  • Major refactor for extconf.rb
  • Working mingw builds
  • extconf optionally uses pkg_config over manual configuration
  • extconf builds for 1.9 on any system that has 1.9
  • extconf no longer links pthread explicitly
  • looks for kqueue on all *nix systems
  • better error output on std::runtime_error, now says where it came from
  • Fixed some tests on jruby
  • Added test for general send_data flaw, required for a bugfix in jruby build
  • Added timeout to epoll tests
  • Added fixes for java reactor ruby api
  • Small addition of some docs in httpclient.rb and httpcli2.rb
  • Some refactor and fixes in smtpserver.rb
  • Added parenthesis where possible to avoid excess ruby warnings
  • Refactor of $eventmachine_library logic for accuracy and maintenance, jruby
  • EM::start_server now supports unix sockets
  • EM::connect now supports unix sockets
  • EM::defer @threadqueue now handled more gracefully
  • Added better messages on exceptions raised
  • Fix edge case in timer fires
  • Explicitly require buftok.rb
  • Add protocols to autoload, rather than require them all immediately
  • Fix a bug in pr_eventmachine for outbound_q
  • Refactors to take some of the use of defer out of tests.
  • Fixes in EM.defer under start/stop conditions. Reduced scope of threads.
  • Property svn:keywords set to Id
Line 
1 # $Id$
2 #
3 # Author:: Francis Cianfrocca (gmail: blackhedd)
4 # Homepage::  http://rubyeventmachine.com
5 # Date:: 8 Apr 2006
6 #
7 # See EventMachine and EventMachine::Connection for documentation and
8 # usage examples.
9 #
10 #----------------------------------------------------------------------------
11 #
12 # Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
13 # Gmail: blackhedd
14 #
15 # This program is free software; you can redistribute it and/or modify
16 # it under the terms of either: 1) the GNU General Public License
17 # as published by the Free Software Foundation; either version 2 of the
18 # License, or (at your option) any later version; or 2) Ruby's License.
19 #
20 # See the file COPYING for complete licensing information.
21 #
22 #-------------------------------------------------------------------
23 #
24 #
25
26 # TODO List:
27 # TCP-connects currently assume non-blocking connect is available- need to
28 #  degrade automatically on versions of Ruby prior to June 2006.
29 #
30
31 require 'singleton'
32 require 'forwardable'
33 require 'socket'
34 require 'fcntl'
35 require 'set'
36
37
38 module EventMachine
39
40
41   class << self
42     # This is mostly useful for automated tests.
43     # Return a distinctive symbol so the caller knows whether he's dealing
44     # with an extension or with a pure-Ruby library.
45     def library_type
46       :pure_ruby
47     end
48
49     # #initialize_event_machine
50     def initialize_event_machine
51       Reactor.instance.initialize_for_run
52     end
53
54     # #add_oneshot_timer
55     #--
56     # Changed 04Oct06: intervals from the caller are now in milliseconds, but our native-ruby
57     # processor still wants them in seconds.
58     def add_oneshot_timer interval
59       Reactor.instance.install_oneshot_timer(interval / 1000)
60     end
61
62     # run_machine
63     def run_machine
64       Reactor.instance.run
65     end
66
67     # release_machine. Probably a no-op.
68     def release_machine
69     end
70
71     # #stop
72     def stop
73       Reactor.instance.stop
74     end
75
76     # #connect_server. Return a connection descriptor to the caller.
77     # TODO, what do we return here if we can't connect?
78     def connect_server host, port
79       EvmaTCPClient.connect(host, port).uuid
80     end
81
82     # #send_data
83     def send_data target, data, datalength
84       selectable = Reactor.instance.get_selectable( target ) or raise "unknown send_data target"
85       selectable.send_data data
86     end
87
88     # #close_connection
89     # The extension version does NOT raise any kind of an error if an attempt is made
90     # to close a non-existent connection. Not sure whether we should. For now, we'll
91     # raise an error here in that case.
92     def close_connection target, after_writing
93       selectable = Reactor.instance.get_selectable( target ) or raise "unknown close_connection target"
94       selectable.schedule_close after_writing
95     end
96
97     # #start_tcp_server
98     def start_tcp_server host, port
99       (s = EvmaTCPServer.start_server host, port) or raise "no acceptor"
100       s.uuid
101     end
102
103     # #stop_tcp_server
104     def stop_tcp_server sig
105       s = Reactor.instance.get_selectable(sig)
106       s.schedule_close
107     end
108
109     # #start_unix_server
110     def start_unix_server chain
111       (s = EvmaUNIXServer.start_server chain) or raise "no acceptor"
112       s.uuid
113     end
114
115     # #connect_unix_server
116     def connect_unix_server chain
117       EvmaUNIXClient.connect(chain).uuid
118     end
119
120     # #signal_loopbreak
121     def signal_loopbreak
122       Reactor.instance.signal_loopbreak
123     end
124
125     # #get_peername
126     def get_peername sig
127       selectable = Reactor.instance.get_selectable( sig ) or raise "unknown get_peername target"
128       selectable.get_peername
129     end
130
131     # #open_udp_socket
132     def open_udp_socket host, port
133       EvmaUDPSocket.create(host, port).uuid
134     end
135
136     # #send_datagram. This is currently only for UDP!
137     # We need to make it work with unix-domain sockets as well.
138     def send_datagram target, data, datalength, host, port
139       selectable = Reactor.instance.get_selectable( target ) or raise "unknown send_data target"
140       selectable.send_datagram data, Socket::pack_sockaddr_in(port, host)
141     end
142
143
144     # #set_timer_quantum in milliseconds. The underlying Reactor function wants a (possibly
145     # fractional) number of seconds.
146     def set_timer_quantum interval
147       Reactor.instance.set_timer_quantum(( 1.0 * interval) / 1000.0)
148     end
149
150     # #epoll is a harmless no-op in the pure-Ruby implementation. This is intended to ensure
151     # that user code behaves properly across different EM implementations.
152     def epoll
153     end
154
155     # #set_rlimit_nofile is a no-op in the pure-Ruby implementation. We simply return Ruby's built-in
156     # per-process file-descriptor limit.
157     def set_rlimit_nofile n
158       1024
159     end
160
161     # #set_max_timer_count is a harmless no-op in pure Ruby, which doesn't have a built-in limit
162     # on the number of available timers.
163     def set_max_timer_count n
164     end
165
166     # #send_file_data
167     def send_file_data sig, filename
168       sz = File.size(filename)
169       raise "file too large" if sz > 32*1024
170       data =
171       begin
172         File.read filename
173       rescue
174         ""
175       end
176       send_data sig, data, data.length
177     end
178
179     # #get_outbound_data_size
180     #
181     def get_outbound_data_size sig
182       r = Reactor.instance.get_selectable( sig ) or raise "unknown get_outbound_data_size target"
183       r.get_outbound_data_size
184     end
185
186     # #read_keyboard
187     #
188     def read_keyboard
189       EvmaKeyboard.open.uuid
190     end
191
192     # #set_comm_inactivity_timeout
193     #
194     def set_comm_inactivity_timeout sig, tm
195       r = Reactor.instance.get_selectable( sig ) or raise "unknown set_comm_inactivity_timeout target"
196       r.set_inactivity_timeout tm
197     end
198   end
199
200 end
201
202
203 #-----------------------------------------------------------------
204
205 module EventMachine
206
207   class Error < Exception; end
208
209 end
210
211 #-----------------------------------------------------------------
212
213 module EventMachine
214   class Connection
215     def get_outbound_data_size
216       EventMachine::get_outbound_data_size @signature
217     end
218   end
219 end
220
221 #-----------------------------------------------------------------
222
223 module EventMachine
224
225   # Factored out so we can substitute other implementations
226   # here if desired, such as the one in ActiveRBAC.
227   module UuidGenerator
228
229     def self.generate
230       if @ix and @ix >= 10000
231         @ix = nil
232         @seed = nil
233       end
234
235       @seed ||= `uuidgen`.chomp.gsub(/-/,"")
236       @ix ||= 0
237
238       "#{@seed}#{@ix += 1}"
239     end
240
241   end
242
243 end
244
245 #-----------------------------------------------------------------
246
247 module EventMachine
248
249   TimerFired = 100
250   ConnectionData = 101
251   ConnectionUnbound = 102
252   ConnectionAccepted = 103
253   ConnectionCompleted = 104
254   LoopbreakSignalled = 105
255
256 end
257
258 #-----------------------------------------------------------------
259
260 module EventMachine
261 class Reactor
262   include Singleton
263
264   HeartbeatInterval = 2
265
266   attr_reader :current_loop_time
267
268   def initialize
269     initialize_for_run
270   end
271
272   #--
273   # Replaced original implementation 05Dec07, was way too slow because of the sort.
274   def install_oneshot_timer interval
275     uuid = UuidGenerator::generate
276     #@timers << [Time.now + interval, uuid]
277     #@timers.sort! {|a,b| a.first <=> b.first}
278     @timers.add([Time.now + interval, uuid])
279     uuid
280   end
281
282   # Called before run, this is a good place to clear out arrays
283   # with cruft that may be left over from a previous run.
284   def initialize_for_run
285     @running = false
286     @stop_scheduled = false
287     @selectables ||= {}; @selectables.clear
288     @timers = SortedSet.new # []
289     set_timer_quantum(0.1)
290     @current_loop_time = Time.now
291     @next_heartbeat = @current_loop_time + HeartbeatInterval
292   end
293
294   def add_selectable io
295     @selectables[io.uuid] = io
296   end
297
298   def get_selectable uuid
299     @selectables[uuid]
300   end
301
302   def run
303     raise Error.new( "already running" ) if @running
304     @running = true
305
306     begin
307       open_loopbreaker
308
309       loop {
310         @current_loop_time = Time.now
311
312         break if @stop_scheduled
313         run_timers
314         break if @stop_scheduled
315         crank_selectables
316         break if @stop_scheduled
317         run_heartbeats
318       }
319     ensure
320       close_loopbreaker
321       @selectables.each {|k, io| io.close}
322       @selectables.clear
323
324       @running = false
325     end
326
327   end
328
329   def run_timers
330     @timers.each {|t|
331       if t.first <= @current_loop_time
332         @timers.delete t
333         EventMachine::event_callback "", TimerFired, t.last
334       else
335         break
336       end
337     }
338     #while @timers.length > 0 and @timers.first.first <= now
339     #  t = @timers.shift
340     #  EventMachine::event_callback "", TimerFired, t.last
341     #end
342   end
343
344   def run_heartbeats
345     if @next_heartbeat <= @current_loop_time
346       @next_heartbeat = @current_loop_time + HeartbeatInterval
347       @selectables.each {|k,io| io.heartbeat}
348     end
349   end
350
351   def crank_selectables
352       #$stderr.write 'R'
353
354       readers = @selectables.values.select {|io| io.select_for_reading?}
355       writers = @selectables.values.select {|io| io.select_for_writing?}
356
357       s = select( readers, writers, nil, @timer_quantum)
358
359       s and s[1] and s[1].each {|w| w.eventable_write }
360       s and s[0] and s[0].each {|r| r.eventable_read }
361
362       @selectables.delete_if {|k,io|
363         if io.close_scheduled?
364           io.close
365           true
366         end
367       }
368   end
369
370   # #stop
371   def stop
372     raise Error.new( "not running") unless @running
373     @stop_scheduled = true
374   end
375
376   def open_loopbreaker
377           # Can't use an IO.pipe because they can't be set nonselectable in Windows.
378           # Pick a random localhost UDP port.
379     #@loopbreak_writer.close if @loopbreak_writer
380     #rd,@loopbreak_writer = IO.pipe
381         @loopbreak_reader = UDPSocket.new
382         @loopbreak_writer = UDPSocket.new
383         bound = false
384         100.times {
385                 @loopbreak_port = rand(10000) + 40000
386                 begin
387                         @loopbreak_reader.bind "localhost", @loopbreak_port
388                         bound = true
389                         break
390                 rescue
391                 end
392         }
393         raise "Unable to bind Loopbreaker" unless bound
394         LoopbreakReader.new(@loopbreak_reader)
395   end
396
397   def close_loopbreaker
398     @loopbreak_writer.close
399     @loopbreak_writer = nil
400   end
401
402   def signal_loopbreak
403     #@loopbreak_writer.write '+' if @loopbreak_writer
404         @loopbreak_writer.send('+',0,"localhost",@loopbreak_port) if @loopbreak_writer
405   end
406
407   def set_timer_quantum interval_in_seconds
408     @timer_quantum = interval_in_seconds
409   end
410
411 end
412
413 end
414
415
416 #--------------------------------------------------------------
417
418 class IO
419   extend Forwardable
420   def_delegator :@my_selectable, :close_scheduled?
421   def_delegator :@my_selectable, :select_for_reading?
422   def_delegator :@my_selectable, :select_for_writing?
423   def_delegator :@my_selectable, :eventable_read
424   def_delegator :@my_selectable, :eventable_write
425   def_delegator :@my_selectable, :uuid
426   def_delegator :@my_selectable, :send_data
427   def_delegator :@my_selectable, :schedule_close
428   def_delegator :@my_selectable, :get_peername
429   def_delegator :@my_selectable, :send_datagram
430   def_delegator :@my_selectable, :get_outbound_data_size
431   def_delegator :@my_selectable, :set_inactivity_timeout
432   def_delegator :@my_selectable, :heartbeat
433 end
434
435 #--------------------------------------------------------------
436
437 module EventMachine
438   class Selectable
439
440     attr_reader :io, :uuid
441
442     def initialize io
443       @uuid = UuidGenerator.generate
444       @io = io
445       @last_activity = Reactor.instance.current_loop_time
446
447       if defined?(Fcntl::F_GETFL)
448         m = @io.fcntl(Fcntl::F_GETFL, 0)
449         @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | m)
450       else
451               # Windows doesn't define F_GETFL.
452               # It's not very reliable about setting descriptors nonblocking either.
453               begin
454                 s = Socket.for_fd(@io.fileno)
455                 s.fcntl( Fcntl::F_SETFL, Fcntl::O_NONBLOCK )
456               rescue Errno::EINVAL, Errno::EBADF
457                       STDERR.puts "Serious error: unable to set descriptor non-blocking"
458               end
459       end
460       # TODO, should set CLOEXEC on Unix?
461
462       @close_scheduled = false
463       @close_requested = false
464
465       se = self; @io.instance_eval { @my_selectable = se }
466       Reactor.instance.add_selectable @io
467     end
468
469     def close_scheduled?
470       @close_scheduled
471     end
472
473     def select_for_reading?
474       false
475     end
476
477     def select_for_writing?
478       false
479     end
480
481     def get_peername
482       nil
483     end
484
485     def set_inactivity_timeout tm
486       @inactivity_timeout = tm
487     end
488
489     def heartbeat
490     end
491   end
492
493 end
494
495 #--------------------------------------------------------------
496
497
498 module EventMachine
499
500   class StreamObject < Selectable
501     def initialize io
502       super io
503       @outbound_q = []
504     end
505
506     # If we have to close, or a close-after-writing has been requested,
507     # then don't read any more data.
508     def select_for_reading?
509       true unless (@close_scheduled || @close_requested)
510     end
511
512     # If we have to close, don't select for writing.
513     # Otherwise, see if the protocol is ready to close.
514     # If not, see if he has data to send.
515     # If a close-after-writing has been requested and the outbound queue
516     # is empty, convert the status to close_scheduled.
517     def select_for_writing?
518       unless @close_scheduled
519         if @outbound_q.empty?
520           @close_scheduled = true if @close_requested
521           false
522         else
523           true
524         end
525       end
526     end
527
528     # Proper nonblocking I/O was added to Ruby 1.8.4 in May 2006.
529     # If we have it, then we can read multiple times safely to improve
530     # performance.
531     # The last-activity clock ASSUMES that we only come here when we
532     # have selected readable.
533     # TODO, coalesce multiple reads into a single event.
534     # TODO, do the function check somewhere else and cache it.
535     def eventable_read
536       @last_activity = Reactor.instance.current_loop_time
537       begin
538         if io.respond_to?(:read_nonblock)
539           10.times {
540             data = io.read_nonblock(4096)
541             EventMachine::event_callback uuid, ConnectionData, data
542           }
543         else
544           data = io.sysread(4096)
545           EventMachine::event_callback uuid, ConnectionData, data
546         end
547       rescue Errno::EAGAIN, Errno::EWOULDBLOCK
548         # no-op
549       rescue Errno::ECONNRESET, Errno::ECONNREFUSED, EOFError
550         @close_scheduled = true
551         EventMachine::event_callback uuid, ConnectionUnbound, nil
552       end
553
554     end
555
556     # Provisional implementation. Will be re-implemented in subclasses.
557     # TODO: Complete this implementation. As it stands, this only writes
558     # a single packet per cycle. Highly inefficient, but required unless
559     # we're running on a Ruby with proper nonblocking I/O (Ruby 1.8.4
560     # built from sources from May 25, 2006 or newer).
561     # We need to improve the loop so it writes multiple times, however
562     # not more than a certain number of bytes per cycle, otherwise
563     # one busy connection could hog output buffers and slow down other
564     # connections. Also we should coalesce small writes.
565     # URGENT TODO: Coalesce small writes. They are a performance killer.
566     # The last-activity recorder ASSUMES we'll only come here if we've
567     # selected writable.
568     def eventable_write
569       # coalesce the outbound array here, perhaps
570       @last_activity = Reactor.instance.current_loop_time
571       while data = @outbound_q.shift do
572         begin
573           data = data.to_s
574           w = if io.respond_to?(:write_nonblock)
575             io.write_nonblock data
576           else
577             io.syswrite data
578           end
579
580           if w < data.length
581             @outbound_q.unshift data[w..-1]
582             break
583           end
584         rescue Errno::EAGAIN
585           @outbound_q.unshift data
586         rescue EOFError, Errno::ECONNRESET, Errno::ECONNREFUSED
587           @close_scheduled = true
588           @outbound_q.clear
589         end
590       end
591
592     end
593
594     # #send_data
595     def send_data data
596       # TODO, coalesce here perhaps by being smarter about appending to @outbound_q.last?
597       unless @close_scheduled or @close_requested or !data or data.length <= 0
598         @outbound_q << data.to_s
599       end
600     end
601
602     # #schedule_close
603     # The application wants to close the connection.
604     def schedule_close after_writing
605       if after_writing
606         @close_requested = true
607       else
608         @close_scheduled = true
609       end
610     end
611
612     # #get_peername
613     # This is defined in the normal way on connected stream objects.
614     # Return an object that is suitable for passing to Socket#unpack_sockaddr_in or variants.
615     # We could also use a convenience method that did the unpacking automatically.
616     def get_peername
617       io.getpeername
618     end
619
620     # #get_outbound_data_size
621     def get_outbound_data_size
622       @outbound_q.inject(0) {|memo,obj| memo += (obj || "").length}
623     end
624
625     def heartbeat
626       if @inactivity_timeout and (@last_activity + @inactivity_timeout) < Reactor.instance.current_loop_time
627         schedule_close true
628       end
629     end
630   end
631
632
633 end
634
635
636 #--------------------------------------------------------------
637
638
639
640 module EventMachine
641   class EvmaTCPClient < StreamObject
642
643     def self.connect host, port
644       sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
645       begin
646         # TODO, this assumes a current Ruby snapshot.
647         # We need to degrade to a nonblocking connect otherwise.
648         sd.connect_nonblock( Socket.pack_sockaddr_in( port, host ))
649       rescue Errno::EINPROGRESS
650       end
651       EvmaTCPClient.new sd
652     end
653
654
655     def initialize io
656       super
657       @pending = true
658     end
659
660
661     def select_for_writing?
662       @pending ? true : super
663     end
664
665     def select_for_reading?
666       @pending ? false : super
667     end
668
669     def eventable_write
670       if @pending
671         @pending = false
672         if 0 == io.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR).unpack("i").first
673           EventMachine::event_callback uuid, ConnectionCompleted, ""
674         end
675       else
676         super
677       end
678     end
679
680
681
682   end
683 end
684
685 #--------------------------------------------------------------
686
687
688
689 module EventMachine
690   class EvmaKeyboard < StreamObject
691
692     def self.open
693       EvmaKeyboard.new STDIN
694     end
695
696
697     def initialize io
698       super
699     end
700
701
702     def select_for_writing?
703       false
704     end
705
706     def select_for_reading?
707       true
708     end
709
710
711   end
712 end
713
714
715 #--------------------------------------------------------------
716
717
718
719 module EventMachine
720   class EvmaUNIXClient < StreamObject
721
722     def self.connect chain
723       sd = Socket.new( Socket::AF_LOCAL, Socket::SOCK_STREAM, 0 )
724       begin
725         # TODO, this assumes a current Ruby snapshot.
726         # We need to degrade to a nonblocking connect otherwise.
727         sd.connect_nonblock( Socket.pack_sockaddr_un( chain ))
728       rescue Errno::EINPROGRESS
729       end
730       EvmaUNIXClient.new sd
731     end
732
733
734     def initialize io
735       super
736       @pending = true
737     end
738
739
740     def select_for_writing?
741       @pending ? true : super
742     end
743
744     def select_for_reading?
745       @pending ? false : super
746     end
747
748     def eventable_write
749       if @pending
750         @pending = false
751         if 0 == io.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR).unpack("i").first
752           EventMachine::event_callback uuid, ConnectionCompleted, ""
753         end
754       else
755         super
756       end
757     end
758
759
760
761   end
762 end
763
764
765 #--------------------------------------------------------------
766
767 module EventMachine
768   class EvmaTCPServer < Selectable
769
770     # TODO, refactor and unify with EvmaUNIXServer.
771
772     class << self
773       # Versions of ruby 1.8.4 later than May 26 2006 will work properly
774       # with an object of type TCPServer. Prior versions won't so we
775       # play it safe and just build a socket.
776       #
777       def start_server host, port
778         sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
779         sd.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true )
780         sd.bind( Socket.pack_sockaddr_in( port, host ))
781         sd.listen( 50 ) # 5 is what you see in all the books. Ain't enough.
782         EvmaTCPServer.new sd
783       end
784     end
785
786     def initialize io
787       super io
788     end
789
790
791     def select_for_reading?
792       true
793     end
794
795     #--
796     # accept_nonblock returns an array consisting of the accepted
797     # socket and a sockaddr_in which names the peer.
798     # Don't accept more than 10 at a time.
799     def eventable_read
800       begin
801         10.times {
802           descriptor,peername = io.accept_nonblock
803           sd = StreamObject.new descriptor
804           EventMachine::event_callback uuid, ConnectionAccepted, sd.uuid
805         }
806       rescue Errno::EWOULDBLOCK, Errno::EAGAIN
807       end
808     end
809
810     #--
811     #
812     def schedule_close
813       @close_scheduled = true
814     end
815
816   end
817 end
818
819
820 #--------------------------------------------------------------
821
822 module EventMachine
823   class EvmaUNIXServer < Selectable
824
825     # TODO, refactor and unify with EvmaTCPServer.
826
827     class << self
828       # Versions of ruby 1.8.4 later than May 26 2006 will work properly
829       # with an object of type TCPServer. Prior versions won't so we
830       # play it safe and just build a socket.
831       #
832       def start_server chain
833         sd = Socket.new( Socket::AF_LOCAL, Socket::SOCK_STREAM, 0 )
834         sd.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true )
835         sd.bind( Socket.pack_sockaddr_un( chain ))
836         sd.listen( 50 ) # 5 is what you see in all the books. Ain't enough.
837         EvmaUNIXServer.new sd
838       end
839     end
840
841     def initialize io
842       super io
843     end
844
845
846     def select_for_reading?
847       true
848     end
849
850     #--
851     # accept_nonblock returns an array consisting of the accepted
852     # socket and a sockaddr_in which names the peer.
853     # Don't accept more than 10 at a time.
854     def eventable_read
855       begin
856         10.times {
857           descriptor,peername = io.accept_nonblock
858           sd = StreamObject.new descriptor
859           EventMachine::event_callback uuid, ConnectionAccepted, sd.uuid
860         }
861       rescue Errno::EWOULDBLOCK, Errno::EAGAIN
862       end
863     end
864
865     #--
866     #
867     def schedule_close
868       @close_scheduled = true
869     end
870
871   end
872 end
873
874
875
876 #--------------------------------------------------------------
877
878 module EventMachine
879   class LoopbreakReader < Selectable
880
881     def select_for_reading?
882       true
883     end
884
885     def eventable_read
886           io.sysread(128)
887           EventMachine::event_callback "", LoopbreakSignalled, ""
888     end
889
890   end
891 end
892
893 #--------------------------------------------------------------
894
895
896 module EventMachine
897
898   class DatagramObject < Selectable
899     def initialize io
900       super io
901       @outbound_q = []
902     end
903
904     # #send_datagram
905     def send_datagram data, target
906       # TODO, coalesce here perhaps by being smarter about appending to @outbound_q.last?
907       unless @close_scheduled or @close_requested
908         @outbound_q << [data.to_s, target]
909       end
910     end
911
912     # #select_for_writing?
913     def select_for_writing?
914       unless @close_scheduled
915         if @outbound_q.empty?
916           @close_scheduled = true if @close_requested
917           false
918         else
919           true
920         end
921       end
922     end
923
924     # #select_for_reading?
925     def select_for_reading?
926       true
927     end
928
929     # #get_outbound_data_size
930     def get_outbound_data_size
931       @outbound_q.inject(0) {|memo,obj| memo += (obj || "").length}
932     end
933
934
935   end
936
937
938 end
939
940
941 #--------------------------------------------------------------
942
943 module EventMachine
944   class EvmaUDPSocket < DatagramObject
945
946     class << self
947       def create host, port
948         sd = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )
949         sd.bind Socket::pack_sockaddr_in( port, host )
950         EvmaUDPSocket.new sd
951       end
952     end
953
954     # #eventable_write
955     # This really belongs in DatagramObject, but there is some UDP-specific stuff.
956     def eventable_write
957       40.times {
958         break if @outbound_q.empty?
959         begin
960           data,target = @outbound_q.first
961
962           # This damn better be nonblocking.
963           io.send data.to_s, 0, target
964
965           @outbound_q.shift
966         rescue Errno::EAGAIN
967           # It's not been observed in testing that we ever get here.
968           # True to the definition, packets will be accepted and quietly dropped
969           # if the system is under pressure.
970           break
971         rescue EOFError, Errno::ECONNRESET
972           @close_scheduled = true
973           @outbound_q.clear
974         end
975       }
976     end
977
978     # Proper nonblocking I/O was added to Ruby 1.8.4 in May 2006.
979     # If we have it, then we can read multiple times safely to improve
980     # performance.
981     def eventable_read
982       begin
983         if io.respond_to?(:recvfrom_nonblock)
984           40.times {
985             data,@return_address = io.recvfrom_nonblock(16384)
986             EventMachine::event_callback uuid, ConnectionData, data
987             @return_address = nil
988           }
989         else
990           raise "unimplemented datagram-read operation on this Ruby"
991         end
992       rescue Errno::EAGAIN
993         # no-op
994       rescue Errno::ECONNRESET, EOFError
995         @close_scheduled = true
996         EventMachine::event_callback uuid, ConnectionUnbound, nil
997       end
998
999     end
1000
1001
1002     def send_data data
1003       send_datagram data, @return_address
1004     end
1005
1006   end
1007 end
1008
1009 #--------------------------------------------------------------
1010
1011
Note: See TracBrowser for help on using the browser.