diff --git a/src/logservice/libobcdc/src/ob_log_config.h b/src/logservice/libobcdc/src/ob_log_config.h index 3901bd086b..877845e5f6 100644 --- a/src/logservice/libobcdc/src/ob_log_config.h +++ b/src/logservice/libobcdc/src/ob_log_config.h @@ -411,7 +411,7 @@ public: DEF_CAP(extra_redo_dispatch_memory_size, OB_CLUSTER_PARAMETER, "0KB", "[0, 512M]", "extra redo dispatcher memory for data skew participant"); T_DEF_INT(pause_redo_dispatch_task_count_threshold, OB_CLUSTER_PARAMETER, 80, 0, 100, "task cound percent threshold for pause redo dispatch"); T_DEF_INT(memory_usage_warn_threshold, OB_CLUSTER_PARAMETER, 85, 10, 100, "memory usage wan threshold, may pause fetch while reach the threshold"); - T_DEF_INT_INFT(queue_backlog_lowest_tolerance, OB_CLUSTER_PARAMETER, 500, 0, "lowest threshold of queue_backlog that will touch redo_dispatch flow controll"); + T_DEF_INT_INFT(queue_backlog_lowest_tolerance, OB_CLUSTER_PARAMETER, 100, 0, "lowest threshold of queue_backlog that will touch flow controll"); // sorter thread num T_DEF_INT(msg_sorter_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "trans msg sorter thread num"); // sorter thread diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index db4d429517..2b65b84501 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -2535,6 +2535,7 @@ void ObLogInstance::global_flow_control_() int64_t active_part_trans_task_count = trans_task_pool_.get_alloc_count(); int64_t active_log_entry_task_count = log_entry_task_pool_->get_alloc_count(); int64_t reusable_part_trans_task_count = 0; + int64_t ddl_part_trans_count = 0; int64_t ready_to_seq_task_count = 0; int64_t seq_queue_trans_count = 0; @@ -2559,15 +2560,17 @@ void ObLogInstance::global_flow_control_() const bool need_pause_dispatch = need_pause_redo_dispatch(); const bool touch_memory_warn_limit = (memory_hold > memory_warn_usage); const bool is_storage_work_mode = is_storage_working_mode(working_mode_); + const int64_t queue_backlog_lowest_tolerance = TCONF.queue_backlog_lowest_tolerance; const char *reason = ""; - if (OB_FAIL(get_task_count_(ready_to_seq_task_count, seq_queue_trans_count, reusable_part_trans_task_count))) { + if (OB_FAIL(get_task_count_(ready_to_seq_task_count, seq_queue_trans_count, reusable_part_trans_task_count, ddl_part_trans_count))) { LOG_ERROR("get_task_count fail", KR(ret), K(ready_to_seq_task_count), K(seq_queue_trans_count), - K(reusable_part_trans_task_count)); + K(reusable_part_trans_task_count), K(ddl_part_trans_count)); } else if (OB_FAIL(dml_parser_->get_log_entry_task_count(dml_parser_part_trans_task_count))) { LOG_ERROR("DML parser get_log_entry_task_count fail", KR(ret), K(dml_parser_part_trans_task_count)); } else { - const bool is_seq_queue_not_empty = (seq_queue_trans_count > 0); + const bool exist_trans_sequenced_not_handled = (seq_queue_trans_count > queue_backlog_lowest_tolerance); + const bool exist_ddl_processing_or_in_queue = (ddl_part_trans_count > queue_backlog_lowest_tolerance); int64_t storager_task_count = 0; int64_t storager_log_count = 0; storager_->get_task_count(storager_task_count, storager_log_count); @@ -2581,7 +2584,9 @@ void ObLogInstance::global_flow_control_() // OR // (3) memory is limited and exist trans sequenced but not output // OR - // (4) memory_limit touch warn threshold and need_pause_dispatch + // (4) memory is limited and exist ddl_trans in to handle or handling + // OR + // (5) memory_limit touch warn threshold and need_pause_dispatch bool condition1 = (active_part_trans_task_count >= part_trans_task_active_count_upper_bound) || touch_memory_warn_limit || (system_memory_avail < system_memory_avail_lower_bound); @@ -2589,15 +2594,17 @@ void ObLogInstance::global_flow_control_() || (ready_to_seq_task_count > ready_to_seq_task_upper_bound); bool condition3 = (storager_task_count > storager_task_count_upper_bound) && (memory_hold >= storager_mem_percentage * memory_limit); - need_slow_down_fetcher = (condition1 && (condition2 || need_pause_dispatch || is_seq_queue_not_empty)) || condition3; + need_slow_down_fetcher = (condition1 && (condition2 || need_pause_dispatch || exist_trans_sequenced_not_handled || exist_ddl_processing_or_in_queue)) || condition3; if (need_slow_down_fetcher) { if (condition2) { reason = "MEMORY_LIMIT_AND_REUSABLE_PART_TOO_MUCH"; } else if (need_pause_dispatch) { reason = "MEMORY_LIMIT_AND_DISPATCH_PAUSED"; - } else if (is_seq_queue_not_empty) { + } else if (exist_trans_sequenced_not_handled) { reason = "MEMORY_LIMIT_AND_EXIST_TRANS_TO_OUTPUT"; + } else if (exist_ddl_processing_or_in_queue) { + reason = "MEMORY_LIMIT_AND_EXIST_DDL_TRANS_TO_HANDLE"; } else if (condition3) { reason = "STORAGER_TASK_OVER_THRESHOLD"; } else { @@ -2624,7 +2631,7 @@ void ObLogInstance::global_flow_control_() "PART_TRANS(TOTAL=%ld, ACTIVE=%ld/%ld, REUSABLE=%ld/%ld) " "LOG_TASK(ACTIVE=%ld) " "STORE(%ld/%ld) " - "[FETCHER=%ld DML_PARSER=%ld " + "[FETCHER=%ld DML_PARSER=%ld DDL=%ld " "COMMITER=%ld USER_QUEUE=%ld OUT=%ld RC=%ld] " "DIST_TRANS(SEQ_QUEUE=%ld, SEQ=%ld, COMMITTED=%ld) " "NEED_PAUSE_DISPATCH=%d REASON=%s", @@ -2637,7 +2644,7 @@ void ObLogInstance::global_flow_control_() reusable_part_trans_task_count, part_trans_task_reusable_count_upper_bound, active_log_entry_task_count, storager_task_count, storager_task_count_upper_bound, - fetcher_part_trans_task_count, dml_parser_part_trans_task_count, + fetcher_part_trans_task_count, dml_parser_part_trans_task_count, ddl_part_trans_count, committer_ddl_part_trans_task_count + committer_dml_part_trans_task_count, br_queue_part_trans_task_count, out_part_trans_task_count, resource_collector_part_trans_task_count, @@ -2799,7 +2806,8 @@ int64_t ObLogInstance::get_memory_limit_() const int ObLogInstance::get_task_count_( int64_t &ready_to_seq_task_count, int64_t &seq_trans_count, - int64_t &part_trans_task_resuable_count) + int64_t &part_trans_task_resuable_count, + int64_t &ddl_part_trans_count) { int ret = OB_SUCCESS; ready_to_seq_task_count = 0; @@ -2858,7 +2866,8 @@ int ObLogInstance::get_task_count_( int64_t fetcher_part_trans_task_count = fetcher_->get_part_trans_task_count(); committer_->get_part_trans_task_count(committer_ddl_part_trans_task_count, committer_dml_part_trans_task_count); - int64_t sys_ls_handle_part_trans_task_count = sys_ls_handler_->get_part_trans_task_count(); + int64_t sys_ls_handle_part_trans_task_count = 0; + sys_ls_handler_->get_task_count(sys_ls_handle_part_trans_task_count, ddl_part_trans_count); int64_t br_queue_part_trans_task_count = br_queue_.get_part_trans_task_count(); int64_t out_part_trans_task_count = get_out_part_trans_task_count_(); int64_t resource_collector_part_trans_task_count = 0; @@ -2881,7 +2890,8 @@ int ObLogInstance::get_task_count_( if (REACH_TIME_INTERVAL(PRINT_GLOBAL_FLOW_CONTROL_INTERVAL)) { _LOG_INFO("------------------------------------------------------------"); _LOG_INFO("[TASK_COUNT_STAT] [FETCHER] [PART_TRANS_TASK=%ld]", fetcher_part_trans_task_count); - _LOG_INFO("[TASK_COUNT_STAT] [SYS_LS_HANDLE] [PART_TRANS_TASK=%ld]", sys_ls_handle_part_trans_task_count); + _LOG_INFO("[TASK_COUNT_STAT] [SYS_LS_HANDLE] [PART_TRANS_TASK=%ld][DDL_QUEUED=%ld]", + sys_ls_handle_part_trans_task_count, ddl_part_trans_count); _LOG_INFO("[TASK_COUNT_STAT] [STORAGER] [LOG_TASK=%ld/%ld]", storager_task_count, storager_log_count); _LOG_INFO("[TASK_COUNT_STAT] [SEQUENCER] [PART_TRANS_TASK(QUEUE=%ld TOTAL=[%ld][DDL=%ld DML=%ld HB=%ld])] [TRANS(READY=%ld SEQ=%ld)]", seq_stat_info.queue_part_trans_task_count_, seq_stat_info.total_part_trans_task_count_, diff --git a/src/logservice/libobcdc/src/ob_log_instance.h b/src/logservice/libobcdc/src/ob_log_instance.h index 4518b0e6ff..ac05d9b85f 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.h +++ b/src/logservice/libobcdc/src/ob_log_instance.h @@ -244,7 +244,8 @@ private: int get_task_count_( int64_t &ready_to_seq_task_count, int64_t &seq_trans_count, - int64_t &part_trans_task_resuable_count); + int64_t &part_trans_task_resuable_count, + int64_t &ddl_part_trans_count); // next record void do_drc_consume_tps_stat_(); diff --git a/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.cpp b/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.cpp index 60c5c6f73a..ee683fbfc2 100644 --- a/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.cpp +++ b/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.cpp @@ -131,6 +131,7 @@ ObLogSysLsTaskHandler::ObLogSysLsTaskHandler() : handle_pid_(0), stop_flag_(true), sys_ls_fetch_queue_(), + ddl_part_trans_count_(0), wait_formatted_cond_() {} @@ -181,6 +182,7 @@ void ObLogSysLsTaskHandler::destroy() err_handler_ = NULL; schema_getter_ = NULL; handle_pid_ = 0; + ddl_part_trans_count_ = 0; stop_flag_ = true; } @@ -233,10 +235,14 @@ void ObLogSysLsTaskHandler::stop() int ObLogSysLsTaskHandler::push(PartTransTask *task, const int64_t timeout) { int ret = OB_SUCCESS; + bool is_ddl_trans = false; if (OB_ISNULL(ddl_parser_)) { ret = OB_NOT_INIT; LOG_ERROR("invalid DDL parser", KR(ret), K(ddl_parser_)); + } else if (OB_ISNULL(task)) { + ret = OB_INVALID_ARGUMENT; + LOG_ERROR("invalid argument", KR(ret)); } else if (OB_UNLIKELY(! task->is_ddl_trans() && ! task->is_ls_op_trans() && ! task->is_sys_ls_heartbeat() @@ -246,7 +252,7 @@ int ObLogSysLsTaskHandler::push(PartTransTask *task, const int64_t timeout) } // DDL task have to push to the DDL parser first, because the task will retry after the task push DDL parser times out. // that is, the same task may be pushed multiple times.To avoid the same task being added to the queue more than once, the DDL parser is pushed first - else if (task->is_ddl_trans() && OB_FAIL(ddl_parser_->push(*task, timeout))) { + else if ((is_ddl_trans = task->is_ddl_trans()) && OB_FAIL(ddl_parser_->push(*task, timeout))) { if (OB_IN_STOP_STATE != ret && OB_TIMEOUT != ret) { LOG_ERROR("push task into DDL parser fail", KR(ret), K(task)); } @@ -254,7 +260,8 @@ int ObLogSysLsTaskHandler::push(PartTransTask *task, const int64_t timeout) // Add the task to the Fetch queue without timeout failure, ensuring that it will only be pushed once in the Parser else if (OB_FAIL(sys_ls_fetch_queue_.push(task))) { LOG_ERROR("push task into fetch queue fail", KR(ret), KPC(task)); - } else { + } else if (is_ddl_trans) { + inc_ddl_part_trans_count_(); // success } @@ -292,9 +299,10 @@ int ObLogSysLsTaskHandler::get_progress( return ret; } -int64_t ObLogSysLsTaskHandler::get_part_trans_task_count() const +void ObLogSysLsTaskHandler::get_task_count(int64_t &total_part_trans_count, int64_t &ddl_part_trans_count) const { - return sys_ls_fetch_queue_.size(); + total_part_trans_count = sys_ls_fetch_queue_.size(); + ddl_part_trans_count = ATOMIC_LOAD(&ddl_part_trans_count_); } void ObLogSysLsTaskHandler::configure(const ObLogConfig &config) @@ -369,9 +377,10 @@ int ObLogSysLsTaskHandler::handle_task_(PartTransTask &task, K(ddl_tenant_id), K(is_tenant_served)); } else { const bool is_using_online_schema = is_online_refresh_mode(TCTX.refresh_mode_); + const bool is_ddl_trans = task.is_ddl_trans(); // The following handles DDL transaction tasks and DDL heartbeat tasks // NOTICE: handle_ddl_trans before sequencer when using online_schmea, otherwise(using data_dict) handle_ddl_trans in sequencer. - if (task.is_ddl_trans() && is_using_online_schema && OB_FAIL(ddl_processor_->handle_ddl_trans(task, *tenant, stop_flag_))) { + if (is_ddl_trans && is_using_online_schema && OB_FAIL(ddl_processor_->handle_ddl_trans(task, *tenant, stop_flag_))) { if (OB_IN_STOP_STATE != ret) { LOG_ERROR("ddl_processor_ handle_ddl_trans fail", KR(ret), K(task), K(ddl_tenant_id), K(tenant), K(is_tenant_served)); @@ -382,6 +391,8 @@ int ObLogSysLsTaskHandler::handle_task_(PartTransTask &task, if (OB_IN_STOP_STATE != ret) { LOG_ERROR("update_sys_ls_info_ fail", KR(ret), K(task), KPC(tenant)); } + } else if (is_ddl_trans) { + dec_ddl_part_trans_count_(); } } diff --git a/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.h b/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.h index 93709a5724..02e898e6a0 100644 --- a/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.h +++ b/src/logservice/libobcdc/src/ob_log_sys_ls_task_handler.h @@ -61,7 +61,7 @@ public: int64_t &sys_ls_min_progress, palf::LSN &sys_ls_min_handle_log_lsn) = 0; - virtual int64_t get_part_trans_task_count() const = 0; + virtual void get_task_count(int64_t &total_part_trans_count, int64_t &ddl_part_trans_count) const = 0; virtual void configure(const ObLogConfig &config) = 0; }; @@ -99,7 +99,7 @@ public: uint64_t &sys_min_progress_tenant_id, int64_t &sys_ls_min_progress, palf::LSN &sys_ls_min_handle_log_lsn); - virtual int64_t get_part_trans_task_count() const; + virtual void get_task_count(int64_t &total_part_trans_count, int64_t &ddl_part_trans_count) const; virtual void configure(const ObLogConfig &config); public: @@ -140,6 +140,8 @@ private: PartTransTask *task, ObLogTenant *tenant, const bool is_tenant_served); + void inc_ddl_part_trans_count_() { ATOMIC_INC(&ddl_part_trans_count_); } + void dec_ddl_part_trans_count_() { ATOMIC_DEC(&ddl_part_trans_count_); } public: // Task queue @@ -180,6 +182,7 @@ private: // Queue of pending tasks exported from Fetcher TaskQueue sys_ls_fetch_queue_; + int64_t ddl_part_trans_count_; common::ObCond wait_formatted_cond_; private: