Changeset 611

Show
Ignore:
Timestamp:
12/06/07 07:31:39 (1 year ago)
Author:
blackhedd
Message:

added heartbeat and inactivity timers to pure Ruby

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • version_0/lib/pr_eventmachine.rb

    r610 r611  
    194194    def set_comm_inactivity_timeout sig, tm 
    195195      r = Reactor.instance.get_selectable( sig ) or raise "unknown set_comm_inactivity_timeout target" 
    196       p "?????????????" # TODO, IMPLEMENT THIS. 
     196      r.set_inactivity_timeout tm 
    197197    end 
    198198  end 
     
    248248 
    249249  TimerFired = 100 
    250        ConnectionData = 101 
    251        ConnectionUnbound = 102 
    252        ConnectionAccepted = 103 
    253        ConnectionCompleted = 104 
    254        LoopbreakSignalled = 105 
     250  ConnectionData = 101 
     251  ConnectionUnbound = 102 
     252  ConnectionAccepted = 103 
     253  ConnectionCompleted = 104 
     254  LoopbreakSignalled = 105 
    255255 
    256256end 
     
    261261class Reactor 
    262262  include Singleton 
     263 
     264  HeartbeatInterval = 2 
     265 
     266  attr_reader :current_loop_time 
    263267 
    264268  def initialize 
     
    284288    @timers = SortedSet.new # [] 
    285289    set_timer_quantum(0.1) 
     290    @current_loop_time = Time.now 
     291    @next_heartbeat = @current_loop_time + HeartbeatInterval 
    286292  end 
    287293 
     
    302308 
    303309      loop { 
     310        @current_loop_time = Time.now 
     311 
    304312        break if @stop_scheduled 
    305313        run_timers 
    306314        break if @stop_scheduled 
    307315        crank_selectables 
     316        break if @stop_scheduled 
     317        run_heartbeats 
    308318      } 
    309319    ensure 
     
    318328 
    319329  def run_timers 
    320     now = Time.now 
    321330    @timers.each {|t| 
    322       if t.first <= now 
     331      if t.first <= @current_loop_time 
    323332        @timers.delete t 
    324333        EventMachine::event_callback "", TimerFired, t.last 
     
    331340    #  EventMachine::event_callback "", TimerFired, t.last 
    332341    #end 
     342  end 
     343 
     344  def run_heartbeats 
     345    if @next_heartbeat <= @current_loop_time 
     346      @next_heartbeat = @current_loop_time + HeartbeatInterval 
     347      @selectables.each {|k,io| io.heartbeat} 
     348    end 
    333349  end 
    334350 
     
    397413  def_delegator :@my_selectable, :send_datagram 
    398414  def_delegator :@my_selectable, :get_outbound_data_size 
     415  def_delegator :@my_selectable, :set_inactivity_timeout 
     416  def_delegator :@my_selectable, :heartbeat 
    399417end 
    400418 
     
    409427      @uuid = UuidGenerator.generate 
    410428      @io = io 
     429      @last_activity = Reactor.instance.current_loop_time 
    411430 
    412431      m = @io.fcntl(Fcntl::F_GETFL, 0) 
     
    437456    end 
    438457 
     458    def set_inactivity_timeout tm 
     459      @inactivity_timeout = tm 
     460    end 
     461 
     462    def heartbeat 
     463    end 
    439464  end 
    440465 
     
    477502    # If we have it, then we can read multiple times safely to improve 
    478503    # performance. 
     504    # The last-activity clock ASSUMES that we only come here when we 
     505    # have selected readable. 
    479506    # TODO, coalesce multiple reads into a single event. 
    480507    # TODO, do the function check somewhere else and cache it. 
    481508    def eventable_read 
     509      @last_activity = Reactor.instance.current_loop_time 
    482510      begin 
    483511        if io.respond_to?(:read_nonblock) 
     
    509537    # connections. Also we should coalesce small writes. 
    510538    # URGENT TODO: Coalesce small writes. They are a performance killer. 
     539    # The last-activity recorder ASSUMES we'll only come here if we've 
     540    # selected writable. 
    511541    def eventable_write 
    512542      # coalesce the outbound array here, perhaps 
     543      @last_activity = Reactor.instance.current_loop_time 
    513544      while data = @outbound_q.shift do 
    514545        begin 
     
    565596    end 
    566597 
     598    def heartbeat 
     599      if @inactivity_timeout and (@last_activity + @inactivity_timeout) < Reactor.instance.current_loop_time 
     600        schedule_close true 
     601      end 
     602    end 
    567603  end 
    568604 
  • version_0/tests/test_pure.rb

    r607 r611  
    113113  end 
    114114 
     115 
     116 
     117  def test_reactor_running 
     118    a = false 
     119    EM.run { 
     120      a = EM.reactor_running? 
     121      EM.next_tick {EM.stop} 
     122    } 
     123    assert a 
     124  end 
     125 
     126 
    115127end