def run
@lock.synchronize do
@running += 1
raise StoppedError if @stopped
end
while task = @work.pop
case task
when :start
@adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
when Container
r, w = [@wake[0]], []
@lock.synchronize do
@selectable.each do |s|
r << s if s.send :can_read?
w << s if s.send :can_write?
end
end
r, w = IO.select(r, w)
selected = Set.new(r).merge(w)
drain_wake if selected.delete?(@wake[0])
stop_select = nil
@lock.synchronize do
if stop_select = @stopped
selected += @selectable
selected.each { |s| s.close @stop_err }
@wake.each { |fd| fd.close() }
end
@selectable -= selected
end
selected.each { |s| @work << s }
@work << self unless stop_select
when ConnectionTask then
task.process
rearm task
when ListenTask then
io, opts = task.process
add(connection_driver(io, opts, true)) if io
rearm task
end
end
ensure
@lock.synchronize do
if (@running -= 1) > 0
work_wake nil
else
@adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
end
end
end