Changeset 166

Show
Ignore:
Timestamp:
05/31/06 13:30:08 (2 years ago)
Author:
rosejn
Message:

Adding new branch for the even more event based EventMachine

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • experiments/EventMachine/lib/machine/eio.rb

    r159 r166  
    66 
    77module Machine 
    8   class DataEvent < Event 
    9     attr_accessor :data 
    10     def initialize type, init_data = nil 
    11       super type 
    12       @data = init_data 
    13     end 
     8  # Receives 
     9  SendRawDataEvent = EventType.new :data 
     10  CloseConnectionEvent = EventType.new :now 
     11 
     12  # Sends 
     13  RecvRawDataEvent = EventType.new :data 
     14 
     15  class IOHandler < Handler 
     16    SelectTimeout = 0.5 
     17 
     18    @@io_objects = {} 
     19 
     20    class << self 
     21 
     22      #TODO: Can this be done by the dispatcher?  Or at least triggered by it? 
     23      #-- 
     24      # Run the I/O machine through one cycle. 
     25      # First close and delete any object that is closed or closing. 
     26      # Then select writables and readables (MAY BLOCK!) 
     27      # Then post the events. 
     28      def iterate 
     29        @@log.debug '+' # log not defined at class level, kill this when no longer needed 
     30        @@io_objects.delete_if {|io,obj| 
     31          if obj.close_scheduled? 
     32            io.close 
     33            true 
     34          end 
     35        } 
     36 
     37        readers = @@io_objects.map {|io,obj| obj.select_readable? ? io : nil}.compact 
     38        writers = @@io_objects.map {|io,obj| obj.select_writable? ? io : nil}.compact 
     39 
     40        s = select( readers, writers, nil, SelectTimeout ) 
     41 
     42        s and s[1] and s[1].each {|w| @@io_objects[w].event_write } 
     43        s and s[0] and s[0].each {|r| @@io_objects[r].event_read } 
     44      end 
     45    end 
     46 
     47    attr_reader :io 
     48 
     49    # 
     50    # 
     51    # 
     52    def initialize dispatcher, io = nil 
     53      super 
     54 
     55      # Set the socket nonblocking. The new Ruby will actually nonblocking APIs. 
     56      m = io.fcntl(Fcntl::F_GETFL, 0) 
     57      io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | m) 
     58      @io = io 
     59      @@io_objects[io] = self 
     60 
     61      @outbound_q = [] 
     62      add_handler(:write) {|evt| 
     63        unless close_scheduled? || close_requested? 
     64          @outbound_q << evt.data 
     65        end 
     66      } 
     67      log.debug "Pushed EventableIO (#{@@io_objects.size})" 
     68 
     69      # Give the user a chance to initialize some stuff BEFORE 
     70      # sending the initialization event. We DEFINE that behavior 
     71      # so people can depend on it. 
     72      # A handler for :bind can be added in the initialize block, 
     73      # but in single-threaded programs it can probably also be added 
     74      # after this initialize method completes. 
     75      yield self if block_given? 
     76      send_event( Event.new( :bind )) 
     77 
     78      @should_close = false 
     79      @closed = false 
     80      @dispatcher.add_handler(:close_connection_event, self, :close_connection) 
     81      @dispatcher.add_handler(:write_raw_data_event, self, :send_data) 
     82    end 
     83 
     84    def close_connection 
     85      @should_close = true 
     86    end 
     87 
     88    def closed? 
     89      @closed 
     90    end 
     91 
     92    def schedule_close 
     93      send_event( Event.new( :unbind )) 
     94      @close_scheduled = true 
     95    end 
     96 
     97    def close_scheduled? 
     98      @close_scheduled 
     99    end 
     100 
     101    def request_close 
     102      # Will close stream after all outbound data has been written. 
     103      @close_requested = true 
     104    end 
     105 
     106    def close_requested? 
     107      @close_requested 
     108    end 
     109 
     110    def select_readable? 
     111      true 
     112    end 
     113 
     114    def select_writable? 
     115      @outbound_q.empty? ? false : true 
     116    end 
     117 
     118    #-- 
     119    # sugar 
     120    def send_data data 
     121      send_event( DataEvent.new( :write, data )) 
     122    end 
     123 
     124    def send_close_after_writing 
     125      send_event( Event.new( :close_after_writing )) 
     126    end 
     127 
     128    def close 
     129      @outbound_q.clear 
     130      schedule_close 
     131    end 
     132 
     133    def close_after_writing 
     134      if @outbound_q.empty? 
     135        schedule_close 
     136      else 
     137        request_close 
     138      end 
     139    end 
     140 
     141    # Provisional implementation. Will be re-implemented in subclasses. 
     142    # Proper nonblocking I/O was added to Ruby 1.8.4 in May 2006. 
     143    # If we have it, then we can read multiple times safely to improve 
     144    # performance. 
     145    # TODO, coalesce multiple reads into a single event. 
     146    # TODO, do the function check somewhere else and cache it. 
     147    def read_handler 
     148      begin 
     149        if io.respond_to?(:read_nonblock) 
     150          10.times { 
     151            r = io.read_nonblock(4096) 
     152            send_event( DataEvent.new( :read, r )) 
     153          } 
     154        else 
     155          r = io.sysread(4096) 
     156          send_event( DataEvent.new( :read, r )) 
     157        end 
     158      rescue Errno::EAGAIN 
     159      rescue EOFError, Errno::ECONNRESET 
     160        schedule_close 
     161      end 
     162    end 
     163 
     164    # Provisional implementation. Will be re-implemented in subclasses. 
     165    # TODO: Complete this implementation. As it stands, this only writes 
     166    # a single packet per cycle. Highly inefficient, but required unless 
     167    # we're running on a Ruby with proper nonblocking I/O (Ruby 1.8.4 
     168    # built from sources from May 25, 2006 or newer). 
     169    # We need to improve the loop so it writes multiple times, however 
     170    # not more than a certain number of bytes per cycle, otherwise 
     171    # one busy connection could hog output buffers and slow down other 
     172    # connections. Also we should coalesce small writes. 
     173    def write_handler 
     174      if data = @outbound_q.shift 
     175        begin 
     176          data = data.to_s 
     177 
     178          w = if io.respond_to?(:write_nonblock) 
     179                io.write_nonblock( data ) 
     180              else 
     181                io.syswrite( data ) 
     182              end 
     183 
     184          @outbound_q.unshift( data[w..-1] ) if w < data.length 
     185          schedule_close if (close_requested? and @outbound_q.empty?) 
     186        rescue Errno::EAGAIN 
     187          @outbound_q.unshift data 
     188        rescue EOFError, Errno::ECONNRESET 
     189          schedule_close 
     190        end 
     191      end 
     192    end 
     193 
    14194  end 
    15195end 
    16196 
     197##################################### 
    17198 
    18199module Machine 
    19 class EventableIO 
    20   include Base 
    21   include EventDispatcher 
    22  
    23   SelectTimeout = 0.5 
    24  
    25   @@io_objects = {} 
    26  
    27  
    28   class << self 
     200  class TcpConnectEventableIO < IOHandler 
     201 
     202    # We assume we're getting a TCP socket on which 
     203    # connect_nonblock has been called. 
     204    # DO NOT attempt to read the socket. 
     205    # When it selects writable, the connect has completed. 
     206    # 
     207    def initialize *args 
     208      @pending = true 
     209    end 
     210 
     211    def select_writable? 
     212      @pending ? true : super 
     213    end 
     214 
     215    def select_readable? 
     216      @pending ? false : super 
     217    end 
     218 
     219    def write_handler 
     220      if @pending 
     221        @pending = false 
     222        send_event Event.new(:connect) 
     223      else 
     224        super 
     225      end 
     226    end 
     227 
     228  end 
     229end 
     230 
     231 
     232##################################### 
     233 
     234module Machine 
     235  class TcpServer < IOHandler 
    29236 
    30237    #-- 
    31     # Run the I/O machine through one cycle. 
    32     # First close and delete any object that is closed or closing. 
    33     # Then select writables and readables (MAY BLOCK!) 
    34     # Then post the events. 
    35     def run_one_cycle 
    36       @@log.debug '+' # log not defined at class level, kill this when no longer needed 
    37       @@io_objects.delete_if {|io,obj| 
    38         if obj.close_scheduled? 
    39           io.close 
    40           true 
    41         end 
    42       } 
    43  
    44       readers = @@io_objects.map {|io,obj| obj.select_readable? ? io : nil}.compact 
    45       writers = @@io_objects.map {|io,obj| obj.select_writable? ? io : nil}.compact 
    46  
    47       s = select( readers, writers, nil, SelectTimeout ) 
    48  
    49       s and s[1] and s[1].each {|w| @@io_objects[w].event_write } 
    50       s and s[0] and s[0].each {|r| @@io_objects[r].event_read } 
    51     end 
     238    # sugar over starting a TCP server. 
     239    # INCOMPLETE, will throw a bunch of different socket-library 
     240    # errors (DNS, no-bind, etc) which we ought to wrap and 
     241    # either re-raise, or generate events for. 
     242    # INCOMPLETE, need to similarly sugar creation of Unix-domain sockets. 
     243    # Either a different method, or observe the params: for unix 
     244    # only a filename is needed. 
     245    # Of course we'll also need named pipes and whatever that Windows 
     246    # near-equivalent is called. 
     247    # RETURNS: the newly-created eventable-io object, so the caller 
     248    # can add handlers, etc. 
     249    # 
     250    def self.start_server host, port 
     251      sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 ) 
     252      sd.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true ) 
     253      sd.bind( Socket.pack_sockaddr_in( port, host )) 
     254      sd.listen( 50 ) # 5 is what you see in all the books. Ain't enough. 
     255      TcpServerEventableIO.new sd 
     256    end 
     257 
     258    #-- 
     259    # Ruby accept_nonblock is applied on class Socket, 
     260    # but for some unknown reason, TCPServer is not a 
     261    # subclass of Socket. It's a subclass of IO->BasicSocket. 
     262    # So we can't do non-blocking I/O of TCPServers. 
     263    # This is the required idiom for creating a TCP server: 
     264    # sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0) 
     265    # sd.bind( Socket.pack_sockaddr_in( port, server )) 
     266    # sd.listen(5) 
     267    # eio = TcpServerEventableIO.new( sd ) 
     268    # 
     269    # For Unix-domain sockets, the idiom is: 
     270    # sd = Socket.new( Socket::AF_UNIX, Socket::SOCK_STREAM, 0) 
     271    # sd.bind( Socket.pack_sockaddr_un( socketname )) 
     272    # sd.listen(5) 
     273    # eio = TcpServerEventableIO.new( sd ) 
     274    # 
     275    # 
     276    def initialize io 
     277      super 
     278    end 
     279 
     280    def select_writable? 
     281      false 
     282    end 
     283 
     284    def select_readable? 
     285      true 
     286    end 
     287 
     288    #-- 
     289    # accept_nonblock returns an array consisting of the accepted 
     290    # socket and a sockaddr_in which names the peer. 
     291    def read_handler 
     292      begin 
     293        10.times { 
     294          sd = io.accept_nonblock 
     295          send_event( DataEvent.new( :accept, sd[0] )) 
     296        } 
     297      rescue Errno::EWOULDBLOCK, Errno::EAGAIN 
     298      end 
     299    end 
     300 
    52301  end 
    53  
    54  
    55   attr_reader :io 
    56  
    57   # 
    58   # 
    59   # 
    60   def initialize io = nil 
    61     # Set the socket nonblocking. The new Ruby will actually nonblocking APIs. 
    62     m = io.fcntl(Fcntl::F_GETFL, 0) 
    63     io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | m) 
    64     @io = io 
    65     @@io_objects[io] = self 
    66  
    67     @outbound_q = [] 
    68     add_handler(:write) {|evt| 
    69       unless close_scheduled? || close_requested? 
    70         @outbound_q << evt.data 
    71       end 
    72     } 
    73     add_handler(:close_after_writing) { 
    74       close_after_writing 
    75     } 
    76  
    77     log.debug "Pushed EventableIO (#{@@io_objects.size})" 
    78  
    79     # Give the user a chance to initialize some stuff BEFORE 
    80     # sending the initialization event. We DEFINE that behavior 
    81     # so people can depend on it. 
    82     # A handler for :bind can be added in the initialize block, 
    83     # but in single-threaded programs it can probably also be added 
    84     # after this initialize method completes. 
    85     yield self if block_given? 
    86     send_event( Event.new( :bind )) 
    87   end 
    88  
    89   def schedule_close 
    90     send_event( Event.new( :unbind )) 
    91     @close_scheduled = true 
    92   end 
    93  
    94   def close_scheduled? 
    95     @close_scheduled 
    96   end 
    97  
    98   def request_close 
    99     # Will close stream after all outbound data has been written. 
    100     @close_requested = true 
    101   end 
    102  
    103   def close_requested? 
    104     @close_requested 
    105   end 
    106  
    107   def select_readable? 
    108     true 
    109   end 
    110  
    111   def select_writable? 
    112     @outbound_q.empty? ? false : true 
    113   end 
    114  
    115   #-- 
    116   # sugar 
    117   def send_data data 
    118     send_event( DataEvent.new( :write, data )) 
    119   end 
    120  
    121   def send_close_after_writing 
    122     send_event( Event.new( :close_after_writing )) 
    123   end 
    124  
    125   def close 
    126     @outbound_q.clear 
    127     schedule_close 
    128   end 
    129  
    130   def close_after_writing 
    131     if @outbound_q.empty? 
    132       schedule_close 
    133     else 
    134       request_close 
    135     end 
    136   end 
    137  
    138   # Provisional implementation. Will be re-implemented in subclasses. 
    139   # Proper nonblocking I/O was added to Ruby 1.8.4 in May 2006. 
    140   # If we have it, then we can read multiple times safely to improve 
    141   # performance. 
    142   # TODO, coalesce multiple reads into a single event. 
    143   # TODO, do the function check somewhere else and cache it. 
    144   def event_read 
    145     begin 
    146       if io.respond_to?(:read_nonblock) 
    147         10.times { 
    148           r = io.read_nonblock(4096) 
    149           send_event( DataEvent.new( :read, r )) 
    150         } 
    151       else 
    152         r = io.sysread(4096) 
    153         send_event( DataEvent.new( :read, r )) 
    154       end 
    155     rescue Errno::EAGAIN 
    156     rescue EOFError, Errno::ECONNRESET 
    157       schedule_close 
    158     end 
    159   end 
    160  
    161   # Provisional implementation. Will be re-implemented in subclasses. 
    162   # TODO: Complete this implementation. As it stands, this only writes 
    163   # a single packet per cycle. Highly inefficient, but required unless 
    164   # we're running on a Ruby with proper nonblocking I/O (Ruby 1.8.4 
    165   # built from sources from May 25, 2006 or newer). 
    166   # We need to improve the loop so it writes multiple times, however 
    167   # not more than a certain number of bytes per cycle, otherwise 
    168   # one busy connection could hog output buffers and slow down other 
    169   # connections. Also we should coalesce small writes. 
    170   def event_write 
    171     if data = @outbound_q.shift 
    172       begin 
    173         data = data.to_s 
    174  
    175         w = if io.respond_to?(:write_nonblock) 
    176           io.write_nonblock( data ) 
    177         else 
    178           io.syswrite( data ) 
    179         end 
    180  
    181         @outbound_q.unshift( data[w..-1] ) if w < data.length 
    182         schedule_close if (close_requested? and @outbound_q.empty?) 
    183       rescue Errno::EAGAIN 
    184         @outbound_q.unshift data 
    185       rescue EOFError, Errno::ECONNRESET 
    186         schedule_close 
    187       end 
    188     end 
    189   end 
    190  
    191302end 
    192 end 
    193  
    194 ##################################### 
    195  
    196 module Machine 
    197 class TcpConnectEventableIO < EventableIO 
    198  
    199   # We assume we're getting a TCP socket on which 
    200   # connect_nonblock has been called. 
    201   # DO NOT attempt to read the socket. 
    202   # When it selects writable, the connect has completed. 
    203   # 
    204   def initialize *args 
    205     @pending = true 
    206   end 
    207  
    208   def select_writable? 
    209     @pending ? true : super 
    210   end 
    211  
    212   def select_readable? 
    213     @pending ? false : super 
    214   end 
    215  
    216   def event_write 
    217     if @pending 
    218       @pending = false 
    219       send_event Event.new(:connect) 
    220     else 
    221       super 
    222     end 
    223   end 
    224  
    225 end 
    226 end 
    227  
    228  
    229 ##################################### 
    230  
    231 module Machine 
    232 class TcpServerEventableIO < EventableIO 
    233  
    234   #-- 
    235   # sugar over starting a TCP server. 
    236   # INCOMPLETE, will throw a bunch of different socket-library 
    237   # errors (DNS, no-bind, etc) which we ought to wrap and 
    238   # either re-raise, or generate events for. 
    239   # INCOMPLETE, need to similarly sugar creation of Unix-domain sockets. 
    240   # Either a different method, or observe the params: for unix 
    241   # only a filename is needed. 
    242   # Of course we'll also need named pipes and whatever that Windows 
    243   # near-equivalent is called. 
    244   # RETURNS: the newly-created eventable-io object, so the caller 
    245   # can add handlers, etc. 
    246   # 
    247   def self.start_server host, port 
    248     sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 ) 
    249     sd.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true ) 
    250     sd.bind( Socket.pack_sockaddr_in( port, host )) 
    251     sd.listen( 50 ) # 5 is what you see in all the books. Ain't enough. 
    252     TcpServerEventableIO.new sd 
    253   end 
    254  
    255   #-- 
    256   # Ruby accept_nonblock is applied on class Socket, 
    257   # but for some unknown reason, TCPServer is not a 
    258   # subclass of Socket. It's a subclass of IO->BasicSocket. 
    259   # So we can't do non-blocking I/O of TCPServers. 
    260   # This is the required idiom for creating a TCP server: 
    261   # sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0) 
    262   # sd.bind( Socket.pack_sockaddr_in( port, server )) 
    263   # sd.listen(5) 
    264   # eio = TcpServerEventableIO.new( sd ) 
    265   # 
    266   # For Unix-domain sockets, the idiom is: 
    267   # sd = Socket.new( Socket::AF_UNIX, Socket::SOCK_STREAM, 0) 
    268   # sd.bind( Socket.pack_sockaddr_un( socketname )) 
    269   # sd.listen(5) 
    270   # eio = TcpServerEventableIO.new( sd ) 
    271   # 
    272   # 
    273   def initialize io 
    274     super 
    275   end 
    276  
    277   def select_writable? 
    278     false 
    279   end 
    280  
    281   def select_readable? 
    282     true 
    283   end 
    284  
    285   #-- 
    286   # accept_nonblock returns an array consisting of the accepted 
    287   # socket and a sockaddr_in which names the peer. 
    288   def event_read 
    289     begin 
    290       10.times { 
    291         sd = io.accept_nonblock 
    292         send_event( DataEvent.new( :accept, sd[0] )) 
    293       } 
    294     rescue Errno::EWOULDBLOCK, Errno::EAGAIN 
    295     end 
    296   end 
    297  
    298 end 
    299 end 
    300  
    301  
    302  
     303 
  • experiments/EventMachine/lib/machine/event.rb

    r159 r166  
    55  require 'thread' 
    66 
    7   class Event 
     7  # TODO: Extend Struct so it supports default argument values. 
     8  # TODO: Make it so no arguments are necessary to create events where the type 
     9  # in itself is enough, without fields. 
     10  class EventType < Struct; end 
     11 
     12  class Handler 
    813    include Base 
    914 
    10     attr_reader :type 
    11  
    12     def initialize(type) 
    13       @type = type 
     15    def initialize(dispatcher, *args, &block) 
     16      @dispatcher = dispatcher 
    1417    end 
    1518  end 
    1619 
    17   module EventDispatcher 
     20  class EventDispatcher 
    1821    include Base 
    1922 
    20     # Wrap the including classes initialize method so we are automatically 
    21     # initialized without the includer having to call super. 
    22     def self.included(other) 
    23       install_initialize = lambda do 
    24         other.class_eval do 
    25           alias :old_initialize :initialize 
    26  
    27           def initialize(*args, &block) 
    28             intercept_initialize unless @intercept_initialized 
    29             old_initialize(*args, &block) 
    30           end 
    31         end 
    32       end 
    33  
    34       class << other; self; end.class_eval do 
    35         alias :old_method_added :method_added 
    36  
    37         ignore = false 
    38         define_method(:method_added) do |name| 
    39           return if ignore 
    40  
    41           case name 
    42           when :initialize then 
    43             Thread.exclusive do 
    44               ignore = true 
    45               install_initialize.call 
    46               ignore = false 
    47             end 
    48           else 
    49             old_method_added(name) 
    50           end 
    51         end 
    52       end 
    53  
    54       install_initialize.call 
    55       super 
    56     end 
    57  
    58     # Called before the initialize method of the including class. 
    59     def intercept_initialize 
    60       @intercept_initialized = true 
     23    def initialize(*args, &block) 
    6124      @handlers = Hash.new do |hash, key|  
    6225        hash[key] = Hash.new {|h,k| h[k] = []}  
     
    6831    end 
    6932 
    70     unless respond_to? :initialize 
    71       def initialize 
    72       end 
    73     end 
    74  
    7533    # Add a new handler or chain of handlers for the given event type. 
    7634    # TODO: error checking... 
    7735    def add_handler(type, target=nil, handler=nil, &block) 
    78       raise "initialization failed- did you call super in your constructor?" unless @handlers 
    79  
    8036      @handlers[type][target] << handler if (target and handler) 
    8137 
    8238      if block 
    8339        target = eval('self', block.binding) 
     40        log.debug "add_handler - type: #{type} target: #{target}" 
    8441         
    8542        @handlers[type][target] << block  
     
    9451    # Send an event to all the handlers for that type. 
    9552    def send_event(event) 
    96       log.debug "\tsend_event: #{event.type}" 
     53      log.debug "\tsend_event: #{event.class}" 
    9754      @event_q << event 
    9855    end 
     
    10158    def iterate 
    10259      while event = @event_q.shift do  
    103         log.debug "running handlers for event:  #{event.type}" 
     60        log.debug "running handlers for event:  #{event.class}" 
    10461 
    10562        # Call handlers stored by event type 
    106         @handlers[event.type].each do |target, handlers|  
     63        @handlers[event.class].each do |target, handlers|  
    10764          log.debug "target: #{target.class} with #{handlers.size} handlers" 
    10865 
  • experiments/EventMachine/lib/machine/protocol.rb

    r159 r166  
    22  class Protocol 
    33    include Base 
    4     include EventDispatcher 
    54 
    6     # Have to have this for now to guarantee EventDispatcher initializes 
    7     def initialize 
     5    def initialize(dispatcher, *args, &block) 
     6      @dispatcher = dispatcher 
    87    end 
    98  end 
    109 
    1110  require 'machine/protocols/line' 
    12   require 'machine/protocols/netstring
     11  require 'machine/protocols/http
    1312end 
  • experiments/EventMachine/lib/machine/protocols/line.rb

    r159 r166  
    11module Machine 
    2   class LineHandler < Protocol 
    3     def initialize 
     2   
     3  # Sends: 
     4  #   SendRawDataEvent 
     5  RecvLineEvent = Struct.new :line 
     6 
     7  # Receives: 
     8  #   RecvRawDataEvent 
     9  SendLineEvent = Struct.new :line 
     10 
     11  # Basic line handler that breaks up raw input based on a delimiter. 
     12  class LineHandler < Handler 
     13    LINE_DELIMITER = "/r/n" 
     14    LINE_DELIMITER_RE = /\r?\n/m 
     15 
     16    def initialize(dispatcher, delimeter=nil, delimeter_re=nil) 
    417      super 
    518 
     19      @delimiter = delimeter || LINE_DELIMITER  
     20      @delimiter_re = delimiter_re || LINE_DELIMITER_RE 
     21 
    622      @linebuffer = "" 
     23 
     24      @dispatcher.add_handler(:recv_raw_data, self, :receive_raw_data) 
     25      @dispatcher.add_handler(:send_line_event, self, :send_line) 
    726    end 
    827 
     
    2039    end 
    2140 
    22     def send_line line  
     41    def send_line event 
     42      sender = proc {|line| @dispatcher.send_event(SendRawData.new(line + LINE_DELIMITER)) } 
    2343 
     44      # Send an array of lines if passed an array... 
     45      # TODO: Maybe this isn't needed? 
     46      if event.line.respond_to? :to_ary 
     47        event.line.to_ary.each(&sender) 
     48      else 
     49        sender.call(line) 
     50      end 
    2451    end 
    25  
    26     def receive_line 
    27  
    28     end 
    29  
    3052  end 
    3153end 
  • experiments/EventMachine/lib/machine/reactor.rb

    r154 r166  
    4949      log.debug "iterating..." 
    5050 
    51       Timeout.fire_timers 
    52       EventSubscriber.route_events 
    53       EventableIO.run_one_cycle 
     51      # Iterate over each of these special dispatchers. 
     52      # TODO: Maybe they should just be treated like every other dispatcher? 
     53      [ 
     54        Timeout,  
     55        EventSubscriber,  
     56        IOHandler 
     57      ].each {|i| i.iterate} 
    5458 
    5559      @dispatchers.each {|d| d.iterate } 
  • experiments/EventMachine/lib/machine/router.rb

    r159 r166  
    44 
    55module Machine 
    6 class EventSubscriber 
     6class EventSubscriber < EventDispatcher 
    77  include Base 
    8   include EventDispatcher 
    98 
    109  @@subscribers = Hash.new {|k,v| k[v] = Queue.new} 
     
    1211 
    1312  @@reactor_signal = Socket::socketpair( Socket::AF_UNIX, Socket::SOCK_STREAM, 0) 
    14   #EventableIO.new(@@reactor_signal[1]) 
    1513  @@reactor_signalled = false 
    1614 
    1715  class << self 
    18     def route_events 
     16    def iterate 
    1917      @@log.debug "R" 
    2018      @@reactor_signalled = false 
  • experiments/EventMachine/lib/machine/signal.rb

    r159 r166  
    11module Machine 
    2   class SignalDispatcher 
     2  #Sends 
     3  SignalEvent = EventType.new :signal 
     4 
     5  class SignalDispatcher < EventDispatcher 
    36    include Base 
    4     include EventDispatcher 
    57     
    68    # This adds all of the currently supported signal types as constants. 
     
    2123 
    2224    def initialize(sig, &block) 
    23       @sig= sig 
     25      super 
    2426 
    25       add_handler(@sig, self, block) if block 
     27      @sig = sig 
     28 
     29      add_handler(SignalEvent, self, block) if block 
    2630      setup_signal 
    2731    end 
    2832 
    2933    def add_signal_handler(src=nil, handler=nil, &block) 
    30       add_handler(@sig, src, handler, &block) 
     34      add_handler(SignalEvent, src, handler, &block) 
    3135    end 
    3236 
    3337    def remove_signal_handler(src=nil) 
    34       remove_handler(@sig, src) 
     38      remove_handler(SignalEvent, src) 
    3539    end 
    3640 
     
    4246      Signal.trap(@sig) do 
    4347        @@signal_handlers[@sig].each do |sig| 
    44           sig.send_event(Event.new(@sig)) 
     48          sig.send_event(SignalEvent.new(@sig)) 
    4549        end 
    4650      end 
  • experiments/EventMachine/lib/machine/timeout.rb

    r159 r166  
    11module Machine 
    2   class Timeout 
     2  # Sends 
     3  # TODO: Take away the foo arg... This is because struct requires an arg. 
     4  TimeoutExpiredEvent = EventType.new :foo 
     5 
     6  class Timeout < EventDispatcher 
    37    include Base 
    4     include EventDispatcher 
    58 
    69    @@timers = [] 
     
    2023 
    2124      # Call all of the timer handlers that have expired. 
    22       def fire_timers 
     25      def iterate 
    2326        @@log.debug "Firing timers..." 
    2427         
     
    2730        while(not @@timers.empty? and @@timers.first.expiration <= now) 
    2831          timer = @@timers.shift 
    29           timer.send_event(Event.new(:timer_expired)) if timer.alive? 
     32          timer.send_event(TimeoutExpiredEvent.new) if timer.alive? 
    3033        end 
    3134      end 
     
    3639 
    3740    def initialize(delay, periodic = false, &block) 
     41      super 
     42 
    3843      @delay = delay 
    3944      @periodic = periodic 
     
    4954       
    5055      # An an event handler to re-register if its periodic 
    51       add_handler(:timer_expired, self, :register_timer) if @periodic 
     56      add_handler(TimeoutExpiredEvent, self, :register_timer) if @periodic 
    5257       
    5358      register_timer 
     
    5560 
    5661    def add_expiration_handler(src=nil, handler=nil, &block) 
    57       add_handler(:timer_expired, src, handler, &block) 
     62      add_handler(TimeoutExpiredEvent, src, handler, &block) 
    5863    end 
    5964 
    6065    def remove_expiration_handler(src=nil) 
    61       remove_handler(:timer_expired, src) 
     66      remove_handler(TimeoutExpiredEvent, src) 
    6267    end 
    6368 
  • experiments/EventMachine/test/event_test.rb

    r159 r166  
    77  include Machine 
    88 
    9   class TestEvent < Event 
    10     attr_accessor :value 
    11  
    12     def initialize(value) 
    13       super(:test_type) 
    14  
    15       @value = value 
    16     end 
    17   end 
    18  
    19   class TestDispatcher 
    20     include Machine::EventDispatcher 
    21  
    22     def initialize 
    23     end 
    24   end 
     9  TestEvent = EventType.new :value 
    2510 
    2611  def setup 
     
    2813    @reactor.quiet 
    2914 
    30     @dispatcher = TestDispatcher.new 
     15    @dispatcher = EventDispatcher.new 
    3116  end 
    3217 
     
    3722  def test_add_handler 
    3823    called = false 
    39     @dispatcher.add_handler(:test_type) { called = true } 
     24    @dispatcher.add_handler(TestEvent) { called = true } 
    4025 
    41     @dispatcher.add_handler(:test_type, self, [:stage_one, :stage_two]) 
     26    @dispatcher.add_handler(TestEvent, self, [:stage_one, :stage_two]) 
    4227 
    4328    event = TestEvent.new(1) 
     
    5338    called = false 
    5439 
    55     @dispatcher.add_handler(:test_type) { called = true } 
    56     @dispatcher.remove_handler(:test_type, self) 
     40    @dispatcher.add_handler(TestEvent) { called = true } 
     41    @dispatcher.remove_handler(TestEvent, self) 
    5742 
    5843    event = TestEvent.new(1) 
     
    6651  def test_error_handling 
    6752    assert_raise(Machine::BadHandler) do  
    68       @dispatcher.add_handler(:test_type, self, :bad_method_name) 
     53      @dispatcher.add_handler(TestEvent, self, :bad_method_name) 
    6954      event = TestEvent.new(1) 
    7055      @dispatcher.send_event(event)