mirror of
https://github.com/discourse/discourse.git
synced 2025-05-22 07:53:49 +08:00
FEATURE: Master-Slave Redis configuration with fallback and switch over.
This commit is contained in:
@ -3,6 +3,93 @@
|
||||
#
|
||||
require_dependency 'cache'
|
||||
class DiscourseRedis
|
||||
class FallbackHandler
|
||||
include Singleton
|
||||
|
||||
MASTER_LINK_STATUS = "master_link_status:up".freeze
|
||||
|
||||
def initialize
|
||||
@master = true
|
||||
@running = false
|
||||
@mutex = Mutex.new
|
||||
@slave_config = DiscourseRedis.slave_config
|
||||
end
|
||||
|
||||
def verify_master
|
||||
synchronize do
|
||||
return if @running && !recently_checked?
|
||||
@running = true
|
||||
end
|
||||
|
||||
Thread.new { initiate_fallback_to_master }
|
||||
end
|
||||
|
||||
def initiate_fallback_to_master
|
||||
begin
|
||||
slave_client = ::Redis::Client.new(@slave_config)
|
||||
|
||||
if slave_client.call([:info]).split("\r\n").include?(MASTER_LINK_STATUS)
|
||||
slave_client.call([:client, [:kill, 'type', 'normal']])
|
||||
Discourse.clear_readonly!
|
||||
Discourse.request_refresh!
|
||||
@master = true
|
||||
end
|
||||
ensure
|
||||
@running = false
|
||||
@last_checked = Time.zone.now
|
||||
slave_client.disconnect
|
||||
end
|
||||
end
|
||||
|
||||
def master
|
||||
synchronize { @master }
|
||||
end
|
||||
|
||||
def master=(args)
|
||||
synchronize { @master = args }
|
||||
end
|
||||
|
||||
def recently_checked?
|
||||
if @last_checked
|
||||
Time.zone.now > (@last_checked + 5.seconds)
|
||||
else
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def synchronize
|
||||
@mutex.synchronize { yield }
|
||||
end
|
||||
end
|
||||
|
||||
class Connector < Redis::Client::Connector
|
||||
MASTER = 'master'.freeze
|
||||
SLAVE = 'slave'.freeze
|
||||
|
||||
def initialize(options)
|
||||
super(options)
|
||||
@slave_options = DiscourseRedis.slave_config(options)
|
||||
@fallback_handler = DiscourseRedis::FallbackHandler.instance
|
||||
end
|
||||
|
||||
def resolve
|
||||
begin
|
||||
options = @options.dup
|
||||
options.delete(:connector)
|
||||
client = ::Redis::Client.new(options)
|
||||
client.call([:role])[0]
|
||||
@options
|
||||
rescue Redis::ConnectionError, Redis::CannotConnectError => ex
|
||||
return @slave_options if !@fallback_handler.master
|
||||
@fallback_handler.master = false
|
||||
raise ex
|
||||
ensure
|
||||
client.disconnect
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.raw_connection(config = nil)
|
||||
config ||= self.config
|
||||
@ -13,11 +100,19 @@ class DiscourseRedis
|
||||
GlobalSetting.redis_config
|
||||
end
|
||||
|
||||
def self.slave_config(options = config)
|
||||
options.dup.merge!({ host: options[:slave_host], port: options[:slave_port] })
|
||||
end
|
||||
|
||||
def initialize(config=nil)
|
||||
@config = config || DiscourseRedis.config
|
||||
@redis = DiscourseRedis.raw_connection(@config)
|
||||
end
|
||||
|
||||
def self.fallback_handler
|
||||
@fallback_handler ||= DiscourseRedis::FallbackHandler.instance
|
||||
end
|
||||
|
||||
def without_namespace
|
||||
# Only use this if you want to store and fetch data that's shared between sites
|
||||
@redis
|
||||
@ -30,6 +125,8 @@ class DiscourseRedis
|
||||
unless Discourse.recently_readonly?
|
||||
STDERR.puts "WARN: Redis is in a readonly state. Performed a noop"
|
||||
end
|
||||
|
||||
fallback_handler.verify_master if !fallback_handler.master
|
||||
Discourse.received_readonly!
|
||||
else
|
||||
raise ex
|
||||
|
Reference in New Issue
Block a user