DEV: Allow rebakes to generate optimized images at the same time

Previously only Sidekiq was allowed to generate more than one optimized image at the same time per machine. This adds an easy mechanism to allow the same in rake tasks and other tools.
This commit is contained in:
Gerhard Schlager
2024-01-13 23:34:20 +01:00
committed by Gerhard Schlager
parent bcb8b3fab9
commit 241bf48497
4 changed files with 37 additions and 44 deletions

View File

@ -14,15 +14,24 @@ class OptimizedImage < ActiveRecord::Base
# this can very easily lead to runaway CPU so slowing it down is beneficial and it is hijacked # this can very easily lead to runaway CPU so slowing it down is beneficial and it is hijacked
# #
# we can not afford this blocking in Sidekiq cause it can lead to starvation # we can not afford this blocking in Sidekiq cause it can lead to starvation
if Sidekiq.server? if lock_per_machine?
DistributedMutex.synchronize("optimized_image_#{upload_id}_#{width}_#{height}") { yield }
else
DistributedMutex.synchronize("optimized_image_host_#{@hostname}") do DistributedMutex.synchronize("optimized_image_host_#{@hostname}") do
DistributedMutex.synchronize("optimized_image_#{upload_id}_#{width}_#{height}") { yield } DistributedMutex.synchronize("optimized_image_#{upload_id}_#{width}_#{height}") { yield }
end end
else
DistributedMutex.synchronize("optimized_image_#{upload_id}_#{width}_#{height}") { yield }
end end
end end
# Tells the caller whether the per-host lock should be taken in addition to
# the per-image lock.
#
# An explicitly assigned value (see the `lock_per_machine=` writer) always wins.
# When nothing has been assigned yet, the flag is initialised once to the
# opposite of `Sidekiq.server?` and that value is kept for later calls.
#
# @return [Object] the stored flag (a boolean unless the writer was given something else)
def self.lock_per_machine?
  unless instance_variable_defined?(:@lock_per_machine)
    # first use: fall back to "lock per machine unless this is a Sidekiq server"
    @lock_per_machine = !Sidekiq.server?
  end

  @lock_per_machine
end
# Overrides the flag returned by `lock_per_machine?`.
#
# The value is stored as given; no coercion or validation is performed.
#
# @param value [Object] the new flag; the rake tasks and the uploads importer pass `false`
def self.lock_per_machine=(value)
  instance_variable_set(:@lock_per_machine, value)
end
def self.create_for(upload, width, height, opts = {}) def self.create_for(upload, width, height, opts = {})
return if width <= 0 || height <= 0 return if width <= 0 || height <= 0
return if upload.try(:sha1).blank? return if upload.try(:sha1).blank?

View File

@ -720,63 +720,48 @@ desc "Rebake posts that contain polls"
task "import:rebake_uncooked_posts_with_polls" => :environment do task "import:rebake_uncooked_posts_with_polls" => :environment do
log "Rebaking posts with polls" log "Rebaking posts with polls"
Jobs.run_immediately! posts = Post.where("EXISTS (SELECT 1 FROM polls WHERE polls.post_id = posts.id)")
posts = rebake_posts(posts)
Post.where("EXISTS (SELECT 1 FROM polls WHERE polls.post_id = posts.id)").where(
"baked_version <> ? or baked_version IS NULL",
Post::BAKED_VERSION,
)
max_count = posts.count
current_count = 0
posts.find_each(order: :desc) do |post|
post.rebake!
current_count += 1
print "\r%7d / %7d" % [current_count, max_count]
end
end end
desc "Rebake posts that contain events" desc "Rebake posts that contain events"
task "import:rebake_uncooked_posts_with_events" => :environment do task "import:rebake_uncooked_posts_with_events" => :environment do
log "Rebaking posts with events" log "Rebaking posts with events"
Jobs.run_immediately!
posts = posts =
Post.where( Post.where(
"EXISTS (SELECT 1 FROM discourse_post_event_events WHERE discourse_post_event_events.id = posts.id)", "EXISTS (SELECT 1 FROM discourse_post_event_events WHERE discourse_post_event_events.id = posts.id)",
).where("baked_version <> ? or baked_version IS NULL", Post::BAKED_VERSION) )
max_count = posts.count rebake_posts(posts)
current_count = 0
posts.find_each(order: :desc) do |post|
post.rebake!
current_count += 1
print "\r%7d / %7d" % [current_count, max_count]
end
end end
desc "Rebake posts that have tag" desc "Rebake posts that have tag"
task "import:rebake_uncooked_posts_with_tag", [:tag_name] => :environment do |_task, args| task "import:rebake_uncooked_posts_with_tag", [:tag_name] => :environment do |_task, args|
log "Rebaking posts with tag" log "Rebaking posts with tag"
Jobs.run_immediately!
posts = posts =
Post.where( Post.where(
"EXISTS (SELECT 1 FROM topic_tags JOIN tags ON tags.id = topic_tags.tag_id WHERE topic_tags.topic_id = posts.topic_id AND tags.name = ?)", "EXISTS (SELECT 1 FROM topic_tags JOIN tags ON tags.id = topic_tags.tag_id WHERE topic_tags.topic_id = posts.topic_id AND tags.name = ?)",
args[:tag_name], args[:tag_name],
).where("baked_version <> ? or baked_version IS NULL", Post::BAKED_VERSION) )
rebake_posts(posts)
end
def rebake_posts(posts)
Jobs.run_immediately!
OptimizedImage.lock_per_machine = false
max_count = posts.count max_count = posts.count
current_count = 0 current_count = 0
posts.find_each(order: :desc) do |post| posts
post.rebake! .where("baked_version <> ? or baked_version IS NULL", Post::BAKED_VERSION)
current_count += 1 .find_each(order: :desc) do |post|
print "\r%7d / %7d" % [current_count, max_count] post.rebake!
end current_count += 1
print "\r%7d / %7d" % [current_count, max_count]
end
end end

View File

@ -13,6 +13,9 @@ task "posts:rebake_uncooked_posts" => :environment do
# this rake task without worrying about your sidekiq imploding # this rake task without worrying about your sidekiq imploding
Jobs.run_immediately! Jobs.run_immediately!
# don't lock per machine, we want to be able to run this from multiple consoles
OptimizedImage.lock_per_machine = false
ENV["RAILS_DB"] ? rebake_uncooked_posts : rebake_uncooked_posts_all_sites ENV["RAILS_DB"] ? rebake_uncooked_posts : rebake_uncooked_posts_all_sites
end end

View File

@ -6,13 +6,6 @@ require "etc"
require "sqlite3" require "sqlite3"
require "colored2" require "colored2"
# hack so that OptimizedImage.lock believes that it's running in a Sidekiq job
module Sidekiq
def self.server?
true
end
end
module BulkImport module BulkImport
class UploadsImporter class UploadsImporter
TRANSACTION_SIZE = 1000 TRANSACTION_SIZE = 1000
@ -356,6 +349,9 @@ module BulkImport
avatar_upload_ids = Set.new avatar_upload_ids = Set.new
max_count = 0 max_count = 0
# allow more than 1 thread to optimize images at the same time
OptimizedImage.lock_per_machine = false
init_threads << Thread.new do init_threads << Thread.new do
query("SELECT id FROM optimized_images", @output_db).tap do |result_set| query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
result_set.each { |row| optimized_upload_ids << row["id"] } result_set.each { |row| optimized_upload_ids << row["id"] }