From f8863b0f981364340dcd320f63fbfa5dc6233c3b Mon Sep 17 00:00:00 2001 From: jbrw Date: Wed, 25 Jan 2023 15:19:11 -0500 Subject: [PATCH] DEV: Limit concurrency of NotifyReviewables job (#19968) Under scenarios of extremely high load where large numbers of `Reviewable*` items are being created, it has been observed that multiple instances of the `NotifyReviewable` job may run simultaneously. These jobs will work satisfactorily if the concurrency is limited to 1, and the different types of jobs (items reviewable by admins, vs moderators, vs particular groups, etc.) are run eventually. This change introduces a new option to `DistributedMutex` which allows the `max_get_lock_attempts` to be specified. If the number is exceeded an error will be raised, which will cause Sidekiq to requeue the job. Sidekiq has existing logic to back-off on retry times for jobs that have failed multiple times. --- app/jobs/regular/notify_reviewable.rb | 80 ++++++++++++++------------- lib/distributed_mutex.rb | 26 ++++++++- 2 files changed, 64 insertions(+), 42 deletions(-) diff --git a/app/jobs/regular/notify_reviewable.rb b/app/jobs/regular/notify_reviewable.rb index a59adc4ac19..690fb75f0b0 100644 --- a/app/jobs/regular/notify_reviewable.rb +++ b/app/jobs/regular/notify_reviewable.rb @@ -22,58 +22,60 @@ class Jobs::NotifyReviewable < ::Jobs::Base end end - counts = Hash.new(0) + DistributedMutex.synchronize("notify_reviewable_job", validity: 10, max_get_lock_attempts: 1) do + counts = Hash.new(0) - Reviewable.default_visible.pending.each do |r| - counts[:admins] += 1 - counts[:moderators] += 1 if r.reviewable_by_moderator? - counts[r.reviewable_by_group_id] += 1 if r.reviewable_by_group_id - end + Reviewable.default_visible.pending.each do |r| + counts[:admins] += 1 + counts[:moderators] += 1 if r.reviewable_by_moderator? + counts[r.reviewable_by_group_id] += 1 if r.reviewable_by_group_id + end - if SiteSetting.legacy_navigation_menu? - notify_legacy( - User.real.admins.pluck(:id), - count: counts[:admins], - updates: all_updates[:admins], - ) - else - notify_users(User.real.admins, all_updates[:admins]) - end - - if reviewable.reviewable_by_moderator? if SiteSetting.legacy_navigation_menu? notify_legacy( - User.real.moderators.where("id NOT IN (?)", @contacted).pluck(:id), - count: counts[:moderators], - updates: all_updates[:moderators], + User.real.admins.pluck(:id), + count: counts[:admins], + updates: all_updates[:admins], ) else - notify_users( - User.real.moderators.where("id NOT IN (?)", @contacted), - all_updates[:moderators], - ) + notify_users(User.real.admins, all_updates[:admins]) end - end - - if SiteSetting.enable_category_group_moderation? && (group = reviewable.reviewable_by_group) - users = group.users.includes(:group_users).where("users.id NOT IN (?)", @contacted) - - users.find_each do |user| - count = 0 - updates = {} - user.group_users.each do |gu| - updates.merge!(all_updates[gu.group_id]) - count += counts[gu.group_id] - end + if reviewable.reviewable_by_moderator? if SiteSetting.legacy_navigation_menu? - notify_legacy([user.id], count: count, updates: updates) + notify_legacy( + User.real.moderators.where("id NOT IN (?)", @contacted).pluck(:id), + count: counts[:moderators], + updates: all_updates[:moderators], + ) else - notify_user(user, updates) + notify_users( + User.real.moderators.where("id NOT IN (?)", @contacted), + all_updates[:moderators], + ) end end - @contacted += users.pluck(:id) + if SiteSetting.enable_category_group_moderation? && (group = reviewable.reviewable_by_group) + users = group.users.includes(:group_users).where("users.id NOT IN (?)", @contacted) + + users.find_each do |user| + count = 0 + updates = {} + user.group_users.each do |gu| + updates.merge!(all_updates[gu.group_id]) + count += counts[gu.group_id] + end + + if SiteSetting.legacy_navigation_menu? + notify_legacy([user.id], count: count, updates: updates) + else + notify_user(user, updates) + end + end + + @contacted += users.pluck(:id) + end end end diff --git a/lib/distributed_mutex.rb b/lib/distributed_mutex.rb index f7eb3b4e512..df4d04ae962 100644 --- a/lib/distributed_mutex.rb +++ b/lib/distributed_mutex.rb @@ -30,16 +30,28 @@ class DistributedMutex end LUA - def self.synchronize(key, redis: nil, validity: DEFAULT_VALIDITY, &blk) - self.new(key, redis: redis, validity: validity).synchronize(&blk) + def self.synchronize( + key, + redis: nil, + validity: DEFAULT_VALIDITY, + max_get_lock_attempts: nil, + &blk + ) + self.new( + key, + redis: redis, + validity: validity, + max_get_lock_attempts: max_get_lock_attempts, + ).synchronize(&blk) end - def initialize(key, redis: nil, validity: DEFAULT_VALIDITY) + def initialize(key, redis: nil, validity: DEFAULT_VALIDITY, max_get_lock_attempts: nil) @key = key @using_global_redis = true if !redis @redis = redis || Discourse.redis @mutex = Mutex.new @validity = validity + @max_get_lock_attempts = max_get_lock_attempts end # NOTE wrapped in mutex to maintain its semantics @@ -69,11 +81,15 @@ class DistributedMutex result end + class MaximumAttemptsExceeded < StandardError + end + private attr_reader :key attr_reader :redis attr_reader :validity + attr_reader :max_get_lock_attempts def get_lock attempts = 0 @@ -92,6 +108,10 @@ class DistributedMutex if @using_global_redis && Discourse.recently_readonly? && attempts > CHECK_READONLY_ATTEMPTS raise Discourse::ReadOnly end + + if max_get_lock_attempts && attempts > max_get_lock_attempts + raise DistributedMutex::MaximumAttemptsExceeded + end end end