From bab73a92851bcae14c52a1cb9d1bbe6b1d8b4a27 Mon Sep 17 00:00:00 2001 From: obdev Date: Sat, 3 Feb 2024 04:51:27 +0000 Subject: [PATCH] [OBCDC] Fix a potential deadlock issue in LOB handling scenarios --- src/logservice/libobcdc/src/ob_cdc_lob_data_merger.h | 2 +- src/logservice/libobcdc/src/ob_log_config.h | 3 ++- src/logservice/libobcdc/src/ob_log_instance.cpp | 10 +++++++--- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.h b/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.h index dedee3136..96959b8ce 100644 --- a/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.h +++ b/src/logservice/libobcdc/src/ob_cdc_lob_data_merger.h @@ -37,7 +37,7 @@ public: virtual void mark_stop_flag() = 0; virtual int push(ObLobDataOutRowCtxList &task, volatile bool &stop_flag) = 0; - virtual void get_task_count(int64_t &log_entry_task_count) const = 0; + virtual void get_task_count(int64_t &lob_data_list_task_count) const = 0; virtual void print_stat_info() = 0; }; diff --git a/src/logservice/libobcdc/src/ob_log_config.h b/src/logservice/libobcdc/src/ob_log_config.h index 7d563f5a4..72df0f806 100644 --- a/src/logservice/libobcdc/src/ob_log_config.h +++ b/src/logservice/libobcdc/src/ob_log_config.h @@ -103,7 +103,8 @@ public: DEF_INT(sequencer_thread_num, OB_CLUSTER_PARAMETER, "5", "[1,]", "sequencer thread number"); DEF_INT(sequencer_queue_length, OB_CLUSTER_PARAMETER, "0", "[0,]", "sequencer queue length"); DEF_INT(formatter_thread_num, OB_CLUSTER_PARAMETER, "10", "[1,]", "formatter thread number"); - DEF_INT(lob_data_merger_thread_num, OB_CLUSTER_PARAMETER, "2", "[1,]", "lob data merger thread number"); + DEF_INT(lob_data_merger_thread_num, OB_CLUSTER_PARAMETER, "5", "[1,]", "lob data merger thread number"); + DEF_INT(lob_data_merger_queue_length, OB_CLUSTER_PARAMETER, "1000000", "[0,]", "lob data merger queue length"); DEF_CAP(batch_buf_size, OB_CLUSTER_PARAMETER, "20MB", "[2MB,]", "batch buf size"); DEF_INT(batch_buf_count, OB_CLUSTER_PARAMETER, "10", "[5,]", "batch buf count"); DEF_INT(storager_thread_num, OB_CLUSTER_PARAMETER, "10", "[1,]", "storager thread number"); diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index 326b7ef3e..20c4c94b7 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -940,7 +940,7 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns) enable_output_hidden_primary_key); INIT(lob_data_merger_, ObCDCLobDataMerger, TCONF.lob_data_merger_thread_num, - CDC_CFG_MGR.get_lob_data_merger_queue_length(), *err_handler); + TCONF.lob_data_merger_queue_length, *err_handler); if (OB_SUCC(ret)) { if (OB_FAIL(lob_aux_meta_storager_.init(store_service_))) { @@ -2769,7 +2769,7 @@ int ObLogInstance::get_task_count_( part_trans_task_resuable_count = 0; if (OB_ISNULL(fetcher_) || OB_ISNULL(dml_parser_) || OB_ISNULL(formatter_) - || OB_ISNULL(storager_) + || OB_ISNULL(storager_) || OB_ISNULL(lob_data_merger_) || OB_ISNULL(sequencer_) || OB_ISNULL(reader_) || OB_ISNULL(committer_) || OB_ISNULL(sys_ls_handler_) || OB_ISNULL(resource_collector_)) { ret = OB_ERR_UNEXPECTED; @@ -2812,6 +2812,8 @@ int ObLogInstance::get_task_count_( // (3) Tasks held by users that have not been returned // (4) tasks held by resource_collector if (OB_SUCC(ret)) { + int64_t lob_data_list_task_count = 0; + lob_data_merger_->get_task_count(lob_data_list_task_count); int64_t committer_ddl_part_trans_task_count = 0; int64_t committer_dml_part_trans_task_count = 0; @@ -2849,7 +2851,9 @@ int ObLogInstance::get_task_count_( seq_stat_info.ready_trans_count_, seq_stat_info.sequenced_trans_count_); _LOG_INFO("[TASK_COUNT_STAT] [READER] [ROW_TASK=%ld]", reader_task_count); _LOG_INFO("[TASK_COUNT_STAT] [DML_PARSER] [LOG_TASK=%ld]", dml_parser_log_count); - _LOG_INFO("[TASK_COUNT_STAT] [FORMATTER] [BR=%ld LOG_TASK=%ld LOB_STMT=%ld]", formatter_br_count, formatter_log_count, stmt_in_lob_merger_count); + _LOG_INFO("[TASK_COUNT_STAT] [FORMATTER] [BR=%ld LOG_TASK=%ld LOB_STMT=%ld]", + formatter_br_count, formatter_log_count, stmt_in_lob_merger_count); + _LOG_INFO("[TASK_COUNT_STAT] [LOB_MERGER] [LOB_LIST_TASK=%ld]", lob_data_list_task_count); _LOG_INFO("[TASK_COUNT_STAT] [SORTER] [TRANS=%ld]", sorter_task_count); _LOG_INFO("[TASK_COUNT_STAT] [COMMITER] [DML_TRANS=%ld DDL_PART_TRANS_TASK=%ld DML_PART_TRANS_TASK=%ld]", committer_pending_dml_trans_count,