mirror of
https://github.com/discourse/discourse.git
synced 2025-05-25 00:32:52 +08:00
DEV: Migrate Sidekiq to a dedicated Redis DB
As we’re currently using a namespace for Sidekiq, in order to upgrade to the latest version, we need to drop it as it’s not supported anymore. The recommended way is to use a different Redis DB for Sidekiq. This patch uses a different config for Sidekiq and also takes care of migrating existing jobs (in queues and the retry and scheduled sets).
This commit is contained in:

committed by
Loïc Guitaut

parent
80625f6c1c
commit
b9dd9c70a5
@ -2,7 +2,10 @@
|
||||
|
||||
# Multisite freedom patch defines RailsMultisite::DiscoursePatches.config which is used by 200-first_middlewares.rb
|
||||
# Therefore it can not be postponed with .to_prepare
|
||||
RUN_WITHOUT_PREPARE = ["#{Rails.root}/lib/freedom_patches/rails_multisite.rb"]
|
||||
RUN_WITHOUT_PREPARE = [
|
||||
"#{Rails.root}/lib/freedom_patches/rails_multisite.rb",
|
||||
"#{Rails.root}/lib/freedom_patches/sidekiq.rb",
|
||||
]
|
||||
RUN_WITHOUT_PREPARE.each { |path| require(path) }
|
||||
|
||||
Rails.application.reloader.to_prepare do
|
||||
|
@ -84,7 +84,7 @@ Rails.application.config.to_prepare do
|
||||
Dir.glob("#{Rails.root}/app/jobs/scheduled/*.rb") { |f| require(f) } if Rails.env.development?
|
||||
|
||||
MiniScheduler.configure do |config|
|
||||
config.redis = Discourse.redis
|
||||
config.redis = DiscourseRedis.new(Discourse.sidekiq_redis_config)
|
||||
|
||||
config.job_exception_handler { |ex, context| Discourse.handle_job_exception(ex, context) }
|
||||
|
||||
|
13
db/post_migrate/20250227142351_migrate_sidekiq_jobs.rb
Normal file
13
db/post_migrate/20250227142351_migrate_sidekiq_jobs.rb
Normal file
@ -0,0 +1,13 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# Delete this migration instead of promoting it (along with the
|
||||
# `SidekiqMigration` class)
|
||||
class MigrateSidekiqJobs < ActiveRecord::Migration[7.2]
|
||||
def up
|
||||
SidekiqMigration.call
|
||||
end
|
||||
|
||||
def down
|
||||
raise ActiveRecord::IrreversibleMigration
|
||||
end
|
||||
end
|
@ -1047,10 +1047,10 @@ module Discourse
|
||||
|
||||
SIDEKIQ_NAMESPACE = "sidekiq"
|
||||
|
||||
def self.sidekiq_redis_config
|
||||
conf = GlobalSetting.redis_config.dup
|
||||
conf[:namespace] = SIDEKIQ_NAMESPACE
|
||||
conf
|
||||
def self.sidekiq_redis_config(old: false)
|
||||
redis_config = GlobalSetting.redis_config.dup
|
||||
return redis_config.merge(namespace: SIDEKIQ_NAMESPACE) if old
|
||||
redis_config.merge(db: redis_config[:db].to_i + 1)
|
||||
end
|
||||
|
||||
def self.static_doc_topic_ids
|
||||
|
14
lib/freedom_patches/sidekiq.rb
Normal file
14
lib/freedom_patches/sidekiq.rb
Normal file
@ -0,0 +1,14 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# TODO (2025-03-03): delete this once the migration is propagated everywhere
|
||||
# (in about 6 months or so)
|
||||
module Sidekiq
|
||||
def self.redis_pool
|
||||
@redis ||= RedisConnection.create
|
||||
Thread.current[:sidekiq_via_pool] || @redis
|
||||
end
|
||||
|
||||
def self.old_pool
|
||||
@old_pool ||= RedisConnection.create(Discourse.sidekiq_redis_config(old: true))
|
||||
end
|
||||
end
|
40
lib/sidekiq_migration.rb
Normal file
40
lib/sidekiq_migration.rb
Normal file
@ -0,0 +1,40 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# TODO (2025-03-03): delete this once the migration is propagated everywhere
|
||||
# (in about 6 months or so)
|
||||
class SidekiqMigration
|
||||
delegate :old_pool, to: :Sidekiq
|
||||
|
||||
def self.call
|
||||
new.call
|
||||
end
|
||||
|
||||
def call
|
||||
migrate_all_queues
|
||||
migrate(klass: Sidekiq::RetrySet)
|
||||
migrate(klass: Sidekiq::ScheduledSet)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def migrate_all_queues
|
||||
migrate(
|
||||
old_jobs: -> { Sidekiq::Queue.all.flat_map(&:to_a) },
|
||||
enqueue_jobs: ->(job) { client.push(job.item) },
|
||||
)
|
||||
end
|
||||
|
||||
def migrate(
|
||||
klass: nil,
|
||||
old_jobs: -> { klass.new.to_a },
|
||||
enqueue_jobs: ->(job) { klass.new.schedule(job.score, job.item) }
|
||||
)
|
||||
jobs_to_migrate = Sidekiq::Client.via(old_pool, &old_jobs)
|
||||
jobs_to_migrate.each(&enqueue_jobs)
|
||||
Sidekiq::Client.via(old_pool) { jobs_to_migrate.each(&:delete) }
|
||||
end
|
||||
|
||||
def client
|
||||
@client ||= Sidekiq::Client.new
|
||||
end
|
||||
end
|
Reference in New Issue
Block a user