mirror of
https://github.com/discourse/discourse.git
synced 2025-06-01 04:19:01 +08:00
Remove Concurrent::TimerTask which spawns a long lasting Thread.
This commit is contained in:
@ -1,45 +1,86 @@
|
||||
require 'active_record/connection_adapters/abstract_adapter'
|
||||
require 'active_record/connection_adapters/postgresql_adapter'
|
||||
require 'discourse'
|
||||
require 'concurrent'
|
||||
|
||||
class TaskObserver
|
||||
def update(time, result, ex)
|
||||
if result
|
||||
logger.info { "PG connection heartbeat successfully returned #{result}" }
|
||||
elsif ex.is_a?(Concurrent::TimeoutError)
|
||||
logger.warning { "PG connection heartbeat timed out".freeze }
|
||||
else
|
||||
if ex.message.include?("PG::UnableToSend")
|
||||
logger.info { "PG connection heartbeat: Master connection is not active.".freeze }
|
||||
else
|
||||
logger.error { "PG connection heartbeat failed with error: \"#{ex}\"" }
|
||||
end
|
||||
class PostgreSQLFallbackHandler
|
||||
include Singleton
|
||||
|
||||
attr_reader :running
|
||||
attr_accessor :master
|
||||
|
||||
def initialize
|
||||
@master = true
|
||||
@running = false
|
||||
end
|
||||
|
||||
def verify_master
|
||||
return if @running && recently_checked?
|
||||
@running = true
|
||||
|
||||
Thread.new do
|
||||
begin
|
||||
logger.info "#{self.class}: Checking master server..."
|
||||
connection = ActiveRecord::Base.postgresql_connection(config)
|
||||
|
||||
if connection.active?
|
||||
logger.info "#{self.class}: Master server is active. Reconnecting..."
|
||||
ActiveRecord::Base.remove_connection
|
||||
ActiveRecord::Base.establish_connection(config)
|
||||
Discourse.disable_readonly_mode
|
||||
@master = true
|
||||
end
|
||||
rescue => e
|
||||
if e.message.include?("could not connect to server")
|
||||
logger.warn "#{self.class}: Connection to master PostgreSQL server failed with '#{e.message}'"
|
||||
else
|
||||
raise e
|
||||
end
|
||||
ensure
|
||||
@last_check = Time.zone.now
|
||||
@running = false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def config
|
||||
ActiveRecord::Base.configurations[Rails.env]
|
||||
end
|
||||
|
||||
def logger
|
||||
Rails.logger
|
||||
end
|
||||
|
||||
def recently_checked?
|
||||
if @last_check
|
||||
Time.zone.now <= @last_check + 5.seconds
|
||||
else
|
||||
false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
module ActiveRecord
|
||||
module ConnectionHandling
|
||||
def postgresql_fallback_connection(config)
|
||||
begin
|
||||
connection = postgresql_connection(config)
|
||||
rescue PG::ConnectionBad => e
|
||||
fallback_handler = ::PostgreSQLFallbackHandler.instance
|
||||
config = config.symbolize_keys
|
||||
|
||||
if !fallback_handler.master
|
||||
connection = postgresql_connection(config.dup.merge({
|
||||
"host" => config["replica_host"], "port" => config["replica_port"]
|
||||
host: config[:replica_host], port: config[:replica_port]
|
||||
}))
|
||||
|
||||
verify_replica(connection)
|
||||
|
||||
Discourse.enable_readonly_mode if !Discourse.readonly_mode?
|
||||
|
||||
start_connection_heartbeart(connection, config)
|
||||
Discourse.enable_readonly_mode
|
||||
else
|
||||
begin
|
||||
connection = postgresql_connection(config)
|
||||
rescue PG::ConnectionBad => e
|
||||
fallback_handler.master = false
|
||||
raise e
|
||||
end
|
||||
end
|
||||
|
||||
connection
|
||||
@ -51,24 +92,23 @@ module ActiveRecord
|
||||
value = connection.raw_connection.exec("SELECT pg_is_in_recovery()").values[0][0]
|
||||
raise "Replica database server is not in recovery mode." if value == 'f'
|
||||
end
|
||||
end
|
||||
|
||||
def interval
|
||||
5
|
||||
end
|
||||
module ConnectionAdapters
|
||||
class PostgreSQLAdapter
|
||||
set_callback :checkout, :before, :switch_back?
|
||||
|
||||
def start_connection_heartbeart(existing_connection, config)
|
||||
timer_task = Concurrent::TimerTask.new(execution_interval: interval) do |task|
|
||||
connection = postgresql_connection(config)
|
||||
private
|
||||
|
||||
if connection.active?
|
||||
existing_connection.disconnect!
|
||||
Discourse.disable_readonly_mode if Discourse.readonly_mode?
|
||||
task.shutdown
|
||||
end
|
||||
def fallback_handler
|
||||
@fallback_handler ||= ::PostgreSQLFallbackHandler.instance
|
||||
end
|
||||
|
||||
timer_task.add_observer(TaskObserver.new)
|
||||
timer_task.execute
|
||||
def switch_back?
|
||||
if !fallback_handler.master && !fallback_handler.running
|
||||
fallback_handler.verify_master
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
Reference in New Issue
Block a user