From eeb01ea0de56319c9c700b88995d27e16607d49f Mon Sep 17 00:00:00 2001 From: Alan Guo Xiang Tan Date: Tue, 10 Dec 2024 12:44:56 +0800 Subject: [PATCH] DEV: Remove unnecessary thread in `Jobs::Base::JobInstrumenter` take 2 (#30195) This reverts commit 766ff723f8c216b2bf59053dba302b82e1d34d4b. Ensure that we create the sidekiq log file first before opening it for logging. This avoids any issue of the log file not being present when we initialize an instance of the `Logger`. --- app/jobs/base.rb | 51 ++++++++++++++++++++++--------------- lib/discourse.rb | 13 +++++++++- spec/jobs/jobs_base_spec.rb | 39 ++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 22 deletions(-) diff --git a/app/jobs/base.rb b/app/jobs/base.rb index bf5ad4f64bc..fc7b3c99489 100644 --- a/app/jobs/base.rb +++ b/app/jobs/base.rb @@ -44,6 +44,7 @@ module Jobs class JobInstrumenter def initialize(job_class:, opts:, db:, jid:) return unless enabled? + self.class.mutex.synchronize do @data = {} @@ -75,6 +76,7 @@ module Jobs def stop(exception:) return unless enabled? + self.class.mutex.synchronize do profile = MethodProfiler.stop @@ -102,30 +104,35 @@ module Jobs end def self.raw_log(message) + begin + logger << message + rescue => e + Discourse.warn_exception(e, message: "Exception encountered while logging Sidekiq job") + end + end + + # For test environment only + def self.set_log_path(path) + @@log_path = path + @@logger = nil + end + + # For test environment only + def self.reset_log_path + @@log_path = nil + @@logger = nil + end + + def self.log_path + @@log_path ||= "#{Rails.root}/log/sidekiq.log" + end + + def self.logger @@logger ||= begin - f = File.open "#{Rails.root}/log/sidekiq.log", "a" - f.sync = true - Logger.new f + File.touch(log_path) if !File.exist?(log_path) + Logger.new(log_path) end - - @@log_queue ||= Queue.new - - if !defined?(@@log_thread) || !@@log_thread.alive? - @@log_thread = - Thread.new do - loop do - @@logger << @@log_queue.pop - rescue Exception => e - Discourse.warn_exception( - e, - message: "Exception encountered while logging Sidekiq job", - ) - end - end - end - - @@log_queue.push(message) end def current_duration @@ -259,6 +266,7 @@ module Jobs requeued = true return end + parent_thread = Thread.current cluster_concurrency_redis_key = self.class.cluster_concurrency_redis_key @@ -343,6 +351,7 @@ module Jobs keepalive_thread.join self.class.clear_cluster_concurrency_lock! end + ActiveRecord::Base.connection_handler.clear_active_connections! end end diff --git a/lib/discourse.rb b/lib/discourse.rb index 7a269096cec..86066c50c10 100644 --- a/lib/discourse.rb +++ b/lib/discourse.rb @@ -1224,7 +1224,18 @@ module Discourse locale end + # For test environment only + def self.enable_sidekiq_logging + @@sidekiq_logging_enabled = true + end + + # For test environment only + def self.disable_sidekiq_logging + @@sidekiq_logging_enabled = false + end + def self.enable_sidekiq_logging? - ENV["DISCOURSE_LOG_SIDEKIQ"] == "1" + ENV["DISCOURSE_LOG_SIDEKIQ"] == "1" || + (defined?(@@sidekiq_logging_enabled) && @@sidekiq_logging_enabled) end end diff --git a/spec/jobs/jobs_base_spec.rb b/spec/jobs/jobs_base_spec.rb index 4a9cb85dfcd..a6110ac61e2 100644 --- a/spec/jobs/jobs_base_spec.rb +++ b/spec/jobs/jobs_base_spec.rb @@ -148,4 +148,43 @@ RSpec.describe ::Jobs::Base do expect(common_state).to eq(%w[job_2_started job_2_finished job_1_executed]) end end + + context "when `Discourse.enable_sidekiq_logging?` is `true`" do + let(:tmp_log_file) { Tempfile.new("sidekiq.log") } + + before do + Discourse.enable_sidekiq_logging + described_class::JobInstrumenter.set_log_path(tmp_log_file.path) + end + + after do + Discourse.disable_sidekiq_logging + described_class::JobInstrumenter.reset_log_path + tmp_log_file.close + end + + it "should log the job in the sidekiq log file" do + job = GoodJob.new + job.perform({ some_param: "some_value" }) + + parsed_logline = JSON.parse(File.read(tmp_log_file.path).split("\n").first) + + expect(parsed_logline["hostname"]).to be_present + expect(parsed_logline["pid"]).to be_present + expect(parsed_logline["database"]).to eq(RailsMultisite::ConnectionManagement.current_db) + expect(parsed_logline["job_name"]).to eq("GoodJob") + expect(parsed_logline["job_type"]).to eq("regular") + expect(parsed_logline["status"]).to eq("success") + expect(JSON.parse(parsed_logline["opts"])).to eq("some_param" => "some_value") + expect(parsed_logline["duration"]).to be_present + expect(parsed_logline["sql_duration"]).to eq(0) + expect(parsed_logline["sql_calls"]).to eq(0) + expect(parsed_logline["redis_duration"]).to eq(0) + expect(parsed_logline["redis_calls"]).to eq(0) + expect(parsed_logline["net_duration"]).to eq(0) + expect(parsed_logline["net_calls"]).to eq(0) + expect(parsed_logline["live_slots_finish"]).to be_present + expect(parsed_logline["live_slots"]).to be_present + end + end end