[OBCDC] Fix a potential deadlock issue in LOB handling scenarios
This commit is contained in:
parent
220df7f12d
commit
bab73a9285
@ -37,7 +37,7 @@ public:
|
||||
virtual void mark_stop_flag() = 0;
|
||||
|
||||
virtual int push(ObLobDataOutRowCtxList &task, volatile bool &stop_flag) = 0;
|
||||
virtual void get_task_count(int64_t &log_entry_task_count) const = 0;
|
||||
virtual void get_task_count(int64_t &lob_data_list_task_count) const = 0;
|
||||
virtual void print_stat_info() = 0;
|
||||
};
|
||||
|
||||
|
@ -103,7 +103,8 @@ public:
|
||||
DEF_INT(sequencer_thread_num, OB_CLUSTER_PARAMETER, "5", "[1,]", "sequencer thread number");
|
||||
DEF_INT(sequencer_queue_length, OB_CLUSTER_PARAMETER, "0", "[0,]", "sequencer queue length");
|
||||
DEF_INT(formatter_thread_num, OB_CLUSTER_PARAMETER, "10", "[1,]", "formatter thread number");
|
||||
DEF_INT(lob_data_merger_thread_num, OB_CLUSTER_PARAMETER, "2", "[1,]", "lob data merger thread number");
|
||||
DEF_INT(lob_data_merger_thread_num, OB_CLUSTER_PARAMETER, "5", "[1,]", "lob data merger thread number");
|
||||
DEF_INT(lob_data_merger_queue_length, OB_CLUSTER_PARAMETER, "1000000", "[0,]", "lob data merger queue length");
|
||||
DEF_CAP(batch_buf_size, OB_CLUSTER_PARAMETER, "20MB", "[2MB,]", "batch buf size");
|
||||
DEF_INT(batch_buf_count, OB_CLUSTER_PARAMETER, "10", "[5,]", "batch buf count");
|
||||
DEF_INT(storager_thread_num, OB_CLUSTER_PARAMETER, "10", "[1,]", "storager thread number");
|
||||
|
@ -940,7 +940,7 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
|
||||
enable_output_hidden_primary_key);
|
||||
|
||||
INIT(lob_data_merger_, ObCDCLobDataMerger, TCONF.lob_data_merger_thread_num,
|
||||
CDC_CFG_MGR.get_lob_data_merger_queue_length(), *err_handler);
|
||||
TCONF.lob_data_merger_queue_length, *err_handler);
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(lob_aux_meta_storager_.init(store_service_))) {
|
||||
@ -2769,7 +2769,7 @@ int ObLogInstance::get_task_count_(
|
||||
part_trans_task_resuable_count = 0;
|
||||
|
||||
if (OB_ISNULL(fetcher_) || OB_ISNULL(dml_parser_) || OB_ISNULL(formatter_)
|
||||
|| OB_ISNULL(storager_)
|
||||
|| OB_ISNULL(storager_) || OB_ISNULL(lob_data_merger_)
|
||||
|| OB_ISNULL(sequencer_) || OB_ISNULL(reader_) || OB_ISNULL(committer_)
|
||||
|| OB_ISNULL(sys_ls_handler_) || OB_ISNULL(resource_collector_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2812,6 +2812,8 @@ int ObLogInstance::get_task_count_(
|
||||
// (3) Tasks held by users that have not been returned
|
||||
// (4) tasks held by resource_collector
|
||||
if (OB_SUCC(ret)) {
|
||||
int64_t lob_data_list_task_count = 0;
|
||||
lob_data_merger_->get_task_count(lob_data_list_task_count);
|
||||
int64_t committer_ddl_part_trans_task_count = 0;
|
||||
int64_t committer_dml_part_trans_task_count = 0;
|
||||
|
||||
@ -2849,7 +2851,9 @@ int ObLogInstance::get_task_count_(
|
||||
seq_stat_info.ready_trans_count_, seq_stat_info.sequenced_trans_count_);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [READER] [ROW_TASK=%ld]", reader_task_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [DML_PARSER] [LOG_TASK=%ld]", dml_parser_log_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [FORMATTER] [BR=%ld LOG_TASK=%ld LOB_STMT=%ld]", formatter_br_count, formatter_log_count, stmt_in_lob_merger_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [FORMATTER] [BR=%ld LOG_TASK=%ld LOB_STMT=%ld]",
|
||||
formatter_br_count, formatter_log_count, stmt_in_lob_merger_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [LOB_MERGER] [LOB_LIST_TASK=%ld]", lob_data_list_task_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [SORTER] [TRANS=%ld]", sorter_task_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [COMMITER] [DML_TRANS=%ld DDL_PART_TRANS_TASK=%ld DML_PART_TRANS_TASK=%ld]",
|
||||
committer_pending_dml_trans_count,
|
||||
|
Loading…
x
Reference in New Issue
Block a user