mirror of
https://github.com/discourse/discourse.git
synced 2025-06-05 14:07:30 +08:00
Use MessageBus to get other processes to failover faster.
This commit is contained in:
@ -9,10 +9,18 @@ class PostgreSQLFallbackHandler
|
|||||||
attr_reader :masters_down
|
attr_reader :masters_down
|
||||||
attr_accessor :initialized
|
attr_accessor :initialized
|
||||||
|
|
||||||
|
DATABASE_DOWN_CHANNEL = '/global/database_down'.freeze
|
||||||
|
|
||||||
def initialize
|
def initialize
|
||||||
@masters_down = DistributedCache.new('masters_down', namespace: false)
|
@masters_down = DistributedCache.new('masters_down', namespace: false)
|
||||||
@mutex = Mutex.new
|
@mutex = Mutex.new
|
||||||
@initialized = false
|
@initialized = false
|
||||||
|
|
||||||
|
MessageBus.subscribe(DATABASE_DOWN_CHANNEL) do |payload|
|
||||||
|
RailsMultisite::ConnectionManagement.with_connection(payload.data['db']) do
|
||||||
|
clear_connections
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def verify_master
|
def verify_master
|
||||||
@ -38,10 +46,11 @@ class PostgreSQLFallbackHandler
|
|||||||
synchronize { @masters_down[namespace] }
|
synchronize { @masters_down[namespace] }
|
||||||
end
|
end
|
||||||
|
|
||||||
def master_down=(args)
|
def master_down
|
||||||
synchronize do
|
synchronize do
|
||||||
@masters_down[namespace] = args
|
@masters_down[namespace] = true
|
||||||
Sidekiq.pause! if args && !Sidekiq.paused?
|
Sidekiq.pause! if !Sidekiq.paused?
|
||||||
|
MessageBus.publish(DATABASE_DOWN_CHANNEL, db: namespace)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -63,8 +72,7 @@ class PostgreSQLFallbackHandler
|
|||||||
|
|
||||||
if is_connection_active
|
if is_connection_active
|
||||||
logger.warn "#{log_prefix}: Master server is active. Reconnecting..."
|
logger.warn "#{log_prefix}: Master server is active. Reconnecting..."
|
||||||
ActiveRecord::Base.clear_active_connections!
|
clear_connections
|
||||||
ActiveRecord::Base.clear_all_connections!
|
|
||||||
self.master_up(key)
|
self.master_up(key)
|
||||||
disable_readonly_mode
|
disable_readonly_mode
|
||||||
Sidekiq.unpause!
|
Sidekiq.unpause!
|
||||||
@ -82,6 +90,11 @@ class PostgreSQLFallbackHandler
|
|||||||
disable_readonly_mode
|
disable_readonly_mode
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def clear_connections
|
||||||
|
ActiveRecord::Base.clear_active_connections!
|
||||||
|
ActiveRecord::Base.clear_all_connections!
|
||||||
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def disable_readonly_mode
|
def disable_readonly_mode
|
||||||
@ -130,12 +143,13 @@ module ActiveRecord
|
|||||||
connection = postgresql_connection(config)
|
connection = postgresql_connection(config)
|
||||||
fallback_handler.initialized ||= true
|
fallback_handler.initialized ||= true
|
||||||
rescue PG::ConnectionBad => e
|
rescue PG::ConnectionBad => e
|
||||||
fallback_handler.master_down = true
|
fallback_handler.master_down
|
||||||
fallback_handler.verify_master
|
fallback_handler.verify_master
|
||||||
|
|
||||||
if !fallback_handler.initialized
|
if !fallback_handler.initialized
|
||||||
return postgresql_fallback_connection(config)
|
return postgresql_fallback_connection(config)
|
||||||
else
|
else
|
||||||
|
fallback_handler.clear_connections
|
||||||
raise e
|
raise e
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -30,9 +30,7 @@ describe ActiveRecord::ConnectionHandling do
|
|||||||
postgresql_fallback_handler.initialized = true
|
postgresql_fallback_handler.initialized = true
|
||||||
|
|
||||||
['default', multisite_db].each do |db|
|
['default', multisite_db].each do |db|
|
||||||
with_multisite_db(db) do
|
postgresql_fallback_handler.master_up(db)
|
||||||
postgresql_fallback_handler.master_down = false
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -75,10 +73,14 @@ describe ActiveRecord::ConnectionHandling do
|
|||||||
).returns(@replica_connection)
|
).returns(@replica_connection)
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(postgresql_fallback_handler.master_down?).to eq(false)
|
expect(postgresql_fallback_handler.master_down?).to eq(nil)
|
||||||
|
|
||||||
expect { ActiveRecord::Base.postgresql_fallback_connection(config) }
|
message = MessageBus.track_publish(PostgreSQLFallbackHandler::DATABASE_DOWN_CHANNEL) do
|
||||||
.to raise_error(PG::ConnectionBad)
|
expect { ActiveRecord::Base.postgresql_fallback_connection(config) }
|
||||||
|
.to raise_error(PG::ConnectionBad)
|
||||||
|
end.first
|
||||||
|
|
||||||
|
expect(message.data[:db]).to eq('default')
|
||||||
|
|
||||||
expect { ActiveRecord::Base.postgresql_fallback_connection(config) }
|
expect { ActiveRecord::Base.postgresql_fallback_connection(config) }
|
||||||
.to change { Discourse.readonly_mode? }.from(false).to(true)
|
.to change { Discourse.readonly_mode? }.from(false).to(true)
|
||||||
@ -87,10 +89,14 @@ describe ActiveRecord::ConnectionHandling do
|
|||||||
expect(Sidekiq.paused?).to eq(true)
|
expect(Sidekiq.paused?).to eq(true)
|
||||||
|
|
||||||
with_multisite_db(multisite_db) do
|
with_multisite_db(multisite_db) do
|
||||||
expect(postgresql_fallback_handler.master_down?).to eq(false)
|
expect(postgresql_fallback_handler.master_down?).to eq(nil)
|
||||||
|
|
||||||
expect { ActiveRecord::Base.postgresql_fallback_connection(multisite_config) }
|
message = MessageBus.track_publish(PostgreSQLFallbackHandler::DATABASE_DOWN_CHANNEL) do
|
||||||
.to raise_error(PG::ConnectionBad)
|
expect { ActiveRecord::Base.postgresql_fallback_connection(multisite_config) }
|
||||||
|
.to raise_error(PG::ConnectionBad)
|
||||||
|
end.first
|
||||||
|
|
||||||
|
expect(message.data[:db]).to eq(multisite_db)
|
||||||
|
|
||||||
expect { ActiveRecord::Base.postgresql_fallback_connection(multisite_config) }
|
expect { ActiveRecord::Base.postgresql_fallback_connection(multisite_config) }
|
||||||
.to change { Discourse.readonly_mode? }.from(false).to(true)
|
.to change { Discourse.readonly_mode? }.from(false).to(true)
|
||||||
|
Reference in New Issue
Block a user