mirror of
https://github.com/discourse/discourse.git
synced 2025-05-22 22:43:33 +08:00
Remove use of concurrent timer for Redis failover.
* Uses the same logic for Postgres failover.
This commit is contained in:
@ -2,6 +2,7 @@
|
|||||||
# A wrapper around redis that namespaces keys with the current site id
|
# A wrapper around redis that namespaces keys with the current site id
|
||||||
#
|
#
|
||||||
require_dependency 'cache'
|
require_dependency 'cache'
|
||||||
|
|
||||||
class DiscourseRedis
|
class DiscourseRedis
|
||||||
class FallbackHandler
|
class FallbackHandler
|
||||||
include Singleton
|
include Singleton
|
||||||
@ -14,16 +15,24 @@ class DiscourseRedis
|
|||||||
@running = false
|
@running = false
|
||||||
@mutex = Mutex.new
|
@mutex = Mutex.new
|
||||||
@slave_config = DiscourseRedis.slave_config
|
@slave_config = DiscourseRedis.slave_config
|
||||||
@timer_task = init_timer_task
|
|
||||||
@message_bus_keepalive_interval = MessageBus.keepalive_interval
|
@message_bus_keepalive_interval = MessageBus.keepalive_interval
|
||||||
end
|
end
|
||||||
|
|
||||||
def verify_master
|
def verify_master
|
||||||
synchronize do
|
synchronize { return if @thread && @thread.alive? }
|
||||||
return if @timer_task.running?
|
|
||||||
end
|
|
||||||
|
|
||||||
@timer_task.execute
|
@thread = Thread.new do
|
||||||
|
loop do
|
||||||
|
begin
|
||||||
|
thread = Thread.new { initiate_fallback_to_master }
|
||||||
|
thread.join
|
||||||
|
break if synchronize { @master }
|
||||||
|
sleep 10
|
||||||
|
ensure
|
||||||
|
thread.kill
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def initiate_fallback_to_master
|
def initiate_fallback_to_master
|
||||||
@ -31,10 +40,10 @@ class DiscourseRedis
|
|||||||
|
|
||||||
begin
|
begin
|
||||||
slave_client = ::Redis::Client.new(@slave_config)
|
slave_client = ::Redis::Client.new(@slave_config)
|
||||||
logger.info "#{log_prefix}: Checking connection to master server..."
|
logger.warn "#{log_prefix}: Checking connection to master server..."
|
||||||
|
|
||||||
if slave_client.call([:info]).split("\r\n").include?(MASTER_LINK_STATUS)
|
if slave_client.call([:info]).split("\r\n").include?(MASTER_LINK_STATUS)
|
||||||
logger.info "#{log_prefix}: Master server is active, killing all connections to slave..."
|
logger.warn "#{log_prefix}: Master server is active, killing all connections to slave..."
|
||||||
|
|
||||||
self.master = true
|
self.master = true
|
||||||
|
|
||||||
@ -67,18 +76,8 @@ class DiscourseRedis
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def running?
|
|
||||||
@timer_task.running?
|
|
||||||
end
|
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def init_timer_task
|
|
||||||
Concurrent::TimerTask.new(execution_interval: 10) do |task|
|
|
||||||
task.shutdown if initiate_fallback_to_master
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def synchronize
|
def synchronize
|
||||||
@mutex.synchronize { yield }
|
@mutex.synchronize { yield }
|
||||||
end
|
end
|
||||||
@ -101,7 +100,7 @@ class DiscourseRedis
|
|||||||
|
|
||||||
def resolve(client = nil)
|
def resolve(client = nil)
|
||||||
if !@fallback_handler.master
|
if !@fallback_handler.master
|
||||||
@fallback_handler.verify_master unless @fallback_handler.running?
|
@fallback_handler.verify_master
|
||||||
return @slave_options
|
return @slave_options
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -114,7 +113,7 @@ class DiscourseRedis
|
|||||||
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
|
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
|
||||||
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
|
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
|
||||||
@fallback_handler.master = false
|
@fallback_handler.master = false
|
||||||
@fallback_handler.verify_master unless @fallback_handler.running?
|
@fallback_handler.verify_master
|
||||||
raise ex
|
raise ex
|
||||||
ensure
|
ensure
|
||||||
client.disconnect
|
client.disconnect
|
||||||
@ -182,7 +181,7 @@ class DiscourseRedis
|
|||||||
:msetnx, :persist, :pexpire, :pexpireat, :psetex, :pttl, :rename, :renamenx, :rpop, :rpoplpush, :rpush, :rpushx, :sadd, :scard,
|
:msetnx, :persist, :pexpire, :pexpireat, :psetex, :pttl, :rename, :renamenx, :rpop, :rpoplpush, :rpush, :rpushx, :sadd, :scard,
|
||||||
:sdiff, :set, :setbit, :setex, :setnx, :setrange, :sinter, :sismember, :smembers, :sort, :spop, :srandmember, :srem, :strlen,
|
:sdiff, :set, :setbit, :setex, :setnx, :setrange, :sinter, :sismember, :smembers, :sort, :spop, :srandmember, :srem, :strlen,
|
||||||
:sunion, :ttl, :type, :watch, :zadd, :zcard, :zcount, :zincrby, :zrange, :zrangebyscore, :zrank, :zrem, :zremrangebyrank,
|
:sunion, :ttl, :type, :watch, :zadd, :zcard, :zcount, :zincrby, :zrange, :zrangebyscore, :zrank, :zrem, :zremrangebyrank,
|
||||||
:zremrangebyscore, :zrevrange, :zrevrangebyscore, :zrevrank, :zrangebyscore].each do |m|
|
:zremrangebyscore, :zrevrange, :zrevrangebyscore, :zrevrank, :zrangebyscore, :evalsha, :eval].each do |m|
|
||||||
define_method m do |*args|
|
define_method m do |*args|
|
||||||
args[0] = "#{namespace}:#{args[0]}" if @namespace
|
args[0] = "#{namespace}:#{args[0]}" if @namespace
|
||||||
DiscourseRedis.ignore_readonly { @redis.send(m, *args) }
|
DiscourseRedis.ignore_readonly { @redis.send(m, *args) }
|
||||||
|
@ -135,14 +135,13 @@ describe DiscourseRedis do
|
|||||||
error = RuntimeError.new('Name or service not known')
|
error = RuntimeError.new('Name or service not known')
|
||||||
|
|
||||||
expect { connector.resolve(BrokenRedis.new(error)) }.to raise_error(error)
|
expect { connector.resolve(BrokenRedis.new(error)) }.to raise_error(error)
|
||||||
fallback_handler.instance_variable_get(:@timer_task).shutdown
|
expect(fallback_handler.master).to eq(false)
|
||||||
expect(fallback_handler.running?).to eq(false)
|
|
||||||
|
|
||||||
config = connector.resolve
|
config = connector.resolve
|
||||||
|
|
||||||
expect(config[:host]).to eq(slave_host)
|
expect(config[:host]).to eq(slave_host)
|
||||||
expect(config[:port]).to eq(slave_port)
|
expect(config[:port]).to eq(slave_port)
|
||||||
expect(fallback_handler.running?).to eq(true)
|
expect(fallback_handler.master).to eq(false)
|
||||||
ensure
|
ensure
|
||||||
fallback_handler.master = true
|
fallback_handler.master = true
|
||||||
end
|
end
|
||||||
|
Reference in New Issue
Block a user