Changeset 200
- Timestamp:
- 06/10/06 16:14:49 (2 years ago)
- Files:
-
- experiments/EventMachine/ChangeLog (modified) (1 diff)
- experiments/EventMachine/lib/machine/event.rb (modified) (3 diffs)
- experiments/EventMachine/lib/machine/io.rb (moved) (moved from experiments/EventMachine/lib/machine/eio.rb)
- experiments/EventMachine/lib/machine/protocol.rb (modified) (1 diff)
- experiments/EventMachine/lib/machine.rb (modified) (1 diff)
- experiments/EventMachine/lib/machine/reactor.rb (modified) (4 diffs)
- experiments/EventMachine/lib/machine/timer.rb (moved) (moved from experiments/EventMachine/lib/machine/timeout.rb) (5 diffs)
- experiments/EventMachine/Rakefile (modified) (1 diff)
- experiments/EventMachine/test/event_test.rb (modified) (1 diff)
- experiments/EventMachine/test/io_test.rb (moved) (moved from experiments/EventMachine/test/eio_test.rb) (2 diffs)
- experiments/EventMachine/test/netstring_test.rb (modified) (2 diffs)
- experiments/EventMachine/test/timer_test.rb (moved) (moved from experiments/EventMachine/test/timeout_test.rb) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
experiments/EventMachine/ChangeLog
r159 r200 1 2006-06-10 rosejn <rosejn@groove> 2 3 * Added Reactor#bootstrap which takes blocks of code to be run once the 4 reactor is started. 5 6 * Now the loop looks at return values from calling iterate on dispatchers to 7 determine whether there are more events pending. This way we don't block on 8 select if we can be doing work instead. 9 1 10 2006-05-27 Jeff Rose <jeff@rosejn.net> 2 11 experiments/EventMachine/lib/machine/event.rb
r166 r200 21 21 include Base 22 22 23 # Run a maximum of this many events per iteration. 24 # TODO: No idea what a sane default is here... 25 DEFAULT_MAX_WORKLOAD = 20 26 23 27 def initialize(*args, &block) 28 # TODO: We might want to do some argument checking here to make this 29 # accessible. Possibly an accessor too for runtime changes... 30 @max_workload = DEFAULT_MAX_WORKLOAD 31 24 32 @handlers = Hash.new do |hash, key| 25 33 hash[key] = Hash.new {|h,k| h[k] = []} … … 51 59 # Send an event to all the handlers for that type. 52 60 def send_event(event) 53 log.debug "\tsend_event: #{event.class}"54 61 @event_q << event 62 log.debug "\tsend_event: #{event.class} (size = #{@event_q.size})" 55 63 end 56 64 57 # Process all of theevents in the run queue.65 # Process events in the run queue. 58 66 def iterate 59 while event = @event_q.shift do 67 workload_count = 0 68 69 while (workload_count < @max_workload) and (event = @event_q.shift) do 60 70 log.debug "running handlers for event: #{event.class}" 71 72 workload_count += 1 61 73 62 74 # Call handlers stored by event type … … 83 95 end 84 96 end 97 98 @event_q.size 85 99 end 86 100 experiments/EventMachine/lib/machine/protocol.rb
r166 r200 10 10 require 'machine/protocols/line' 11 11 require 'machine/protocols/http' 12 require 'machine/protocols/netstring' 12 13 end experiments/EventMachine/lib/machine.rb
r159 r200 27 27 require 'machine/reactor' 28 28 require 'machine/event' 29 require 'machine/time out'30 require 'machine/ eio'29 require 'machine/timer' 30 require 'machine/io' 31 31 require 'machine/signal' 32 32 require 'machine/router' experiments/EventMachine/lib/machine/reactor.rb
r175 r200 11 11 include Singleton 12 12 13 @@bootstrappers = [] 14 13 15 #-- 14 16 # Convenience method 15 # Takes a block that is executed off a zero-length timer.17 # Takes a block that is executed at start time. 16 18 # That guarantees that the reactor is running when the block executes. 17 19 def self.run &block 18 Timeout.new(0) {yield} if block_given?20 bootstrap(&block) if block 19 21 instance.run 22 end 23 24 #-- 25 # Executes the given block once the reactor is running. 26 def self.bootstrap &block 27 @@bootstrappers << block 20 28 end 21 29 … … 32 40 end 33 41 34 # Run the main loop until stop is called.35 # TODO: exception handling36 def run37 log.debug "Reactor running..."38 @running = true39 42 40 while @running 41 iterate 42 43 # If the reactor was stopped while dispatching events we don't want to 44 # block again. 45 if @running 46 log.debug("run loop sleeping for #{Timeout.next} seconds...") 47 48 # TODO: Figure out the best behavior here... 49 IO.select(nil, nil, nil, Timeout.next || 0.1) 50 end 43 # TODO: What do you think about this style? Seems sane to contain the check 44 # for an exception within the exception class itself, right? 45 class BadDispatcher < Exception 46 def self.check?(dispatcher) 47 throw self.new unless dispatcher.respond_to?(:iterate) 51 48 end 52 49 end 53 50 54 def iterate55 log.debug "iterating..."56 57 # Iterate over each of these special dispatchers.58 # TODO: Maybe they should just be treated like every other dispatcher?59 [60 Timeout,61 EventSubscriber,62 IOHandler63 ].each {|i| i.iterate}64 65 @dispatchers.each {|d| d.iterate }66 end67 68 51 def add_dispatcher dispatcher 52 BadDispatcher.check?(dispatcher) 69 53 @dispatchers << dispatcher 70 54 end … … 74 58 end 75 59 76 def clear_dispatchers77 @dispatchers.clear60 def reset 61 clear_dispatchers 78 62 end 79 63 … … 88 72 end 89 73 74 # Stop running immediately, without handling the remaining events. 75 def kill 76 77 end 78 90 79 # Query the current run state of the reactor. 91 80 def running? 92 81 @running 93 82 end 83 84 # Run the main loop until stop is called. 85 # TODO: exception handling 86 def run 87 log.debug "Reactor running..." 88 @running = true 89 90 while @running 91 # Fire off the bootstrap blocks and then clear them. 92 # TODO: Should they be cleared, or is it good to have this code run 93 # every time you run? 94 @@bootstrappers.each {|block| block.call } 95 @@bootstrappers.clear 96 97 work_left = iterate 98 99 # If the reactor was stopped while dispatching events we don't want to 100 # block again. 101 if @running 102 log.debug("run loop sleeping for #{Timer.next} seconds...") 103 104 # If there are still events to process we don't want to block on 105 # select here. 106 if work_left > 0 107 timeout = 0 108 else 109 timeout = Timer.next || 0.1 110 end 111 112 # TODO: Figure out the best behavior here... 113 IO.select(nil, nil, nil, timeout) 114 end 115 end 116 end 117 118 # Have each dispatcher run a workload. This is public to make unit testing 119 # easier, but you shouldn't need to call this by hand. 120 def iterate 121 log.debug "iterating..." 122 123 # Iterate over each of these special dispatchers. 124 # TODO: Maybe they should just be treated like every other dispatcher? 125 [ 126 Timer, 127 EventSubscriber, 128 IOHandler 129 ].each {|i| i.iterate} 130 131 @dispatchers.inject(0) {|mem, d| mem += d.iterate } 132 end 133 134 private 135 136 def clear_dispatchers 137 @dispatchers.clear 138 end 139 94 140 end 95 141 end experiments/EventMachine/lib/machine/timer.rb
r166 r200 2 2 # Sends 3 3 # TODO: Take away the foo arg... This is because struct requires an arg. 4 Time outExpiredEvent = EventType.new :foo4 TimerEvent = EventType.new :foo 5 5 6 class Time out< EventDispatcher6 class Timer < EventDispatcher 7 7 include Base 8 8 … … 23 23 24 24 # Call all of the timer handlers that have expired. 25 # TODO: Should these handlers run immediately, or added to the event_q 26 # like they are now? For better accuracy maybe it would be better to 27 # just run them now, or call iterate on all the timers inside this 28 # iterate? 25 29 def iterate 26 30 @@log.debug "Firing timers..." … … 30 34 while(not @@timers.empty? and @@timers.first.expiration <= now) 31 35 timer = @@timers.shift 32 timer.send_event(Time outExpiredEvent.new) if timer.alive?36 timer.send_event(TimerEvent.new) if timer.alive? 33 37 end 34 38 end … … 47 51 @called = false 48 52 49 if @periodic50 add_expiration_handler {|event| register_timer; block.call(event) }51 else52 add_expiration_handler(&block) if block53 end54 55 # An an event handler to re-register if its periodic56 add_handler(TimeoutExpiredEvent, self, :register_timer) if @periodic57 58 53 register_timer 54 55 add_timer_handler(self, :register_timer) if @periodic 56 add_timer_handler(self, &block) 59 57 end 60 58 61 def add_ expiration_handler(src=nil, handler=nil, &block)62 add_handler(Time outExpiredEvent, src, handler, &block)59 def add_timer_handler(src=nil, handler=nil, &block) 60 add_handler(TimerEvent, src, handler, &block) 63 61 end 64 62 65 def remove_ expiration_handler(src=nil)66 remove_handler(Time outExpiredEvent, src)63 def remove_timer_handler(src=nil) 64 remove_handler(TimerEvent, src) 67 65 end 68 66 … … 94 92 @@timers.push self 95 93 @@timers.sort! {|a,b| a.expiration <=> b.expiration } 96 log.debug "Pushed timer (#{@delay})... #{@@timers.size}" 97 #require 'rubygems' 98 #require 'breakpoint' 99 #breakpoint 94 log.debug "register_timer (#{@delay})... #{@@timers.size}" 100 95 end 101 96 end experiments/EventMachine/Rakefile
r125 r200 60 60 lines = 0 61 61 codelines = 0 62 Dir.glob("lib/** ") { |file_name|62 Dir.glob("lib/**/*") { |file_name| 63 63 next unless file_name =~ /.*rb/ 64 64 experiments/EventMachine/test/event_test.rb
r166 r200 17 17 18 18 def teardown 19 @reactor. clear_dispatchers19 @reactor.reset 20 20 end 21 21 experiments/EventMachine/test/io_test.rb
r179 r200 9 9 10 10 11 class EioTests < Test::Unit::TestCase11 class IoTests < Test::Unit::TestCase 12 12 include Machine 13 13 … … 22 22 def test_tcp_server 23 23 Reactor.run { 24 Time out.new(3) {Reactor.stop}24 Timer.new(3) {Reactor.stop} 25 25 } 26 26 end experiments/EventMachine/test/netstring_test.rb
r159 r200 22 22 pout = StringIO.new 23 23 str = "abcdefg" 24 NetString.write(p out, str)25 assert_equal str, NetString.read(pin)24 NetString.write(pin, str) 25 assert_equal str, try_read(pin.string) 26 26 27 27 try_read("0:,") … … 29 29 try_read("1:a,", 2) 30 30 31 assert_ exception(EOFError) {31 assert_raise(EOFError) { 32 32 try_read("") 33 33 } experiments/EventMachine/test/timer_test.rb
r159 r200 4 4 require 'machine' 5 5 6 class TestTime out< Test::Unit::TestCase6 class TestTimer < Test::Unit::TestCase 7 7 include Machine 8 8 … … 13 13 14 14 def teardown 15 Time out.clear15 Timer.clear 16 16 end 17 17 … … 19 19 # Regular 20 20 t1 = 0 21 Time out.new(0) { t1 += 1 }21 Timer.new(0) { t1 += 1 } 22 22 23 23 # Periodic 24 24 t2 = 0 25 Time out.new(1, true) { t2 += 1 }25 Timer.new(1, true) { t2 += 1 } 26 26 27 27 # Method based 28 28 @t3 = 0 29 timer = Time out.new(3)30 timer.add_ expiration_handler(self, :stop_reactor)29 timer = Timer.new(3) 30 timer.add_timer_handler(self, :stop_reactor) 31 31 32 32 @reactor.run 33 33 assert_equal 1, t1, "Regular block timer did not fire." 34 assert_equal 3, t2, "Periodic block timer did not fire correctly." 34 35 # TODO: This off by 1 thing with the periodic timer annoys me... I'm not sure 36 # this be less than 3. 37 assert t2 < 3, "Periodic block timer did not fire correctly." 35 38 assert_equal 1, @t3, "Method based timer did not fire." 36 39 end … … 44 47 called = 0 45 48 46 timer = Time out.new(0, true) { called += 1 }49 timer = Timer.new(0, true) { called += 1 } 47 50 timer.cancel 48 Time out.new(1) do51 Timer.new(1) do 49 52 @reactor.stop 50 53 end