mirror of
https://github.com/discourse/discourse.git
synced 2025-05-22 07:53:49 +08:00
DEV: Use rails_failover gem for ActiveRecord and Redis failover handling
This commit is contained in:
@ -5,141 +5,6 @@
|
||||
#
|
||||
|
||||
class DiscourseRedis
|
||||
class FallbackHandler
|
||||
include Singleton
|
||||
|
||||
MASTER_ROLE_STATUS = "role:master"
|
||||
MASTER_LOADING_STATUS = "loading:1"
|
||||
MASTER_LOADED_STATUS = "loading:0"
|
||||
CONNECTION_TYPES = %w{normal pubsub}
|
||||
|
||||
def initialize
|
||||
@master = true
|
||||
@running = false
|
||||
@mutex = Mutex.new
|
||||
@slave_config = DiscourseRedis.slave_config
|
||||
@message_bus_keepalive_interval = MessageBus.keepalive_interval
|
||||
end
|
||||
|
||||
def verify_master
|
||||
synchronize do
|
||||
return if @thread && @thread.alive?
|
||||
|
||||
@thread = Thread.new do
|
||||
loop do
|
||||
begin
|
||||
thread = Thread.new { initiate_fallback_to_master }
|
||||
thread.join
|
||||
break if synchronize { @master }
|
||||
sleep 5
|
||||
ensure
|
||||
thread.kill
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def initiate_fallback_to_master
|
||||
success = false
|
||||
|
||||
begin
|
||||
redis_config = DiscourseRedis.config.dup
|
||||
redis_config.delete(:connector)
|
||||
master_client = ::Redis::Client.new(redis_config)
|
||||
logger.warn "#{log_prefix}: Checking connection to master server..."
|
||||
info = master_client.call([:info])
|
||||
|
||||
if info.include?(MASTER_LOADED_STATUS) && info.include?(MASTER_ROLE_STATUS)
|
||||
begin
|
||||
logger.warn "#{log_prefix}: Master server is active, killing all connections to slave..."
|
||||
|
||||
self.master = true
|
||||
slave_client = ::Redis::Client.new(@slave_config)
|
||||
|
||||
CONNECTION_TYPES.each do |connection_type|
|
||||
slave_client.call([:client, [:kill, 'type', connection_type]])
|
||||
end
|
||||
|
||||
MessageBus.keepalive_interval = @message_bus_keepalive_interval
|
||||
Discourse.clear_readonly!
|
||||
Discourse.request_refresh!
|
||||
success = true
|
||||
ensure
|
||||
slave_client&.disconnect
|
||||
end
|
||||
end
|
||||
rescue => e
|
||||
logger.warn "#{log_prefix}: Connection to Master server failed with '#{e.message}'"
|
||||
ensure
|
||||
master_client&.disconnect
|
||||
end
|
||||
|
||||
success
|
||||
end
|
||||
|
||||
def master
|
||||
synchronize { @master }
|
||||
end
|
||||
|
||||
def master=(args)
|
||||
synchronize do
|
||||
@master = args
|
||||
|
||||
# Disables MessageBus keepalive when Redis is in readonly mode
|
||||
MessageBus.keepalive_interval = 0 if !@master
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def synchronize
|
||||
@mutex.synchronize { yield }
|
||||
end
|
||||
|
||||
def logger
|
||||
Rails.logger
|
||||
end
|
||||
|
||||
def log_prefix
|
||||
"#{self.class}"
|
||||
end
|
||||
end
|
||||
|
||||
class Connector < Redis::Client::Connector
|
||||
def initialize(options)
|
||||
super(options)
|
||||
@slave_options = DiscourseRedis.slave_config(options)
|
||||
@fallback_handler = DiscourseRedis::FallbackHandler.instance
|
||||
end
|
||||
|
||||
def resolve(client = nil)
|
||||
if !@fallback_handler.master
|
||||
@fallback_handler.verify_master
|
||||
return @slave_options
|
||||
end
|
||||
|
||||
begin
|
||||
options = @options.dup
|
||||
options.delete(:connector)
|
||||
client ||= Redis::Client.new(options)
|
||||
|
||||
loading = client.call([:info, :persistence]).include?(
|
||||
DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS
|
||||
)
|
||||
|
||||
loading ? @slave_options : @options
|
||||
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
|
||||
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
|
||||
@fallback_handler.master = false
|
||||
@fallback_handler.verify_master
|
||||
raise ex
|
||||
ensure
|
||||
client.disconnect
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.raw_connection(config = nil)
|
||||
config ||= self.config
|
||||
Redis.new(config)
|
||||
@ -149,20 +14,12 @@ class DiscourseRedis
|
||||
GlobalSetting.redis_config
|
||||
end
|
||||
|
||||
def self.slave_config(options = config)
|
||||
options.dup.merge!(host: options[:slave_host], port: options[:slave_port])
|
||||
end
|
||||
|
||||
def initialize(config = nil, namespace: true)
|
||||
@config = config || DiscourseRedis.config
|
||||
@redis = DiscourseRedis.raw_connection(@config.dup)
|
||||
@namespace = namespace
|
||||
end
|
||||
|
||||
def self.fallback_handler
|
||||
@fallback_handler ||= DiscourseRedis::FallbackHandler.instance
|
||||
end
|
||||
|
||||
def without_namespace
|
||||
# Only use this if you want to store and fetch data that's shared between sites
|
||||
@redis
|
||||
@ -172,10 +29,6 @@ class DiscourseRedis
|
||||
yield
|
||||
rescue Redis::CommandError => ex
|
||||
if ex.message =~ /READONLY/
|
||||
if !ENV["REDIS_RAILS_FAILOVER"]
|
||||
fallback_handler.verify_master if !fallback_handler.master
|
||||
end
|
||||
|
||||
Discourse.received_redis_readonly!
|
||||
nil
|
||||
else
|
||||
|
Reference in New Issue
Block a user