Ticket #87: scheduler.rb

File scheduler.rb, 3.7 kB (added by mebaran, 8 months ago)

Implementation of an EventMachine-based scheduler.

Line 
1 require 'thread'
2
# Adds a thread-pool backed operation scheduler to EventMachine.
#
# Operations (anything responding to #call) are queued with
# EventMachine.schedule_operation and executed by worker threads owned by an
# OperationScheduler. Each outcome is pushed onto EventMachine.resultqueue as a
# two-element array [value, handler] and the reactor is woken with
# signal_loopbreak. The code that drains the result queue and invokes the
# handler is not part of this file.
module EventMachine
  # Queues +operation+ on the shared scheduler (starting it on first use).
  #
  # operation - object responding to #call; run on a worker thread.
  # callback  - handler paired with the result on success (optional).
  # errback   - handler paired with the exception on failure (optional; the
  #             scheduler's default_exception_handler is used when omitted).
  # opts      - per-operation options: :timeout (seconds), :eid (append the
  #             execution id to the outcome), :carry (append this value to the
  #             outcome).
  #
  # Returns the integer execution id assigned to the operation.
  def self.schedule_operation(operation, callback = nil, errback = nil, opts = {})
    scheduler.put(operation, callback, errback, opts)
  end

  # Returns the shared OperationScheduler, creating it on first access.
  def self.scheduler
    start_scheduler unless @scheduler_started
    @operation_scheduler
  end

  # Creates a fresh result queue and scheduler and marks the scheduler started.
  def self.start_scheduler
    # The queue is created first so it already exists by the time any worker
    # thread could report an outcome.
    @resultqueue = Queue.new
    @operation_scheduler = OperationScheduler.new
    @scheduler_started = true
  end

  # Returns the queue onto which workers push [value, handler] outcomes
  # (nil until the scheduler has been started).
  def self.resultqueue
    @resultqueue
  end

  # Owns a pool of worker threads that pop tasks from an internal queue, run
  # them, and report the outcome through EventMachine.resultqueue.
  class OperationScheduler
    attr_reader :threadcount, :maintain
    attr_accessor :default_thread_options, :default_exception_handler

    # Starts the pool with 20 workers, a 2 second maintenance interval and a
    # 5 second grace period before retired workers are killed.
    def initialize
      @threadcount, @maintain, @ko = 20, 2, 5
      @threadpool = ThreadGroup.new
      @tasks = Queue.new
      @eid = 0
      @tid = 0
      @default_exception_handler = lambda { |e| raise e }
      @default_thread_options = {}
      add_threads(@threadcount)
      start_maintainer
    end

    # Grows or shrinks the pool so that the target size becomes +n+.
    def threadpool_size=(n)
      adjust_threadpool_size(n - @threadcount)
    end

    # Number of threads currently listed in the pool's ThreadGroup.
    def threadpool_size
      @threadpool.list.size
    end

    # Returns [active, inactive]: the pool's threads split by their
    # thread-local :active flag.
    def threads
      @threadpool.list.partition { |th| th[:active] }
    end

    # Tops the pool back up to the target size when fewer active, living
    # workers than @threadcount remain. A surplus is left alone.
    def maintain!
      living = threads.first.count { |th| th.alive? }
      add_threads(@threadcount - living)
    end

    # Replaces the maintenance timer with one firing every +i+ seconds.
    # A non-positive interval disables periodic maintenance.
    def maintenance_interval=(i)
      # There is no timer to cancel while maintenance is disabled.
      EventMachine.cancel_timer(@maintainer) if @maintainer
      @maintain = i
      start_maintainer
    end

    # Enqueues an operation for execution and returns its execution id.
    #
    # The scheduler-wide default_thread_options act as defaults only: options
    # passed by the caller take precedence, and the caller's hash is left
    # untouched.
    def put(operation, callback = nil, errback = nil, opts = {})
      eid = new_eid
      effective = @default_thread_options.merge(opts || {})
      @tasks << [eid, operation, callback, errback, effective]
      eid
    end

    # Pops the next task tuple [eid, operation, callback, errback, opts],
    # blocking while the queue is empty.
    def get
      @tasks.pop
    end

    private

    def new_tid
      @tid += 1
    end

    def new_eid
      @eid += 1
    end

    # (Re)arms the periodic maintenance timer, or clears it when the interval
    # is not positive.
    def start_maintainer
      @maintainer =
        if @maintain.to_f > 0
          EventMachine.add_periodic_timer(@maintain) { maintain! }
        end
    end

    # Spawns +n+ workers (none when +n+ is not positive). +force+ is accepted
    # for signature compatibility and is not used.
    def add_threads(n = 1, force = false)
      n.times do
        tid = new_tid
        worker = Thread.new do
          me = Thread.current
          me[:tid] = tid
          me[:active] = true
          # Scoped to this worker rather than flipping the process-wide
          # Thread.abort_on_exception switch for every thread in the program.
          me.abort_on_exception = true
          proceed while me[:active]
        end
        @threadpool.add(worker)
      end
    end

    # Retires up to +n+ workers: clears their :active flag so they stop after
    # their current (or next) task, and kills any still alive after @ko
    # seconds. +force+ is accepted for signature compatibility and is not used.
    def remove_threads(n = 1, force = false)
      # Only workers that are still flagged active are candidates; otherwise
      # repeated shrinks would keep re-selecting already retired threads.
      doomed = threads.first.first(n)
      return if doomed.empty?

      doomed.each { |th| th[:active] = false }
      EventMachine.add_timer(@ko) { doomed.each { |th| th.kill if th.alive? } } if @ko
    end

    # Applies a signed change of +n+ workers and records the new target size.
    def adjust_threadpool_size(n)
      if n > 0
        add_threads(n)
      elsif n < 0
        remove_threads(-n)
      end
      @threadcount += n
    end

    # Appends the execution id and/or the :carry value to +value+ when the
    # corresponding options are set. Kernel#Array is used for the wrapping, so
    # an Array value is extended in place and nil becomes an empty array.
    def annotate(value, eid, opts)
      value = Array(value) << eid if opts[:eid]
      value = Array(value) << opts[:carry] if opts[:carry]
      value
    end

    # Runs one task on the current worker thread and reports its outcome.
    def proceed
      eid, op, cb, eb, opts = get
      runner = Thread.current
      runner[:executing] = eid
      if opts[:timeout]
        timeout = EventMachine.add_timer(opts[:timeout]) do
          # Interrupt the worker only if it is still busy with this very task.
          runner.raise(Overtime.new("Exceeded #{opts[:timeout]}")) if runner[:executing] == eid
        end
      end
      begin
        result = op.call
        runner[:executing] = nil
        result = annotate(result, eid, opts)
        conclusion = cb ? [result, cb] : nil
      rescue Exception => e
        # Exception (not StandardError) on purpose: Overtime and Canceled
        # derive directly from Exception and must be reported like any failure.
        runner[:executing] = nil
        conclusion = [annotate(e, eid, opts), eb || @default_exception_handler]
      ensure
        # Best-effort cancellation; the timer may already have fired.
        (EventMachine.cancel_timer(timeout) if opts[:timeout]) rescue nil
        conclude conclusion
      end
    end

    # Publishes the outcome (if any) and wakes the reactor loop.
    def conclude(c)
      EventMachine.resultqueue << c if c
      EventMachine.signal_loopbreak
    end
  end

  # Exception type reserved for cancelled operations (not raised in this file).
  class Canceled < Exception; end

  # Raised inside a worker when an operation outlives its :timeout option.
  class Overtime < Exception; end
end