Revert "FIX: Redis fallback handler refactoring (#8771)" (#8776)

This reverts commit 4f677854d3df2555ddeaa1d02c228960d33e0248.
This commit is contained in:
Krzysztof Kotlarek
2020-01-24 09:20:17 +11:00
committed by GitHub
parent 8eb2147f1f
commit 8cc09fc668
4 changed files with 235 additions and 720 deletions

View File

@ -1,21 +0,0 @@
# frozen_string_literal: true
# This is the 'actually concurrent' counterpart to
# Concurrency::Scenario::Execution from spec/support/concurrency.rb
module Concurrency
class ThreadedExecution
def new_mutex
Mutex.new
end
def sleep(delay)
super(delay)
nil
end
def spawn(&blk)
Thread.new(&blk)
nil
end
end
end

View File

@ -3,248 +3,139 @@
# #
# A wrapper around redis that namespaces keys with the current site id # A wrapper around redis that namespaces keys with the current site id
# #
require_dependency 'cache'
require_dependency 'concurrency'
class DiscourseRedis class DiscourseRedis
class RedisStatus class FallbackHandler
include Singleton
MASTER_ROLE_STATUS = "role:master".freeze MASTER_ROLE_STATUS = "role:master".freeze
MASTER_LOADING_STATUS = "loading:1".freeze
MASTER_LOADED_STATUS = "loading:0".freeze MASTER_LOADED_STATUS = "loading:0".freeze
CONNECTION_TYPES = %w{normal pubsub}.each(&:freeze) CONNECTION_TYPES = %w{normal pubsub}.each(&:freeze)
def initialize(master_config, slave_config) def initialize
master_config = master_config.dup.freeze unless master_config.frozen? @master = true
slave_config = slave_config.dup.freeze unless slave_config.frozen? @running = false
@mutex = Mutex.new
@master_config = master_config @slave_config = DiscourseRedis.slave_config
@slave_config = slave_config @message_bus_keepalive_interval = MessageBus.keepalive_interval
end end
def master_alive? def verify_master
master_client = connect(@master_config) synchronize do
return if @thread && @thread.alive?
@thread = Thread.new do
loop do
begin begin
info = master_client.call([:info]) thread = Thread.new { initiate_fallback_to_master }
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex thread.join
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known" break if synchronize { @master }
warn "Master not alive, error connecting" sleep 5
return false
ensure ensure
master_client.disconnect thread.kill
end
end
end
end
end end
unless info.include?(MASTER_LOADED_STATUS) def initiate_fallback_to_master
warn "Master not alive, status is loading" success = false
return false
end
unless info.include?(MASTER_ROLE_STATUS)
warn "Master not alive, role != master"
return false
end
true
end
def fallback
warn "Killing connections to slave..."
slave_client = connect(@slave_config)
begin begin
redis_config = DiscourseRedis.config.dup
redis_config.delete(:connector)
master_client = ::Redis::Client.new(redis_config)
logger.warn "#{log_prefix}: Checking connection to master server..."
info = master_client.call([:info])
if info.include?(MASTER_LOADED_STATUS) && info.include?(MASTER_ROLE_STATUS)
begin
logger.warn "#{log_prefix}: Master server is active, killing all connections to slave..."
self.master = true
slave_client = ::Redis::Client.new(@slave_config)
CONNECTION_TYPES.each do |connection_type| CONNECTION_TYPES.each do |connection_type|
slave_client.call([:client, [:kill, 'type', connection_type]]) slave_client.call([:client, [:kill, 'type', connection_type]])
end end
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known" MessageBus.keepalive_interval = @message_bus_keepalive_interval
warn "Attempted a redis fallback, but connection to slave failed" Discourse.clear_readonly!
Discourse.request_refresh!
success = true
ensure ensure
slave_client.disconnect slave_client&.disconnect
end
end
rescue => e
logger.warn "#{log_prefix}: Connection to Master server failed with '#{e.message}'"
ensure
master_client&.disconnect
end
success
end
def master
synchronize { @master }
end
def master=(args)
synchronize do
@master = args
# Disables MessageBus keepalive when Redis is in readonly mode
MessageBus.keepalive_interval = 0 if !@master
end end
end end
private private
def connect(config) def synchronize
config = config.dup @mutex.synchronize { yield }
config.delete(:connector) end
::Redis::Client.new(config)
def logger
Rails.logger
end end
def log_prefix def log_prefix
@log_prefix ||= begin "#{self.class}"
master_string = "#{@master_config[:host]}:#{@master_config[:port]}"
slave_string = "#{@slave_config[:host]}:#{@slave_config[:port]}"
"RedisStatus master=#{master_string} slave=#{slave_string}"
end
end
def warn(message)
Rails.logger.warn "#{log_prefix}: #{message}"
end
end
class FallbackHandler
def initialize(log_prefix, redis_status, execution)
@log_prefix = log_prefix
@redis_status = redis_status
@mutex = execution.new_mutex
@execution = execution
@master = true
@event_handlers = []
end
def add_callbacks(handler)
@mutex.synchronize do
@event_handlers << handler
end
end
def start_reset
@mutex.synchronize do
if @master
@master = false
trigger(:down)
true
else
false
end
end
end
def use_master?
master = @mutex.synchronize { @master }
if !master
false
elsif safe_master_alive?
true
else
if start_reset
@execution.spawn do
loop do
@execution.sleep 5
info "Checking connection to master"
if safe_master_alive?
@mutex.synchronize do
@master = true
@redis_status.fallback
trigger(:up)
end
break
end
end
end
end
false
end
end
private
attr_reader :log_prefix
def trigger(event)
@event_handlers.each do |handler|
begin
handler.public_send(event)
rescue Exception => e
Discourse.warn_exception(e, message: "Error running FallbackHandler callback")
end
end
end
def info(message)
Rails.logger.info "#{log_prefix}: #{message}"
end
def safe_master_alive?
begin
@redis_status.master_alive?
rescue Exception => e
Discourse.warn_exception(e, message: "Error running master_alive?")
false
end
end
end
class MessageBusFallbackCallbacks
def down
@keepalive_interval, MessageBus.keepalive_interval =
MessageBus.keepalive_interval, 0
end
def up
MessageBus.keepalive_interval = @keepalive_interval
end
end
class MainRedisReadOnlyCallbacks
def down
end
def up
Discourse.clear_readonly!
Discourse.request_refresh!
end
end
class FallbackHandlers
include Singleton
def initialize
@mutex = Mutex.new
@fallback_handlers = {}
end
def handler_for(config)
config = config.dup.freeze unless config.frozen?
@mutex.synchronize do
@fallback_handlers[[config[:host], config[:port]]] ||= begin
log_prefix = "FallbackHandler #{config[:host]}:#{config[:port]}"
slave_config = DiscourseRedis.slave_config(config)
redis_status = RedisStatus.new(config, slave_config)
handler =
FallbackHandler.new(
log_prefix,
redis_status,
Concurrency::ThreadedExecution.new
)
if config == GlobalSetting.redis_config
handler.add_callbacks(MainRedisReadOnlyCallbacks.new)
end
if config == GlobalSetting.message_bus_redis_config
handler.add_callbacks(MessageBusFallbackCallbacks.new)
end
handler
end
end
end
def self.handler_for(config)
instance.handler_for(config)
end end
end end
class Connector < Redis::Client::Connector class Connector < Redis::Client::Connector
def initialize(options) def initialize(options)
options = options.dup.freeze unless options.frozen?
super(options) super(options)
@slave_options = DiscourseRedis.slave_config(options).freeze @slave_options = DiscourseRedis.slave_config(options)
@fallback_handler = DiscourseRedis::FallbackHandlers.handler_for(options) @fallback_handler = DiscourseRedis::FallbackHandler.instance
end end
def resolve def resolve(client = nil)
if @fallback_handler.use_master? if !@fallback_handler.master
@options @fallback_handler.verify_master
else return @slave_options
@slave_options end
begin
options = @options.dup
options.delete(:connector)
client ||= Redis::Client.new(options)
loading = client.call([:info, :persistence]).include?(
DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS
)
loading ? @slave_options : @options
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
@fallback_handler.master = false
@fallback_handler.verify_master
raise ex
ensure
client.disconnect
end end
end end
end end
@ -268,6 +159,10 @@ class DiscourseRedis
@namespace = namespace @namespace = namespace
end end
def self.fallback_handler
@fallback_handler ||= DiscourseRedis::FallbackHandler.instance
end
def without_namespace def without_namespace
# Only use this if you want to store and fetch data that's shared between sites # Only use this if you want to store and fetch data that's shared between sites
@redis @redis
@ -281,6 +176,7 @@ class DiscourseRedis
STDERR.puts "WARN: Redis is in a readonly state. Performed a noop" STDERR.puts "WARN: Redis is in a readonly state. Performed a noop"
end end
fallback_handler.verify_master if !fallback_handler.master
Discourse.received_redis_readonly! Discourse.received_redis_readonly!
nil nil
else else
@ -406,4 +302,5 @@ class DiscourseRedis
def remove_namespace(key) def remove_namespace(key)
key[(namespace.length + 1)..-1] key[(namespace.length + 1)..-1]
end end
end end

View File

@ -3,84 +3,20 @@
require 'rails_helper' require 'rails_helper'
describe DiscourseRedis do describe DiscourseRedis do
before do
DiscourseRedis::FallbackHandlers.instance.instance_variable_set(:@fallback_handlers, {})
end
let(:slave_host) { 'testhost' } let(:slave_host) { 'testhost' }
let(:slave_port) { 1234 } let(:slave_port) { 1234 }
let(:config) do let(:config) do
GlobalSetting.redis_config.dup.merge(slave_host: 'testhost', slave_port: 1234, connector: DiscourseRedis::Connector) DiscourseRedis.config.dup.merge(slave_host: 'testhost', slave_port: 1234, connector: DiscourseRedis::Connector)
end end
let(:slave_config) { DiscourseRedis.slave_config(config) } let(:fallback_handler) { DiscourseRedis::FallbackHandler.instance }
it "ignore_readonly returns nil from a pure exception" do it "ignore_readonly returns nil from a pure exception" do
result = DiscourseRedis.ignore_readonly { raise Redis::CommandError.new("READONLY") } result = DiscourseRedis.ignore_readonly { raise Redis::CommandError.new("READONLY") }
expect(result).to eq(nil) expect(result).to eq(nil)
end end
let!(:master_conn) { mock('master') }
def self.use_fake_threads
attr_reader :execution
around(:each) do |example|
scenario =
Concurrency::Scenario.new do |execution|
@execution = execution
example.run
end
scenario.run(sleep_order: true, runs: 1)
end
after(:each) do
# Doing this here, as opposed to after example.run, ensures that it
# happens before the mocha expectations are checked.
execution.wait_done
end
end
def stop_after(time)
execution.sleep(time)
execution.stop_other_tasks
end
def expect_master_info(conf = config)
conf = conf.dup
conf.delete(:connector)
Redis::Client.expects(:new)
.with(conf)
.returns(master_conn)
master_conn.expects(:disconnect)
master_conn
.expects(:call)
.with([:info])
end
def info_response(*values)
values.map { |x| x.join(':') }.join("\r\n")
end
def expect_fallback(config = slave_config)
slave_conn = mock('slave')
config = config.dup
config.delete(:connector)
Redis::Client.expects(:new)
.with(config)
.returns(slave_conn)
slave_conn.expects(:call).with([:client, [:kill, 'type', 'normal']])
slave_conn.expects(:call).with([:client, [:kill, 'type', 'pubsub']])
slave_conn.expects(:disconnect)
end
describe 'redis commands' do describe 'redis commands' do
let(:raw_redis) { Redis.new(DiscourseRedis.config) } let(:raw_redis) { Redis.new(DiscourseRedis.config) }
@ -161,349 +97,150 @@ describe DiscourseRedis do
end end
end end
describe DiscourseRedis::RedisStatus do context 'when redis connection is to a slave redis server' do
let(:redis_status) { DiscourseRedis::RedisStatus.new(config, slave_config) } it 'should check the status of the master server' do
begin
context "#master_alive?" do fallback_handler.master = false
it "returns false when the master's hostname cannot be resolved" do Discourse.redis.without_namespace.expects(:set).raises(Redis::CommandError.new("READONLY"))
expect_master_info fallback_handler.expects(:verify_master).once
.raises(RuntimeError.new('Name or service not known')) Discourse.redis.set('test', '1')
ensure
expect(redis_status.master_alive?).to eq(false) fallback_handler.master = true
end Discourse.redis.del('test')
it "raises an error if a runtime error is raised" do
error = RuntimeError.new('a random runtime error')
expect_master_info.raises(error)
expect {
redis_status.master_alive?
}.to raise_error(error)
end
it "returns false if the master is unavailable" do
expect_master_info.raises(Redis::ConnectionError.new)
expect(redis_status.master_alive?).to eq(false)
end
it "returns false if the master is loading" do
expect_master_info
.returns(info_response(['loading', '1'], ['role', 'master']))
expect(redis_status.master_alive?).to eq(false)
end
it "returns false if the master is a slave" do
expect_master_info
.returns(info_response(['loading', '0'], ['role', 'slave']))
expect(redis_status.master_alive?).to eq(false)
end
it "returns true when the master isn't loading and the role is master" do
expect_master_info
.returns(info_response(['loading', '0'], ['role', 'master']))
expect(redis_status.master_alive?).to eq(true)
end
end
context "#fallback" do
it "instructs redis to kill client connections" do
expect_fallback
redis_status.fallback
end end
end end
end end
describe DiscourseRedis::Connector do describe DiscourseRedis::Connector do
let(:connector) { DiscourseRedis::Connector.new(config) } let(:connector) { DiscourseRedis::Connector.new(config) }
let(:fallback_handler) { mock('fallback_handler') }
before do after do
DiscourseRedis::FallbackHandlers.stubs(:handler_for).returns(fallback_handler) fallback_handler.master = true
end end
it 'should return the master config when master is up' do it 'should return the master config when master is up' do
fallback_handler.expects(:use_master?).returns(true)
expect(connector.resolve).to eq(config) expect(connector.resolve).to eq(config)
end end
class BrokenRedis
def initialize(error)
@error = error
end
def call(*args)
raise @error
end
def disconnect
end
end
it 'should return the slave config when master is down' do it 'should return the slave config when master is down' do
fallback_handler.expects(:use_master?).returns(false) error = Redis::CannotConnectError
expect(connector.resolve).to eq(slave_config)
expect do
connector.resolve(BrokenRedis.new(error))
end.to raise_error(Redis::CannotConnectError)
config = connector.resolve
expect(config[:host]).to eq(slave_host)
expect(config[:port]).to eq(slave_port)
end
it "should return the slave config when master's hostname cannot be resolved" do
error = RuntimeError.new('Name or service not known')
expect do
connector.resolve(BrokenRedis.new(error))
end.to raise_error(error)
expect(fallback_handler.master).to eq(false)
config = connector.resolve
expect(config[:host]).to eq(slave_host)
expect(config[:port]).to eq(slave_port)
expect(fallback_handler.master).to eq(false)
end
it "should return the slave config when master is still loading data" do
Redis::Client.any_instance
.expects(:call)
.with([:info, :persistence])
.returns("
someconfig:haha\r
#{DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS}
")
config = connector.resolve
expect(config[:host]).to eq(slave_host)
expect(config[:port]).to eq(slave_port)
end
it "should raise the right error" do
error = RuntimeError.new('test')
2.times do
expect { connector.resolve(BrokenRedis.new(error)) }
.to raise_error(error)
end
end end
end end
describe DiscourseRedis::FallbackHandler do describe DiscourseRedis::FallbackHandler do
use_fake_threads
let!(:redis_status) { mock }
let!(:fallback_handler) { DiscourseRedis::FallbackHandler.new("", redis_status, execution) }
context "in the initial configuration" do
it "tests that the master is alive and returns true if it is" do
redis_status.expects(:master_alive?).returns(true)
expect(fallback_handler.use_master?).to eq(true)
end
it "tests that the master is alive and returns false if it is not" do
redis_status.expects(:master_alive?).returns(false)
expect(fallback_handler.use_master?).to eq(false)
stop_after(1)
end
it "tests that the master is alive and returns false if it raises an exception" do
error = Exception.new
redis_status.expects(:master_alive?).raises(error)
Discourse.expects(:warn_exception)
.with(error, message: "Error running master_alive?")
expect(fallback_handler.use_master?).to eq(false)
stop_after(1)
end
end
context "after master_alive? has returned false" do
before do before do
redis_status.expects(:master_alive?).returns(false) @original_keepalive_interval = MessageBus.keepalive_interval
expect(fallback_handler.use_master?).to eq(false)
end end
it "responds with false to the next call to use_master? without consulting redis_status" do after do
expect(fallback_handler.use_master?).to eq(false) fallback_handler.master = true
MessageBus.keepalive_interval = @original_keepalive_interval
stop_after(1)
end end
it "checks that master is alive again after a timeout" do describe '#initiate_fallback_to_master' do
redis_status.expects(:master_alive?).returns(false) it 'should return the right value if the master server is still down' do
fallback_handler.master = false
Redis::Client.any_instance.expects(:call).with([:info]).returns("Some other stuff")
stop_after(6) expect(fallback_handler.initiate_fallback_to_master).to eq(false)
expect(MessageBus.keepalive_interval).to eq(0)
end end
it "checks that master is alive again and checks again if an exception is raised" do it 'should fallback to the master server once it is up' do
error = Exception.new fallback_handler.master = false
redis_status.expects(:master_alive?).raises(error) master_conn = mock('master')
slave_conn = mock('slave')
Discourse.expects(:warn_exception) Redis::Client.expects(:new)
.with(error, message: "Error running master_alive?") .with(DiscourseRedis.config)
.returns(master_conn)
execution.sleep(6) Redis::Client.expects(:new)
.with(DiscourseRedis.slave_config)
.returns(slave_conn)
redis_status.expects(:master_alive?).returns(true) master_conn.expects(:call)
redis_status.expects(:fallback) .with([:info])
.returns("
#{DiscourseRedis::FallbackHandler::MASTER_ROLE_STATUS}\r\n
#{DiscourseRedis::FallbackHandler::MASTER_LOADED_STATUS}
")
stop_after(5) DiscourseRedis::FallbackHandler::CONNECTION_TYPES.each do |connection_type|
slave_conn.expects(:call).with(
[:client, [:kill, 'type', connection_type]]
)
end end
it "triggers a fallback after master_alive? returns true" do master_conn.expects(:disconnect)
redis_status.expects(:master_alive?).returns(true) slave_conn.expects(:disconnect)
redis_status.expects(:fallback)
stop_after(6) expect(fallback_handler.initiate_fallback_to_master).to eq(true)
end expect(fallback_handler.master).to eq(true)
expect(Discourse.recently_readonly?).to eq(false)
context "after falling back" do expect(MessageBus.keepalive_interval).to eq(-1)
before do
redis_status.expects(:master_alive?).returns(true)
redis_status.expects(:fallback)
stop_after(6)
end
it "tests that the master is alive and returns true if it is" do
redis_status.expects(:master_alive?).returns(true)
expect(fallback_handler.use_master?).to eq(true)
end
it "tests that the master is alive and returns false if it is not" do
redis_status.expects(:master_alive?).returns(false)
expect(fallback_handler.use_master?).to eq(false)
stop_after(1)
end
it "tests that the master is alive and returns false if it raises an exception" do
error = Exception.new
redis_status.expects(:master_alive?).raises(error)
Discourse.expects(:warn_exception)
.with(error, message: "Error running master_alive?")
expect(fallback_handler.use_master?).to eq(false)
stop_after(1)
end
it "doesn't do anything to redis_status for a really long time" do
stop_after(1e9)
end
end
end
end
context "when message bus and main are on the same host" do
use_fake_threads
before do
# Since config is based on GlobalSetting, we need to fetch it before
# stubbing
conf = config
GlobalSetting.stubs(:redis_config).returns(conf)
GlobalSetting.stubs(:message_bus_redis_config).returns(conf)
Concurrency::ThreadedExecution.stubs(:new).returns(execution)
end
context "when the redis master goes down" do
it "sets the message bus keepalive interval to 0" do
expect_master_info
.raises(Redis::ConnectionError.new)
MessageBus.expects(:keepalive_interval=).with(0)
DiscourseRedis::Connector.new(config).resolve
execution.stop_other_tasks
end
end
context "when the redis master comes back up" do
before do
MessageBus.keepalive_interval = 60
expect_master_info
.raises(Redis::ConnectionError.new)
DiscourseRedis::Connector.new(config).resolve
expect_master_info
.returns(info_response(['loading', '0'], ['role', 'master']))
expect_fallback
end
it "sets the message bus keepalive interval to its original value" do
MessageBus.expects(:keepalive_interval=).with(60)
end
it "calls clear_readonly! and request_refresh! on Discourse" do
Discourse.expects(:clear_readonly!)
Discourse.expects(:request_refresh!)
end
end
end
context "when message bus and main are on different hosts" do
use_fake_threads
before do
# Since config is based on GlobalSetting, we need to fetch it before stubbing
conf = config
GlobalSetting.stubs(:redis_config).returns(conf)
message_bus_config = conf.dup
message_bus_config[:port] = message_bus_config[:port].to_i + 1
message_bus_config[:slave_port] = message_bus_config[:slave_port].to_i + 1
GlobalSetting.stubs(:message_bus_redis_config).returns(message_bus_config)
Concurrency::ThreadedExecution.stubs(:new).returns(execution)
end
let(:message_bus_master_config) {
GlobalSetting.message_bus_redis_config
}
context "when the message bus master goes down" do
before do
expect_master_info(message_bus_master_config)
.raises(Redis::ConnectionError.new)
end
it "sets the message bus keepalive interval to 0" do
MessageBus.expects(:keepalive_interval=).with(0)
DiscourseRedis::Connector.new(message_bus_master_config).resolve
execution.stop_other_tasks
end
it "does not call clear_readonly! or request_refresh! on Discourse" do
Discourse.expects(:clear_readonly!).never
Discourse.expects(:request_refresh!).never
DiscourseRedis::Connector.new(message_bus_master_config).resolve
execution.stop_other_tasks
end
end
context "when the message bus master comes back up" do
before do
MessageBus.keepalive_interval = 60
expect_master_info(message_bus_master_config)
.raises(Redis::ConnectionError.new)
DiscourseRedis::Connector.new(message_bus_master_config).resolve
expect_master_info(message_bus_master_config)
.returns(info_response(['loading', '0'], ['role', 'master']))
expect_fallback(DiscourseRedis.slave_config(message_bus_master_config))
end
it "sets the message bus keepalive interval to its original value" do
MessageBus.expects(:keepalive_interval=).with(60)
end
end
context "when the main master goes down" do
before do
expect_master_info
.raises(Redis::ConnectionError.new)
end
it "does not change the message bus keepalive interval" do
MessageBus.expects(:keepalive_interval=).never
DiscourseRedis::Connector.new(config).resolve
execution.stop_other_tasks
end
end
context "when the main master comes back up" do
before do
expect_master_info
.raises(Redis::ConnectionError.new)
DiscourseRedis::Connector.new(config).resolve
expect_master_info
.returns(info_response(['loading', '0'], ['role', 'master']))
expect_fallback
end
it "does not change the message bus keepalive interval" do
MessageBus.expects(:keepalive_interval=).never
end
it "calls clear_readonly! and request_refresh! on Discourse" do
Discourse.expects(:clear_readonly!)
Discourse.expects(:request_refresh!)
end end
end end
end end

View File

@ -30,7 +30,7 @@ module Concurrency
end end
def choose_with_weights(*options) def choose_with_weights(*options)
choose(*options.map(&:first)) choose(options.map(&:first))
end end
def dead_end def dead_end
@ -147,11 +147,10 @@ module Concurrency
def initialize(path) def initialize(path)
@path = path @path = path
@tasks = [] @tasks = []
@time = 0
end end
def yield def yield
sleep(0) Fiber.yield
end end
def choose(*options) def choose(*options)
@ -162,86 +161,30 @@ module Concurrency
@path.choose_with_weights(*options) @path.choose_with_weights(*options)
end end
def stop_other_tasks
@tasts = @tasks.select! { |task| task[:fiber] == Fiber.current }
end
def sleep(length)
Fiber.yield(@time + length)
end
def start_root(&blk)
descriptor = {
fiber: Fiber.new(&blk),
run_at: 0
}
@tasks << descriptor
end
def spawn(&blk) def spawn(&blk)
descriptor = { @tasks << Fiber.new(&blk)
fiber: Fiber.new(&blk),
run_at: @time
}
@tasks << descriptor
self.yield
end end
def run(sleep_order: false) def run
until @tasks.empty? until @tasks.empty?
descriptor = task = @path.choose(*@tasks)
if sleep_order task.resume
@tasks.sort_by! { |x| x[:run_at] } unless task.alive?
run_at = @tasks.first[:run_at] @tasks.delete(task)
@path.choose(*@tasks.take_while { |x| x[:run_at] == run_at })
else
@path.choose(*@tasks)
end end
@time = [@time, descriptor[:run_at]].max
fiber = descriptor[:fiber]
begin
run_at = fiber.resume
rescue Exception
end
if fiber.alive?
descriptor[:run_at] = run_at
else
@tasks.delete(descriptor)
end end
end end
end end
def wait_done def run_with_path(path)
until @tasks.size == 1
self.sleep(1e9)
end
end
def new_mutex
Mutex.new(self)
end
end
def run_with_path(path, sleep_order: false)
execution = Execution.new(path) execution = Execution.new(path)
result = {} result = @blk.call(execution)
execution.start_root { execution.run
result[:value] = @blk.call(execution)
}
execution.run(sleep_order: sleep_order)
result result
end end
def run(sleep_order: false, **opts) def run(**opts)
Logic.run(**opts) do |path| Logic.run(**opts, &method(:run_with_path))
run_with_path(path, sleep_order: sleep_order)
end
end end
end end
@ -307,45 +250,4 @@ module Concurrency
result result
end end
end end
class Mutex
def initialize(execution)
@execution = execution
@locked_by = nil
end
def lock
@execution.yield
fiber = Fiber.current
while true
if @locked_by.nil?
@locked_by = fiber
return
elsif @locked_by == fiber
raise ThreadError, "deadlock; recursive locking"
else
@execution.yield
end
end
end
def unlock
@execution.yield
if @locked_by != Fiber.current
raise ThreadError, "Attempt to unlock a mutex which is locked by another thread"
end
@locked_by = nil
end
def synchronize
lock
begin
yield
ensure
unlock
end
end
end
end end