diff --git a/lib/concurrency.rb b/lib/concurrency.rb deleted file mode 100644 index 3bca0031055..00000000000 --- a/lib/concurrency.rb +++ /dev/null @@ -1,21 +0,0 @@ -# frozen_string_literal: true - -# This is the 'actually concurrent' counterpart to -# Concurrency::Scenario::Execution from spec/support/concurrency.rb -module Concurrency - class ThreadedExecution - def new_mutex - Mutex.new - end - - def sleep(delay) - super(delay) - nil - end - - def spawn(&blk) - Thread.new(&blk) - nil - end - end -end diff --git a/lib/discourse_redis.rb b/lib/discourse_redis.rb index 912aea25843..728e244db74 100644 --- a/lib/discourse_redis.rb +++ b/lib/discourse_redis.rb @@ -3,248 +3,139 @@ # # A wrapper around redis that namespaces keys with the current site id # -require_dependency 'cache' -require_dependency 'concurrency' class DiscourseRedis - class RedisStatus + class FallbackHandler + include Singleton + MASTER_ROLE_STATUS = "role:master".freeze + MASTER_LOADING_STATUS = "loading:1".freeze MASTER_LOADED_STATUS = "loading:0".freeze CONNECTION_TYPES = %w{normal pubsub}.each(&:freeze) - def initialize(master_config, slave_config) - master_config = master_config.dup.freeze unless master_config.frozen? - slave_config = slave_config.dup.freeze unless slave_config.frozen? - - @master_config = master_config - @slave_config = slave_config - end - - def master_alive? - master_client = connect(@master_config) - - begin - info = master_client.call([:info]) - rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex - raise ex if ex.class == RuntimeError && ex.message != "Name or service not known" - warn "Master not alive, error connecting" - return false - ensure - master_client.disconnect - end - - unless info.include?(MASTER_LOADED_STATUS) - warn "Master not alive, status is loading" - return false - end - - unless info.include?(MASTER_ROLE_STATUS) - warn "Master not alive, role != master" - return false - end - - true - end - - def fallback - warn "Killing connections to slave..." - - slave_client = connect(@slave_config) - - begin - CONNECTION_TYPES.each do |connection_type| - slave_client.call([:client, [:kill, 'type', connection_type]]) - end - rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex - raise ex if ex.class == RuntimeError && ex.message != "Name or service not known" - warn "Attempted a redis fallback, but connection to slave failed" - ensure - slave_client.disconnect - end - end - - private - - def connect(config) - config = config.dup - config.delete(:connector) - ::Redis::Client.new(config) - end - - def log_prefix - @log_prefix ||= begin - master_string = "#{@master_config[:host]}:#{@master_config[:port]}" - slave_string = "#{@slave_config[:host]}:#{@slave_config[:port]}" - "RedisStatus master=#{master_string} slave=#{slave_string}" - end - end - - def warn(message) - Rails.logger.warn "#{log_prefix}: #{message}" - end - end - - class FallbackHandler - def initialize(log_prefix, redis_status, execution) - @log_prefix = log_prefix - @redis_status = redis_status - @mutex = execution.new_mutex - @execution = execution + def initialize @master = true - @event_handlers = [] + @running = false + @mutex = Mutex.new + @slave_config = DiscourseRedis.slave_config + @message_bus_keepalive_interval = MessageBus.keepalive_interval end - def add_callbacks(handler) - @mutex.synchronize do - @event_handlers << handler - end - end + def verify_master + synchronize do + return if @thread && @thread.alive? - def start_reset - @mutex.synchronize do - if @master - @master = false - trigger(:down) - true - else - false - end - end - end - - def use_master? - master = @mutex.synchronize { @master } - if !master - false - elsif safe_master_alive? - true - else - if start_reset - @execution.spawn do - loop do - @execution.sleep 5 - info "Checking connection to master" - if safe_master_alive? - @mutex.synchronize do - @master = true - @redis_status.fallback - trigger(:up) - end - break - end + @thread = Thread.new do + loop do + begin + thread = Thread.new { initiate_fallback_to_master } + thread.join + break if synchronize { @master } + sleep 5 + ensure + thread.kill end end end + end + end - false + def initiate_fallback_to_master + success = false + + begin + redis_config = DiscourseRedis.config.dup + redis_config.delete(:connector) + master_client = ::Redis::Client.new(redis_config) + logger.warn "#{log_prefix}: Checking connection to master server..." + info = master_client.call([:info]) + + if info.include?(MASTER_LOADED_STATUS) && info.include?(MASTER_ROLE_STATUS) + begin + logger.warn "#{log_prefix}: Master server is active, killing all connections to slave..." + + self.master = true + slave_client = ::Redis::Client.new(@slave_config) + + CONNECTION_TYPES.each do |connection_type| + slave_client.call([:client, [:kill, 'type', connection_type]]) + end + + MessageBus.keepalive_interval = @message_bus_keepalive_interval + Discourse.clear_readonly! + Discourse.request_refresh! + success = true + ensure + slave_client&.disconnect + end + end + rescue => e + logger.warn "#{log_prefix}: Connection to Master server failed with '#{e.message}'" + ensure + master_client&.disconnect + end + + success + end + + def master + synchronize { @master } + end + + def master=(args) + synchronize do + @master = args + + # Disables MessageBus keepalive when Redis is in readonly mode + MessageBus.keepalive_interval = 0 if !@master end end private - attr_reader :log_prefix - - def trigger(event) - @event_handlers.each do |handler| - begin - handler.public_send(event) - rescue Exception => e - Discourse.warn_exception(e, message: "Error running FallbackHandler callback") - end - end + def synchronize + @mutex.synchronize { yield } end - def info(message) - Rails.logger.info "#{log_prefix}: #{message}" + def logger + Rails.logger end - def safe_master_alive? - begin - @redis_status.master_alive? - rescue Exception => e - Discourse.warn_exception(e, message: "Error running master_alive?") - false - end - end - end - - class MessageBusFallbackCallbacks - def down - @keepalive_interval, MessageBus.keepalive_interval = - MessageBus.keepalive_interval, 0 - end - - def up - MessageBus.keepalive_interval = @keepalive_interval - end - end - - class MainRedisReadOnlyCallbacks - def down - end - - def up - Discourse.clear_readonly! - Discourse.request_refresh! - end - end - - class FallbackHandlers - include Singleton - - def initialize - @mutex = Mutex.new - @fallback_handlers = {} - end - - def handler_for(config) - config = config.dup.freeze unless config.frozen? - - @mutex.synchronize do - @fallback_handlers[[config[:host], config[:port]]] ||= begin - log_prefix = "FallbackHandler #{config[:host]}:#{config[:port]}" - slave_config = DiscourseRedis.slave_config(config) - redis_status = RedisStatus.new(config, slave_config) - - handler = - FallbackHandler.new( - log_prefix, - redis_status, - Concurrency::ThreadedExecution.new - ) - - if config == GlobalSetting.redis_config - handler.add_callbacks(MainRedisReadOnlyCallbacks.new) - end - - if config == GlobalSetting.message_bus_redis_config - handler.add_callbacks(MessageBusFallbackCallbacks.new) - end - - handler - end - end - end - - def self.handler_for(config) - instance.handler_for(config) + def log_prefix + "#{self.class}" end end class Connector < Redis::Client::Connector def initialize(options) - options = options.dup.freeze unless options.frozen? - super(options) - @slave_options = DiscourseRedis.slave_config(options).freeze - @fallback_handler = DiscourseRedis::FallbackHandlers.handler_for(options) + @slave_options = DiscourseRedis.slave_config(options) + @fallback_handler = DiscourseRedis::FallbackHandler.instance end - def resolve - if @fallback_handler.use_master? - @options - else - @slave_options + def resolve(client = nil) + if !@fallback_handler.master + @fallback_handler.verify_master + return @slave_options + end + + begin + options = @options.dup + options.delete(:connector) + client ||= Redis::Client.new(options) + + loading = client.call([:info, :persistence]).include?( + DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS + ) + + loading ? @slave_options : @options + rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex + raise ex if ex.class == RuntimeError && ex.message != "Name or service not known" + @fallback_handler.master = false + @fallback_handler.verify_master + raise ex + ensure + client.disconnect end end end @@ -268,6 +159,10 @@ class DiscourseRedis @namespace = namespace end + def self.fallback_handler + @fallback_handler ||= DiscourseRedis::FallbackHandler.instance + end + def without_namespace # Only use this if you want to store and fetch data that's shared between sites @redis @@ -281,6 +176,7 @@ class DiscourseRedis STDERR.puts "WARN: Redis is in a readonly state. Performed a noop" end + fallback_handler.verify_master if !fallback_handler.master Discourse.received_redis_readonly! nil else @@ -406,4 +302,5 @@ class DiscourseRedis def remove_namespace(key) key[(namespace.length + 1)..-1] end + end diff --git a/spec/components/discourse_redis_spec.rb b/spec/components/discourse_redis_spec.rb index d769afdebfa..6d4af1d8174 100644 --- a/spec/components/discourse_redis_spec.rb +++ b/spec/components/discourse_redis_spec.rb @@ -3,84 +3,20 @@ require 'rails_helper' describe DiscourseRedis do - before do - DiscourseRedis::FallbackHandlers.instance.instance_variable_set(:@fallback_handlers, {}) - end - let(:slave_host) { 'testhost' } let(:slave_port) { 1234 } let(:config) do - GlobalSetting.redis_config.dup.merge(slave_host: 'testhost', slave_port: 1234, connector: DiscourseRedis::Connector) + DiscourseRedis.config.dup.merge(slave_host: 'testhost', slave_port: 1234, connector: DiscourseRedis::Connector) end - let(:slave_config) { DiscourseRedis.slave_config(config) } + let(:fallback_handler) { DiscourseRedis::FallbackHandler.instance } it "ignore_readonly returns nil from a pure exception" do result = DiscourseRedis.ignore_readonly { raise Redis::CommandError.new("READONLY") } expect(result).to eq(nil) end - let!(:master_conn) { mock('master') } - - def self.use_fake_threads - attr_reader :execution - - around(:each) do |example| - scenario = - Concurrency::Scenario.new do |execution| - @execution = execution - example.run - end - - scenario.run(sleep_order: true, runs: 1) - end - - after(:each) do - # Doing this here, as opposed to after example.run, ensures that it - # happens before the mocha expectations are checked. - execution.wait_done - end - end - - def stop_after(time) - execution.sleep(time) - execution.stop_other_tasks - end - - def expect_master_info(conf = config) - conf = conf.dup - conf.delete(:connector) - - Redis::Client.expects(:new) - .with(conf) - .returns(master_conn) - - master_conn.expects(:disconnect) - master_conn - .expects(:call) - .with([:info]) - end - - def info_response(*values) - values.map { |x| x.join(':') }.join("\r\n") - end - - def expect_fallback(config = slave_config) - slave_conn = mock('slave') - - config = config.dup - config.delete(:connector) - - Redis::Client.expects(:new) - .with(config) - .returns(slave_conn) - - slave_conn.expects(:call).with([:client, [:kill, 'type', 'normal']]) - slave_conn.expects(:call).with([:client, [:kill, 'type', 'pubsub']]) - slave_conn.expects(:disconnect) - end - describe 'redis commands' do let(:raw_redis) { Redis.new(DiscourseRedis.config) } @@ -161,349 +97,150 @@ describe DiscourseRedis do end end - describe DiscourseRedis::RedisStatus do - let(:redis_status) { DiscourseRedis::RedisStatus.new(config, slave_config) } - - context "#master_alive?" do - it "returns false when the master's hostname cannot be resolved" do - expect_master_info - .raises(RuntimeError.new('Name or service not known')) - - expect(redis_status.master_alive?).to eq(false) - end - - it "raises an error if a runtime error is raised" do - error = RuntimeError.new('a random runtime error') - expect_master_info.raises(error) - - expect { - redis_status.master_alive? - }.to raise_error(error) - end - - it "returns false if the master is unavailable" do - expect_master_info.raises(Redis::ConnectionError.new) - - expect(redis_status.master_alive?).to eq(false) - end - - it "returns false if the master is loading" do - expect_master_info - .returns(info_response(['loading', '1'], ['role', 'master'])) - - expect(redis_status.master_alive?).to eq(false) - end - - it "returns false if the master is a slave" do - expect_master_info - .returns(info_response(['loading', '0'], ['role', 'slave'])) - - expect(redis_status.master_alive?).to eq(false) - end - - it "returns true when the master isn't loading and the role is master" do - expect_master_info - .returns(info_response(['loading', '0'], ['role', 'master'])) - - expect(redis_status.master_alive?).to eq(true) - end - end - - context "#fallback" do - it "instructs redis to kill client connections" do - expect_fallback - - redis_status.fallback + context 'when redis connection is to a slave redis server' do + it 'should check the status of the master server' do + begin + fallback_handler.master = false + Discourse.redis.without_namespace.expects(:set).raises(Redis::CommandError.new("READONLY")) + fallback_handler.expects(:verify_master).once + Discourse.redis.set('test', '1') + ensure + fallback_handler.master = true + Discourse.redis.del('test') end end end describe DiscourseRedis::Connector do let(:connector) { DiscourseRedis::Connector.new(config) } - let(:fallback_handler) { mock('fallback_handler') } - before do - DiscourseRedis::FallbackHandlers.stubs(:handler_for).returns(fallback_handler) + after do + fallback_handler.master = true end it 'should return the master config when master is up' do - fallback_handler.expects(:use_master?).returns(true) expect(connector.resolve).to eq(config) end + class BrokenRedis + def initialize(error) + @error = error + end + + def call(*args) + raise @error + end + + def disconnect + end + end + it 'should return the slave config when master is down' do - fallback_handler.expects(:use_master?).returns(false) - expect(connector.resolve).to eq(slave_config) + error = Redis::CannotConnectError + + expect do + connector.resolve(BrokenRedis.new(error)) + end.to raise_error(Redis::CannotConnectError) + + config = connector.resolve + + expect(config[:host]).to eq(slave_host) + expect(config[:port]).to eq(slave_port) + end + + it "should return the slave config when master's hostname cannot be resolved" do + error = RuntimeError.new('Name or service not known') + + expect do + connector.resolve(BrokenRedis.new(error)) + end.to raise_error(error) + + expect(fallback_handler.master).to eq(false) + + config = connector.resolve + + expect(config[:host]).to eq(slave_host) + expect(config[:port]).to eq(slave_port) + expect(fallback_handler.master).to eq(false) + end + + it "should return the slave config when master is still loading data" do + Redis::Client.any_instance + .expects(:call) + .with([:info, :persistence]) + .returns(" + someconfig:haha\r + #{DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS} + ") + + config = connector.resolve + + expect(config[:host]).to eq(slave_host) + expect(config[:port]).to eq(slave_port) + end + + it "should raise the right error" do + error = RuntimeError.new('test') + + 2.times do + expect { connector.resolve(BrokenRedis.new(error)) } + .to raise_error(error) + end end end describe DiscourseRedis::FallbackHandler do - use_fake_threads - - let!(:redis_status) { mock } - let!(:fallback_handler) { DiscourseRedis::FallbackHandler.new("", redis_status, execution) } - - context "in the initial configuration" do - it "tests that the master is alive and returns true if it is" do - redis_status.expects(:master_alive?).returns(true) - - expect(fallback_handler.use_master?).to eq(true) - end - - it "tests that the master is alive and returns false if it is not" do - redis_status.expects(:master_alive?).returns(false) - expect(fallback_handler.use_master?).to eq(false) - - stop_after(1) - end - - it "tests that the master is alive and returns false if it raises an exception" do - error = Exception.new - redis_status.expects(:master_alive?).raises(error) - - Discourse.expects(:warn_exception) - .with(error, message: "Error running master_alive?") - - expect(fallback_handler.use_master?).to eq(false) - - stop_after(1) - end - end - - context "after master_alive? has returned false" do - before do - redis_status.expects(:master_alive?).returns(false) - expect(fallback_handler.use_master?).to eq(false) - end - - it "responds with false to the next call to use_master? without consulting redis_status" do - expect(fallback_handler.use_master?).to eq(false) - - stop_after(1) - end - - it "checks that master is alive again after a timeout" do - redis_status.expects(:master_alive?).returns(false) - - stop_after(6) - end - - it "checks that master is alive again and checks again if an exception is raised" do - error = Exception.new - redis_status.expects(:master_alive?).raises(error) - - Discourse.expects(:warn_exception) - .with(error, message: "Error running master_alive?") - - execution.sleep(6) - - redis_status.expects(:master_alive?).returns(true) - redis_status.expects(:fallback) - - stop_after(5) - end - - it "triggers a fallback after master_alive? returns true" do - redis_status.expects(:master_alive?).returns(true) - redis_status.expects(:fallback) - - stop_after(6) - end - - context "after falling back" do - before do - redis_status.expects(:master_alive?).returns(true) - redis_status.expects(:fallback) - - stop_after(6) - end - - it "tests that the master is alive and returns true if it is" do - redis_status.expects(:master_alive?).returns(true) - - expect(fallback_handler.use_master?).to eq(true) - end - - it "tests that the master is alive and returns false if it is not" do - redis_status.expects(:master_alive?).returns(false) - expect(fallback_handler.use_master?).to eq(false) - - stop_after(1) - end - - it "tests that the master is alive and returns false if it raises an exception" do - error = Exception.new - redis_status.expects(:master_alive?).raises(error) - - Discourse.expects(:warn_exception) - .with(error, message: "Error running master_alive?") - - expect(fallback_handler.use_master?).to eq(false) - - stop_after(1) - end - - it "doesn't do anything to redis_status for a really long time" do - stop_after(1e9) - end - end - end - end - - context "when message bus and main are on the same host" do - use_fake_threads - before do - # Since config is based on GlobalSetting, we need to fetch it before - # stubbing - conf = config - - GlobalSetting.stubs(:redis_config).returns(conf) - GlobalSetting.stubs(:message_bus_redis_config).returns(conf) - - Concurrency::ThreadedExecution.stubs(:new).returns(execution) + @original_keepalive_interval = MessageBus.keepalive_interval end - context "when the redis master goes down" do - it "sets the message bus keepalive interval to 0" do - expect_master_info - .raises(Redis::ConnectionError.new) - - MessageBus.expects(:keepalive_interval=).with(0) - - DiscourseRedis::Connector.new(config).resolve - - execution.stop_other_tasks - end + after do + fallback_handler.master = true + MessageBus.keepalive_interval = @original_keepalive_interval end - context "when the redis master comes back up" do - before do - MessageBus.keepalive_interval = 60 + describe '#initiate_fallback_to_master' do + it 'should return the right value if the master server is still down' do + fallback_handler.master = false + Redis::Client.any_instance.expects(:call).with([:info]).returns("Some other stuff") - expect_master_info - .raises(Redis::ConnectionError.new) - - DiscourseRedis::Connector.new(config).resolve - - expect_master_info - .returns(info_response(['loading', '0'], ['role', 'master'])) - - expect_fallback + expect(fallback_handler.initiate_fallback_to_master).to eq(false) + expect(MessageBus.keepalive_interval).to eq(0) end - it "sets the message bus keepalive interval to its original value" do - MessageBus.expects(:keepalive_interval=).with(60) - end + it 'should fallback to the master server once it is up' do + fallback_handler.master = false + master_conn = mock('master') + slave_conn = mock('slave') - it "calls clear_readonly! and request_refresh! on Discourse" do - Discourse.expects(:clear_readonly!) - Discourse.expects(:request_refresh!) - end - end - end + Redis::Client.expects(:new) + .with(DiscourseRedis.config) + .returns(master_conn) - context "when message bus and main are on different hosts" do - use_fake_threads + Redis::Client.expects(:new) + .with(DiscourseRedis.slave_config) + .returns(slave_conn) - before do - # Since config is based on GlobalSetting, we need to fetch it before stubbing - conf = config + master_conn.expects(:call) + .with([:info]) + .returns(" + #{DiscourseRedis::FallbackHandler::MASTER_ROLE_STATUS}\r\n + #{DiscourseRedis::FallbackHandler::MASTER_LOADED_STATUS} + ") - GlobalSetting.stubs(:redis_config).returns(conf) + DiscourseRedis::FallbackHandler::CONNECTION_TYPES.each do |connection_type| + slave_conn.expects(:call).with( + [:client, [:kill, 'type', connection_type]] + ) + end - message_bus_config = conf.dup - message_bus_config[:port] = message_bus_config[:port].to_i + 1 - message_bus_config[:slave_port] = message_bus_config[:slave_port].to_i + 1 + master_conn.expects(:disconnect) + slave_conn.expects(:disconnect) - GlobalSetting.stubs(:message_bus_redis_config).returns(message_bus_config) - - Concurrency::ThreadedExecution.stubs(:new).returns(execution) - end - - let(:message_bus_master_config) { - GlobalSetting.message_bus_redis_config - } - - context "when the message bus master goes down" do - before do - expect_master_info(message_bus_master_config) - .raises(Redis::ConnectionError.new) - end - - it "sets the message bus keepalive interval to 0" do - MessageBus.expects(:keepalive_interval=).with(0) - - DiscourseRedis::Connector.new(message_bus_master_config).resolve - - execution.stop_other_tasks - end - - it "does not call clear_readonly! or request_refresh! on Discourse" do - Discourse.expects(:clear_readonly!).never - Discourse.expects(:request_refresh!).never - - DiscourseRedis::Connector.new(message_bus_master_config).resolve - - execution.stop_other_tasks - end - end - - context "when the message bus master comes back up" do - before do - MessageBus.keepalive_interval = 60 - - expect_master_info(message_bus_master_config) - .raises(Redis::ConnectionError.new) - - DiscourseRedis::Connector.new(message_bus_master_config).resolve - - expect_master_info(message_bus_master_config) - .returns(info_response(['loading', '0'], ['role', 'master'])) - - expect_fallback(DiscourseRedis.slave_config(message_bus_master_config)) - end - - it "sets the message bus keepalive interval to its original value" do - MessageBus.expects(:keepalive_interval=).with(60) - end - end - - context "when the main master goes down" do - before do - expect_master_info - .raises(Redis::ConnectionError.new) - end - - it "does not change the message bus keepalive interval" do - MessageBus.expects(:keepalive_interval=).never - - DiscourseRedis::Connector.new(config).resolve - - execution.stop_other_tasks - end - end - - context "when the main master comes back up" do - before do - expect_master_info - .raises(Redis::ConnectionError.new) - - DiscourseRedis::Connector.new(config).resolve - - expect_master_info - .returns(info_response(['loading', '0'], ['role', 'master'])) - - expect_fallback - end - - it "does not change the message bus keepalive interval" do - MessageBus.expects(:keepalive_interval=).never - end - - it "calls clear_readonly! and request_refresh! on Discourse" do - Discourse.expects(:clear_readonly!) - Discourse.expects(:request_refresh!) + expect(fallback_handler.initiate_fallback_to_master).to eq(true) + expect(fallback_handler.master).to eq(true) + expect(Discourse.recently_readonly?).to eq(false) + expect(MessageBus.keepalive_interval).to eq(-1) end end end diff --git a/spec/support/concurrency.rb b/spec/support/concurrency.rb index 062bcab56b5..5d3cd0c87c4 100644 --- a/spec/support/concurrency.rb +++ b/spec/support/concurrency.rb @@ -30,7 +30,7 @@ module Concurrency end def choose_with_weights(*options) - choose(*options.map(&:first)) + choose(options.map(&:first)) end def dead_end @@ -147,11 +147,10 @@ module Concurrency def initialize(path) @path = path @tasks = [] - @time = 0 end def yield - sleep(0) + Fiber.yield end def choose(*options) @@ -162,86 +161,30 @@ module Concurrency @path.choose_with_weights(*options) end - def stop_other_tasks - @tasts = @tasks.select! { |task| task[:fiber] == Fiber.current } - end - - def sleep(length) - Fiber.yield(@time + length) - end - - def start_root(&blk) - descriptor = { - fiber: Fiber.new(&blk), - run_at: 0 - } - - @tasks << descriptor - end - def spawn(&blk) - descriptor = { - fiber: Fiber.new(&blk), - run_at: @time - } - - @tasks << descriptor - - self.yield + @tasks << Fiber.new(&blk) end - def run(sleep_order: false) + def run until @tasks.empty? - descriptor = - if sleep_order - @tasks.sort_by! { |x| x[:run_at] } - run_at = @tasks.first[:run_at] - @path.choose(*@tasks.take_while { |x| x[:run_at] == run_at }) - else - @path.choose(*@tasks) - end - - @time = [@time, descriptor[:run_at]].max - fiber = descriptor[:fiber] - - begin - run_at = fiber.resume - rescue Exception - end - - if fiber.alive? - descriptor[:run_at] = run_at - else - @tasks.delete(descriptor) + task = @path.choose(*@tasks) + task.resume + unless task.alive? + @tasks.delete(task) end end end - - def wait_done - until @tasks.size == 1 - self.sleep(1e9) - end - end - - def new_mutex - Mutex.new(self) - end end - def run_with_path(path, sleep_order: false) + def run_with_path(path) execution = Execution.new(path) - result = {} - execution.start_root { - result[:value] = @blk.call(execution) - } - execution.run(sleep_order: sleep_order) + result = @blk.call(execution) + execution.run result end - def run(sleep_order: false, **opts) - Logic.run(**opts) do |path| - run_with_path(path, sleep_order: sleep_order) - end + def run(**opts) + Logic.run(**opts, &method(:run_with_path)) end end @@ -307,45 +250,4 @@ module Concurrency result end end - - class Mutex - def initialize(execution) - @execution = execution - @locked_by = nil - end - - def lock - @execution.yield - - fiber = Fiber.current - while true - if @locked_by.nil? - @locked_by = fiber - return - elsif @locked_by == fiber - raise ThreadError, "deadlock; recursive locking" - else - @execution.yield - end - end - end - - def unlock - @execution.yield - - if @locked_by != Fiber.current - raise ThreadError, "Attempt to unlock a mutex which is locked by another thread" - end - @locked_by = nil - end - - def synchronize - lock - begin - yield - ensure - unlock - end - end - end end