Changeset 202

Show
Ignore:
Timestamp:
06/14/06 11:06:37 (2 years ago)
Author:
rosejn
Message:

Implemented new dispatcher scheme, almost have protocol stack chaining done.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • experiments/EventMachine/lib/machine/event.rb

    r201 r202  
    1313    include Base 
    1414 
     15    class << self 
     16      # This allows for aesthetic creation of stacks 
     17      def <<(*args) 
     18        [self, args].flatten 
     19      end 
     20    end 
     21 
    1522    def initialize(dispatcher, *args, &block) 
    1623      @dispatcher = dispatcher 
     
    2532    DEFAULT_MAX_WORKLOAD = 20 
    2633 
    27     def initialize(*args, &block) 
     34    def initialize 
    2835      # TODO: We might want to do some argument checking here to make this 
    2936      # accessible.  Possibly an accessor too for runtime changes... 
     
    4148    # Add a new handler or chain of handlers for the given event type. 
    4249    # TODO: error checking... 
    43     def add_handler(type, target=nil, handler=nil, &block) 
    44       @handlers[type][target] << handler if (target and handler) 
     50    def add_handler(type, src=nil, handler=nil, &block) 
     51      @handlers[type][src] << handler if (src and handler) 
    4552 
    4653      if block 
    47         target = eval('self', block.binding) 
    48         log.debug "add_handler - type: #{type} target: #{target}" 
     54        src = eval('self', block.binding) 
     55        log.debug "add_handler - type: #{type} src: #{src}" 
    4956         
    50         @handlers[type][target] << block  
     57        @handlers[type][src] << block  
    5158      end 
    5259    end 
    5360 
    5461    # Remove the handler(s) for the given event type. 
    55     def remove_handler(type, target
    56       @handlers[type].delete(target) if @handlers.has_key? type 
     62    def remove_handler(type, src
     63      @handlers[type].delete(src) if @handlers.has_key? type 
    5764    end 
    5865 
  • experiments/EventMachine/lib/machine/io.rb

    r200 r202  
    44require 'fcntl' 
    55require 'socket' 
     6require 'delegate' 
    67 
    78module Machine 
     
    1516 
    1617  class IOHandler < Handler 
    17     SelectTimeout = 0.5 
     18    NIO_READ_ATTEMPTS = 10 
    1819 
    1920    @@io_objects = {} 
     
    2122    class << self 
    2223 
    23       #TODO: Can this be done by the dispatcher?  Or at least triggered by it? 
    2424      #-- 
    2525      # Run the I/O machine through one cycle. 
     
    2727      # Then select writables and readables (MAY BLOCK!) 
    2828      # Then post the events. 
    29       def iterate 
     29      def iterate(timeout) 
    3030        @@log.debug '+' # log not defined at class level, kill this when no longer needed 
    31         @@io_objects.delete_if {|io,obj| 
    32           if obj.close_scheduled? 
    33             io.close 
    34             true 
    35           end 
    36         } 
    37  
    38         readers = @@io_objects.map {|io,obj| obj.select_readable? ? io : nil}.compact 
    39         writers = @@io_objects.map {|io,obj| obj.select_writable? ? io : nil}.compact 
    40  
    41         s = select( readers, writers, nil, SelectTimeout ) 
    42  
    43         s and s[1] and s[1].each {|w| @@io_objects[w].event_write } 
    44         s and s[0] and s[0].each {|r| @@io_objects[r].event_read } 
     31 
     32        # TODO: This could get slow if we do it every iteration over a lot of 
     33        # IO objects.  Maybe drive the readers & writers lists from the IO 
     34        # objects (have them call class methods to register) rather than 
     35        # querying here. 
     36        readers = @@io_objects.keys.select {|io| @@io_objects[io].select_readable?} 
     37        writers = @@io_objects.keys.select {|io| @@io_objects[io].select_writable?} 
     38        @@log.debug "select readers.size = #{readers.size} #{readers.inspect}" 
     39        @@log.debug "select writers.size = #{readers.size} #{writers.inspect}" 
     40        s = select(readers, 
     41                   writers, 
     42                   nil, timeout) 
     43 
     44        @@log.debug "select return array: #{s.inspect}" 
     45        s and s[1] and s[1].each {|w| @@io_objects[w].do_write } 
     46        s and s[0] and s[0].each {|r| @@io_objects[r].do_read } 
     47      end 
     48 
     49      def add_io(io, handler) 
     50        @@io_objects[io] = handler 
     51        @@log.debug "Added new io, size: (#{@@io_objects.size})" 
     52      end 
     53 
     54      def remove_io(io) 
     55        @@io_objects.delete(io) 
    4556      end 
    4657    end 
     
    4859    attr_reader :io 
    4960 
    50     # 
    51     # 
    52     # 
    53     def initialize dispatcher, io 
     61    def initialize(dispatcher, io) 
    5462      super 
    5563 
    5664      # Set the socket nonblocking. The new Ruby will actually nonblocking APIs. 
    57       m = io.fcntl(Fcntl::F_GETFL, 0) 
    58       io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | m
     65      io.fcntl(Fcntl::F_SETFL,  
     66               io.fcntl(Fcntl::F_GETFL, 0) | Fcntl::O_NONBLOCK
    5967      @io = io 
    60       @@io_objects[io] = self 
    61  
    62       @outbound_q = [] 
    63       @dispatcher.add_handler(:write) {|evt| 
    64         unless close_scheduled? || close_requested? 
    65           @outbound_q << evt.data 
    66         end 
    67       } 
    68       log.debug "Pushed EventableIO (#{@@io_objects.size})" 
    69  
    70       # Give the user a chance to initialize some stuff BEFORE 
    71       # sending the initialization event. We DEFINE that behavior 
    72       # so people can depend on it. 
    73       # A handler for :bind can be added in the initialize block, 
    74       # but in single-threaded programs it can probably also be added 
    75       # after this initialize method completes. 
    76       yield self if block_given? 
    77       @dispatcher.send_event BindEvent.new 
    78       #send_event( Event.new( :bind )) 
    79  
    80       @should_close = false 
     68      @delegate = SimpleDelegator.new(io) 
     69      build_read(io) 
     70      build_write(io) 
     71      IOHandler.add_io(@io, self) 
     72 
    8173      @closed = false 
    82       @dispatcher.add_handler(:close_connection_event, self, :close_connection) 
    83       @dispatcher.add_handler(:write_raw_data_event, self, :send_data) 
    84     end 
    85  
    86     def close_connection 
    87       @should_close = true 
    88     end 
    89  
    90     def closed? 
    91       @closed 
    92     end 
    93  
    94     def schedule_close 
    95       send_event( Event.new( :unbind )) 
    96       @close_scheduled = true 
    97     end 
    98  
    99     def close_scheduled? 
    100       @close_scheduled 
    101     end 
    102  
    103     def request_close 
    104       # Will close stream after all outbound data has been written. 
    105       @close_requested = true 
    106     end 
    107  
    108     def close_requested? 
    109       @close_requested 
     74      @write_q = [] 
     75 
     76      @dispatcher.add_handler(CloseConnectionEvent, self, :on_close) 
     77      @dispatcher.add_handler(SendRawDataEvent, self, :on_send_raw_data) 
     78 
     79      @dispatcher.send_event(BindEvent.new) 
    11080    end 
    11181 
     
    11585 
    11686    def select_writable? 
    117       @outbound_q.empty? ? false : true 
    118     end 
    119  
    120     #-- 
    121     # sugar 
    122     def send_data data 
    123       send_event( DataEvent.new( :write, data )) 
    124     end 
    125  
    126     def send_close_after_writing 
    127       send_event( Event.new( :close_after_writing )) 
    128     end 
    129  
    130     def close 
    131       @outbound_q.clear 
    132       schedule_close 
    133     end 
    134  
    135     def close_after_writing 
    136       if @outbound_q.empty? 
    137         schedule_close 
    138       else 
    139         request_close 
    140       end 
     87      @write_q.size > 0 
     88    end 
     89 
     90    def on_send_raw_data(event) 
     91      @write_q << event.data 
     92      log.debug "send_raw_data: #{event.data.size} bytes added to write_q" 
     93    end 
     94 
     95    def on_close(event) 
     96      close(event.now) 
     97    end 
     98 
     99    # Default behavior is to complete all outgoing IO, and then close the 
     100    # connection.  If kill = true then pending events will be dropped and the 
     101    # connection will close immediately. 
     102    def close(now = false) 
     103      @closed = true 
     104      @dispatcher.remove_handler(RecvRawDataEvent, self) 
     105      @dispatcher.remove_handler(CloseConnectionEvent, self) 
     106      log.debug "IOHandler closed..." 
     107 
     108      kill if now or @write_q.empty? 
     109    end 
     110 
     111    private 
     112 
     113    def kill 
     114      IOHandler.remove_io(self) 
     115      @write_q.clear 
     116      log.debug "IOHandler killed!!!" 
    141117    end 
    142118 
     
    146122    # performance. 
    147123    # TODO, coalesce multiple reads into a single event. 
    148     # TODO, do the function check somewhere else and cache it. 
    149     def read_handler 
    150       begin 
    151         if io.respond_to?(:read_nonblock) 
    152           10.times { 
    153             r = io.read_nonblock(4096) 
    154             send_event( DataEvent.new( :read, r )) 
    155           } 
    156         else 
    157           r = io.sysread(4096) 
    158           send_event( DataEvent.new( :read, r )) 
     124    def build_read(io) 
     125      if io.respond_to?(:read_nonblock) 
     126 
     127        def do_read 
     128          data = '' 
     129          begin 
     130            NIO_READ_ATTEMPTS.times do 
     131              data << io.read_nonblock(4096) 
     132            end 
     133          rescue Errno::EAGAIN 
     134          rescue EOFError, Errno::ECONNRESET 
     135            kill 
     136          end 
     137          @dispatcher.send_event(RecvRawDataEvent.new(data)) 
    159138        end 
    160       rescue Errno::EAGAIN 
    161       rescue EOFError, Errno::ECONNRESET 
    162         schedule_close 
    163       end 
    164     end 
    165  
    166     # Provisional implementation. Will be re-implemented in subclasses. 
     139 
     140      else 
     141 
     142        def do_read 
     143          begin 
     144          data = io.sysread(4096) 
     145          @dispatcher.send_event(RecvRawDataEvent.new(data)) 
     146          rescue Errno::EAGAIN 
     147          rescue EOFError, Errno::ECONNRESET 
     148            kill 
     149          end 
     150        end 
     151      end 
     152    end 
     153 
    167154    # TODO: Complete this implementation. As it stands, this only writes 
    168155    # a single packet per cycle. Highly inefficient, but required unless 
     
    173160    # one busy connection could hog output buffers and slow down other 
    174161    # connections. Also we should coalesce small writes. 
    175     def write_handler 
    176       if data = @outbound_q.shift 
    177         begin 
    178           data = data.to_s 
    179  
    180           w = if io.respond_to?(:write_nonblock) 
    181                 io.write_nonblock( data ) 
    182               else 
    183                 io.syswrite( data ) 
    184               end 
    185  
    186           @outbound_q.unshift( data[w..-1] ) if w < data.length 
    187           schedule_close if (close_requested? and @outbound_q.empty?) 
    188         rescue Errno::EAGAIN 
    189           @outbound_q.unshift data 
    190         rescue EOFError, Errno::ECONNRESET 
    191           schedule_close 
     162    def build_write(io) 
     163      if io.respond_to?(:write_nonblock) 
     164 
     165        def do_write 
     166          log.debug "inside do_write..." 
     167          if data = @write_q.shift 
     168            begin 
     169              wrote = io.write_nonblock(data.to_s) 
     170              log.debug "write_nonblock sent #{wrote} bytes" 
     171              @write_q.unshift(data[wrote..-1]) if wrote < data.length 
     172            rescue Errno::EAGAIN 
     173              @write_q.unshift data 
     174            rescue EOFError, Errno::ECONNRESET 
     175              close 
     176            end 
     177          end 
     178          kill if @closed and @write_q.empty? 
    192179        end 
    193       end 
    194     end 
    195  
     180 
     181      else 
     182 
     183        def do_write 
     184          if data = @write_q.shift 
     185            begin 
     186              wrote = io.syswrite(data.to_s) 
     187              @write_q.unshift(data[wrote..-1]) if wrote < data.length 
     188            rescue Errno::EAGAIN 
     189              @write_q.unshift data 
     190            rescue EOFError, Errno::ECONNRESET 
     191              close 
     192            end 
     193          end 
     194          kill if @closed and @write_q.empty? 
     195        end 
     196      end 
     197    end 
    196198  end 
    197 end 
    198  
    199 ##################################### 
    200  
    201 module Machine 
    202   class TcpConnectEventableIO < IOHandler 
     199 
     200 
     201  class TCPClient < IOHandler 
     202    ConnectEvent = EventType.new :host, :port 
    203203 
    204204    # We assume we're getting a TCP socket on which 
     
    207207    # When it selects writable, the connect has completed. 
    208208    # 
    209     def initialize *args 
    210       @pending = true 
    211     end 
    212  
     209    def initialize(dispatcher, host, port, stack = [], &block) 
     210      @dispatcher = dispatcher 
     211      @host = host 
     212      @port = port 
     213 
     214      sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 ) 
     215      begin 
     216        sd.connect_nonblock( Socket.pack_sockaddr_in( port, host )) 
     217      rescue Errno::EINPROGRESS 
     218      end 
     219 
     220      super(dispatcher, sd) 
     221 
     222      stack.each {|proto| proto.new(dispatcher)} 
     223      block.call(dispatcher) if block 
     224    end 
     225 
     226    # This method is only defined until connection is made. 
    213227    def select_writable? 
    214       @pending ? true : super 
    215     end 
    216  
     228      true 
     229    end 
     230 
     231    # This method is only defined until connection is made. 
    217232    def select_readable? 
    218       @pending ? false : super 
    219     end 
    220  
    221     def write_handler 
    222       if @pending 
    223         @pending = false 
    224         send_event Event.new(:connect) 
    225       else 
    226         super 
    227       end 
    228     end 
    229  
     233      false 
     234    end 
     235 
     236    # This method will be called once on connect, send the connect event and 
     237    # then all further calls will go to the parent, IOHandler. 
     238    def do_write 
     239      remove_method :select_writable? 
     240      remove_method :select_readable? 
     241      remove_method :do_write 
     242      @dispatcher.send_event(ConnectEvent.new(host, port)) 
     243    end 
    230244  end 
    231 end 
    232  
    233  
    234 ##################################### 
    235  
    236 module Machine 
    237   class TcpServer < IOHandler 
     245 
     246 
     247  class TCPServer < IOHandler 
     248    AcceptEvent = EventType.new :host, :port 
     249 
     250    LISTEN_BACKLOG_SIZE = 50 # 5 is what you see in all the books. Ain't enough. 
     251    NIO_ACCEPT_ATTEMPTS = 10 
    238252 
    239253    #-- 
     
    247261    # Of course we'll also need named pipes and whatever that Windows 
    248262    # near-equivalent is called. 
    249     # RETURNS: the newly-created eventable-io object, so the caller 
    250     # can add handlers, etc. 
    251     # 
    252     def self.start_server host, port 
    253       sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 ) 
    254       sd.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true ) 
    255       sd.bind( Socket.pack_sockaddr_in( port, host )) 
    256       sd.listen( 50 ) # 5 is what you see in all the books. Ain't enough. 
    257       TcpServerEventableIO.new sd 
    258     end 
    259  
    260     #-- 
    261     # Ruby accept_nonblock is applied on class Socket, 
    262     # but for some unknown reason, TCPServer is not a 
    263     # subclass of Socket. It's a subclass of IO->BasicSocket. 
    264     # So we can't do non-blocking I/O of TCPServers. 
    265     # This is the required idiom for creating a TCP server: 
    266     # sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0) 
    267     # sd.bind( Socket.pack_sockaddr_in( port, server )) 
    268     # sd.listen(5) 
    269     # eio = TcpServerEventableIO.new( sd ) 
    270     # 
    271     # For Unix-domain sockets, the idiom is: 
    272     # sd = Socket.new( Socket::AF_UNIX, Socket::SOCK_STREAM, 0) 
    273     # sd.bind( Socket.pack_sockaddr_un( socketname )) 
    274     # sd.listen(5) 
    275     # eio = TcpServerEventableIO.new( sd ) 
    276     # 
    277     # 
    278     def initialize io 
    279       super 
     263    def self.initialize(dispatcher, host, port, stack = [], &block) 
     264      sd = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0) 
     265      sd.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true) 
     266      sd.bind( Socket.pack_sockaddr_in(port, host)) 
     267      sd.listen(LISTEN_BACKLOG_SIZE) 
     268 
     269      super(dispatcher, sd) 
    280270    end 
    281271 
     
    291281    # accept_nonblock returns an array consisting of the accepted 
    292282    # socket and a sockaddr_in which names the peer. 
    293     def read_handler 
     283    def do_read 
    294284      begin 
    295         10.times { 
    296           sd = io.accept_nonblock 
    297           send_event( DataEvent.new( :accept, sd[0] )) 
    298         } 
     285        NIO_ACCEPT_ATTEMPTS.times do 
     286          sd, addr = io.accept_nonblock 
     287        end  
    299288      rescue Errno::EWOULDBLOCK, Errno::EAGAIN 
    300289      end 
    301     end 
    302  
     290 
     291      # Now we build the stack on top of the new socket. 
     292      stack.each {|proto| proto.new(dispatcher)} 
     293      block.call(dispatcher) if block 
     294 
     295      port, host = Socket.unpack_sockaddr_in(addr) 
     296      @dispatcher.send_event(AcceptEvent.new(host, port)) 
     297    end 
    303298  end 
     299 
    304300end 
    305301 
  • experiments/EventMachine/lib/machine/protocol.rb

    r200 r202  
    11module Machine 
    2   class Protocol 
    3     include Base 
    4  
    5     def initialize(dispatcher, *args, &block) 
    6       @dispatcher = dispatcher 
    7     end 
     2  # This might not be necessary any more... 
     3  class Protocol < Handler 
    84  end 
    95 
  • experiments/EventMachine/lib/machine/protocols/line.rb

    r166 r202  
    1010 
    1111  # Basic line handler that breaks up raw input based on a delimiter. 
    12   class LineHandler < Handler 
     12  class LineHandler < Protocol 
    1313    LINE_DELIMITER = "/r/n" 
    1414    LINE_DELIMITER_RE = /\r?\n/m 
     
    2222      @linebuffer = "" 
    2323 
    24       @dispatcher.add_handler(:recv_raw_data, self, :receive_raw_data) 
    25       @dispatcher.add_handler(:send_line_event, self, :send_line) 
     24      @dispatcher.add_handler(:recv_raw_data, self, :handle_receive_raw_data) 
     25      @dispatcher.add_handler(:send_line_event, self, :handle_send_line) 
    2626    end 
    2727 
    28     def receive_data event 
     28    def handle_receive_raw_data(event) 
    2929      @linebuffer << event.data 
    3030      while a = @linebuffer.split( /\r?\n/m, 2 ) 
    3131        if a[1] 
    3232          @linebuffer = a[1] 
    33           send_event DataEvent.new(:read_line, a[0] ) 
     33          line_received a[0] 
    3434        else 
    3535          break 
    3636        end 
    3737      end 
    38  
    3938    end 
    4039 
    41     def send_line event 
     40    # Default implementation is to send an event.  Classes can subclass 
     41    # LineHandler and implement this method to create custom line handlers. 
     42    def line_received(line) 
     43      send_event DataEvent.new(:read_line, line) 
     44    end 
     45 
     46    def send_line(event) 
    4247      sender = proc {|line| @dispatcher.send_event(SendRawData.new(line + LINE_DELIMITER)) } 
    4348 
  • experiments/EventMachine/lib/machine/reactor.rb

    r200 r202  
    1010    include Base 
    1111    include Singleton 
     12 
     13    DEFAULT_SELECT_TIMEOUT = 0.1 
    1214 
    1315    @@bootstrappers = [] 
     
    9597        @@bootstrappers.clear 
    9698 
     99        next_timeout = Timer.iterate  
     100 
    97101        work_left = iterate 
     102        log.debug "work left from dispatchers: #{work_left}" 
    98103 
    99104        # If the reactor was stopped while dispatching events we don't want to 
    100105        # block again. 
    101106        if @running 
    102           log.debug("run loop sleeping for #{Timer.next} seconds...") 
    103  
    104107          # If there are still events to process we don't want to block on 
    105108          # select here. 
     
    107110            timeout = 0 
    108111          else 
    109             timeout = Timer.next || 0.1 
     112            timeout = DEFAULT_SELECT_TIMEOUT 
    110113          end 
    111114 
     115          log.debug("run loop sleeping for #{timeout} seconds...") 
     116 
    112117          # TODO: Figure out the best behavior here... 
    113           IO.select(nil, nil, nil, timeout) 
     118          IOHandler.iterate(timeout) 
    114119        end 
    115120      end 
     
    120125    def iterate 
    121126      log.debug "iterating..." 
    122  
    123       # Iterate over each of these special dispatchers. 
    124       # TODO: Maybe they should just be treated like every other dispatcher? 
    125       [ 
    126         Timer,  
    127         EventSubscriber,  
    128         IOHandler 
    129       ].each {|i| i.iterate} 
    130127 
    131128      @dispatchers.inject(0) {|mem, d| mem += d.iterate } 
  • experiments/EventMachine/lib/machine/signal.rb

    r166 r202  
    2323 
    2424    def initialize(sig, &block) 
    25       super 
     25      super() 
    2626 
    2727      @sig = sig 
  • experiments/EventMachine/lib/machine/timer.rb

    r201 r202  
    1616 
    1717      # Get the time in seconds until the next timer should fire. 
    18       def next 
    19         return 0 if @@timers.empty? 
     18      def next_timeout 
     19        return nil if @@timers.empty? 
    2020 
    2121        [0, (@@timers.first.expiration - Time.now.to_i)].max 
     
    3636          timer.send_event(TimerEvent.new, true) if timer.alive? 
    3737        end 
     38 
     39        next_timeout 
    3840      end 
    3941    end 
     
    4345 
    4446    def initialize(delay, periodic = false, &block) 
    45       super 
     47      super() 
    4648 
    4749      @delay = delay 
  • experiments/EventMachine/test/event_test.rb

    r200 r202  
    5959  end 
    6060 
     61  def test_handler_chain 
     62    stack = Handler << Handler << Handler 
     63    assert_equal([Handler, Handler, Handler], stack) 
     64  end 
     65 
    6166  def stage_one event 
    6267    event.value += 1 
  • experiments/EventMachine/test/io_test.rb

    r200 r202  
    22 
    33$:.unshift(File.dirname(__FILE__) + '/../lib') 
    4  
    54 
    65require 'test/unit' 
     
    87require 'socket' 
    98 
    10  
    11 class IoTests < Test::Unit::TestCase 
     9class TestIOHandler < Test::Unit::TestCase 
    1210  include Machine 
    1311 
     12  TEST_DATA_1 = "aaabbbccc" 
     13 
    1414  def setup 
    15     # Have a thread going, because it messes up Ruby's io.sysread function 
    16     Thread.new {sleep 1000} 
     15    @reactor = Reactor.instance 
    1716  end 
    1817 
     
    2019  end 
    2120 
    22   def test_tcp_server 
    23     Reactor.run { 
    24       Timer.new(3) {Reactor.stop} 
    25     } 
    26   end 
     21  def test_io_handler 
     22#    @reactor.verbose 
    2723 
     24    a_dispatch = EventDispatcher.new 
     25    b_dispatch = EventDispatcher.new 
    2826 
    29   def test_eio 
    30 =begin 
    31     sin = Socket::pack_sockaddr_in(25, "relay.spheriq.net") 
    32     40.times { 
    33     #sd = TCPSocket.new("relay.spheriq.net", 25) 
    34     #p "Connected" 
    35     #eio = EventableIO.new(sd) {|me| 
    36     sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0) 
    37     begin 
    38     sd.connect_nonblock sin 
    39     rescue Errno::EINPROGRESS 
    40     p "In progress!" 
     27    got_bind = false 
     28    a_dispatch.add_handler(BindEvent) do  
     29      got_bind = true 
    4130    end 
    42     eio = TcpConnectEventableIO.new(sd) {|me| 
    43       me.add_handler(:connect) { 
    44         p "CONNECTED$$$" 
    45       } 
    46       me.add_handler(:read) {|data| 
    47         p "READ DATA: #{data.data}" 
    48       } 
    49       me.add_handler(:unbind) {|data| 
    50         p "UNBOUND" 
    51       } 
    52     } 
    53     } 
    54 =end 
    5531 
    56       #sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0) 
    57       #sockaddr = Socket.pack_sockaddr_in( 8901, "127.0.0.1" ) 
    58       #sd.bind sockaddr 
    59       #sd.listen( 100 ) 
     32    a,b = Socket::socketpair( Socket::AF_UNIX, Socket::SOCK_STREAM, 0) 
     33    a_handler = IOHandler.new(a_dispatch, a) 
     34    b_handler = IOHandler.new(b_dispatch, b) 
    6035 
    61 =begin 
    62       File.unlink("./a.chain") if File.exists?("/a.chain") 
    63       sd = Socket.new( Socket::AF_UNIX, Socket::SOCK_STREAM, 0) 
    64       sockaddr = Socket.pack_sockaddr_un( "./a.chain" ) 
    65       sd.bind sockaddr 
    66       sd.listen( 100 ) 
    67       eio = TcpServerEventableIO.new(sd) {|me| 
    68         me.add_handler(:accept) {|evt| 
    69           eio = EventableIO.new(evt.data) {|me| 
    70             me.add_handler(:read) {|evt| 
    71               me.send_data "<<<#{evt.data.chomp}>>>\n" 
    72             } 
    73           } 
    74         } 
    75       } 
    76       Reactor.instance.run 
    77 =end 
     36    raw_data = nil 
     37    b_dispatch.add_handler(RecvRawDataEvent) do |event| 
     38      raw_data = event.data 
     39      b_dispatch.send_event(CloseConnectionEvent.new(false)) 
     40    end 
    7841 
    79     assert_equal(1,1) 
     42    Timer.new(0) { a_dispatch.send_event(SendRawDataEvent.new(TEST_DATA_1)) } 
     43    Timer.new(2) { @reactor.stop } 
     44 
     45    @reactor.run 
     46 
     47    assert(got_bind, "Didn't get bind event from IOHandler") 
     48    assert_equal(TEST_DATA_1, raw_data.slice(0, TEST_DATA_1.size)) 
    8049  end 
    8150 
  • experiments/EventMachine/test/timer_test.rb

    r200 r202  
    3333    assert_equal 1, t1, "Regular block timer did not fire." 
    3434     
    35     # TODO:  This off by 1 thing with the periodic timer annoys me...  I'm not sure 
    36     # this be less than 3. 
    37     assert t2 < 3, "Periodic block timer did not fire correctly." 
     35    assert t2 = 3, "Periodic block timer did not fire correctly." 
    3836    assert_equal 1, @t3, "Method based timer did not fire." 
    3937  end 
  • experiments/EventMachine/TODO

    r159 r202  
    1  
    2 * Figure out how to unit test the bottom end networking code. 
    3  
    4 * Make EventDispatcher initialize even if the including class doesn't have 
    5 an initialize method.