[OBCDC] Fix memory_usage not controlled while sql response slow
This commit is contained in:
parent
5fe0031adc
commit
aabce90fca
@ -411,7 +411,7 @@ public:
|
||||
DEF_CAP(extra_redo_dispatch_memory_size, OB_CLUSTER_PARAMETER, "0KB", "[0, 512M]", "extra redo dispatcher memory for data skew participant");
|
||||
T_DEF_INT(pause_redo_dispatch_task_count_threshold, OB_CLUSTER_PARAMETER, 80, 0, 100, "task cound percent threshold for pause redo dispatch");
|
||||
T_DEF_INT(memory_usage_warn_threshold, OB_CLUSTER_PARAMETER, 85, 10, 100, "memory usage wan threshold, may pause fetch while reach the threshold");
|
||||
T_DEF_INT_INFT(queue_backlog_lowest_tolerance, OB_CLUSTER_PARAMETER, 500, 0, "lowest threshold of queue_backlog that will touch redo_dispatch flow controll");
|
||||
T_DEF_INT_INFT(queue_backlog_lowest_tolerance, OB_CLUSTER_PARAMETER, 100, 0, "lowest threshold of queue_backlog that will touch flow controll");
|
||||
// sorter thread num
|
||||
T_DEF_INT(msg_sorter_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "trans msg sorter thread num");
|
||||
// sorter thread
|
||||
|
@ -2535,6 +2535,7 @@ void ObLogInstance::global_flow_control_()
|
||||
int64_t active_part_trans_task_count = trans_task_pool_.get_alloc_count();
|
||||
int64_t active_log_entry_task_count = log_entry_task_pool_->get_alloc_count();
|
||||
int64_t reusable_part_trans_task_count = 0;
|
||||
int64_t ddl_part_trans_count = 0;
|
||||
int64_t ready_to_seq_task_count = 0;
|
||||
int64_t seq_queue_trans_count = 0;
|
||||
|
||||
@ -2559,15 +2560,17 @@ void ObLogInstance::global_flow_control_()
|
||||
const bool need_pause_dispatch = need_pause_redo_dispatch();
|
||||
const bool touch_memory_warn_limit = (memory_hold > memory_warn_usage);
|
||||
const bool is_storage_work_mode = is_storage_working_mode(working_mode_);
|
||||
const int64_t queue_backlog_lowest_tolerance = TCONF.queue_backlog_lowest_tolerance;
|
||||
const char *reason = "";
|
||||
|
||||
if (OB_FAIL(get_task_count_(ready_to_seq_task_count, seq_queue_trans_count, reusable_part_trans_task_count))) {
|
||||
if (OB_FAIL(get_task_count_(ready_to_seq_task_count, seq_queue_trans_count, reusable_part_trans_task_count, ddl_part_trans_count))) {
|
||||
LOG_ERROR("get_task_count fail", KR(ret), K(ready_to_seq_task_count), K(seq_queue_trans_count),
|
||||
K(reusable_part_trans_task_count));
|
||||
K(reusable_part_trans_task_count), K(ddl_part_trans_count));
|
||||
} else if (OB_FAIL(dml_parser_->get_log_entry_task_count(dml_parser_part_trans_task_count))) {
|
||||
LOG_ERROR("DML parser get_log_entry_task_count fail", KR(ret), K(dml_parser_part_trans_task_count));
|
||||
} else {
|
||||
const bool is_seq_queue_not_empty = (seq_queue_trans_count > 0);
|
||||
const bool exist_trans_sequenced_not_handled = (seq_queue_trans_count > queue_backlog_lowest_tolerance);
|
||||
const bool exist_ddl_processing_or_in_queue = (ddl_part_trans_count > queue_backlog_lowest_tolerance);
|
||||
int64_t storager_task_count = 0;
|
||||
int64_t storager_log_count = 0;
|
||||
storager_->get_task_count(storager_task_count, storager_log_count);
|
||||
@ -2581,7 +2584,9 @@ void ObLogInstance::global_flow_control_()
|
||||
// OR
|
||||
// (3) memory is limited and exist trans sequenced but not output
|
||||
// OR
|
||||
// (4) memory_limit touch warn threshold and need_pause_dispatch
|
||||
// (4) memory is limited and exist ddl_trans in to handle or handling
|
||||
// OR
|
||||
// (5) memory_limit touch warn threshold and need_pause_dispatch
|
||||
bool condition1 = (active_part_trans_task_count >= part_trans_task_active_count_upper_bound)
|
||||
|| touch_memory_warn_limit
|
||||
|| (system_memory_avail < system_memory_avail_lower_bound);
|
||||
@ -2589,15 +2594,17 @@ void ObLogInstance::global_flow_control_()
|
||||
|| (ready_to_seq_task_count > ready_to_seq_task_upper_bound);
|
||||
bool condition3 = (storager_task_count > storager_task_count_upper_bound) && (memory_hold >= storager_mem_percentage * memory_limit);
|
||||
|
||||
need_slow_down_fetcher = (condition1 && (condition2 || need_pause_dispatch || is_seq_queue_not_empty)) || condition3;
|
||||
need_slow_down_fetcher = (condition1 && (condition2 || need_pause_dispatch || exist_trans_sequenced_not_handled || exist_ddl_processing_or_in_queue)) || condition3;
|
||||
|
||||
if (need_slow_down_fetcher) {
|
||||
if (condition2) {
|
||||
reason = "MEMORY_LIMIT_AND_REUSABLE_PART_TOO_MUCH";
|
||||
} else if (need_pause_dispatch) {
|
||||
reason = "MEMORY_LIMIT_AND_DISPATCH_PAUSED";
|
||||
} else if (is_seq_queue_not_empty) {
|
||||
} else if (exist_trans_sequenced_not_handled) {
|
||||
reason = "MEMORY_LIMIT_AND_EXIST_TRANS_TO_OUTPUT";
|
||||
} else if (exist_ddl_processing_or_in_queue) {
|
||||
reason = "MEMORY_LIMIT_AND_EXIST_DDL_TRANS_TO_HANDLE";
|
||||
} else if (condition3) {
|
||||
reason = "STORAGER_TASK_OVER_THRESHOLD";
|
||||
} else {
|
||||
@ -2624,7 +2631,7 @@ void ObLogInstance::global_flow_control_()
|
||||
"PART_TRANS(TOTAL=%ld, ACTIVE=%ld/%ld, REUSABLE=%ld/%ld) "
|
||||
"LOG_TASK(ACTIVE=%ld) "
|
||||
"STORE(%ld/%ld) "
|
||||
"[FETCHER=%ld DML_PARSER=%ld "
|
||||
"[FETCHER=%ld DML_PARSER=%ld DDL=%ld "
|
||||
"COMMITER=%ld USER_QUEUE=%ld OUT=%ld RC=%ld] "
|
||||
"DIST_TRANS(SEQ_QUEUE=%ld, SEQ=%ld, COMMITTED=%ld) "
|
||||
"NEED_PAUSE_DISPATCH=%d REASON=%s",
|
||||
@ -2637,7 +2644,7 @@ void ObLogInstance::global_flow_control_()
|
||||
reusable_part_trans_task_count, part_trans_task_reusable_count_upper_bound,
|
||||
active_log_entry_task_count,
|
||||
storager_task_count, storager_task_count_upper_bound,
|
||||
fetcher_part_trans_task_count, dml_parser_part_trans_task_count,
|
||||
fetcher_part_trans_task_count, dml_parser_part_trans_task_count, ddl_part_trans_count,
|
||||
committer_ddl_part_trans_task_count + committer_dml_part_trans_task_count,
|
||||
br_queue_part_trans_task_count, out_part_trans_task_count,
|
||||
resource_collector_part_trans_task_count,
|
||||
@ -2799,7 +2806,8 @@ int64_t ObLogInstance::get_memory_limit_() const
|
||||
int ObLogInstance::get_task_count_(
|
||||
int64_t &ready_to_seq_task_count,
|
||||
int64_t &seq_trans_count,
|
||||
int64_t &part_trans_task_resuable_count)
|
||||
int64_t &part_trans_task_resuable_count,
|
||||
int64_t &ddl_part_trans_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ready_to_seq_task_count = 0;
|
||||
@ -2858,7 +2866,8 @@ int ObLogInstance::get_task_count_(
|
||||
int64_t fetcher_part_trans_task_count = fetcher_->get_part_trans_task_count();
|
||||
committer_->get_part_trans_task_count(committer_ddl_part_trans_task_count,
|
||||
committer_dml_part_trans_task_count);
|
||||
int64_t sys_ls_handle_part_trans_task_count = sys_ls_handler_->get_part_trans_task_count();
|
||||
int64_t sys_ls_handle_part_trans_task_count = 0;
|
||||
sys_ls_handler_->get_task_count(sys_ls_handle_part_trans_task_count, ddl_part_trans_count);
|
||||
int64_t br_queue_part_trans_task_count = br_queue_.get_part_trans_task_count();
|
||||
int64_t out_part_trans_task_count = get_out_part_trans_task_count_();
|
||||
int64_t resource_collector_part_trans_task_count = 0;
|
||||
@ -2881,7 +2890,8 @@ int ObLogInstance::get_task_count_(
|
||||
if (REACH_TIME_INTERVAL(PRINT_GLOBAL_FLOW_CONTROL_INTERVAL)) {
|
||||
_LOG_INFO("------------------------------------------------------------");
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [FETCHER] [PART_TRANS_TASK=%ld]", fetcher_part_trans_task_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [SYS_LS_HANDLE] [PART_TRANS_TASK=%ld]", sys_ls_handle_part_trans_task_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [SYS_LS_HANDLE] [PART_TRANS_TASK=%ld][DDL_QUEUED=%ld]",
|
||||
sys_ls_handle_part_trans_task_count, ddl_part_trans_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [STORAGER] [LOG_TASK=%ld/%ld]", storager_task_count, storager_log_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [SEQUENCER] [PART_TRANS_TASK(QUEUE=%ld TOTAL=[%ld][DDL=%ld DML=%ld HB=%ld])] [TRANS(READY=%ld SEQ=%ld)]",
|
||||
seq_stat_info.queue_part_trans_task_count_, seq_stat_info.total_part_trans_task_count_,
|
||||
|
@ -244,7 +244,8 @@ private:
|
||||
int get_task_count_(
|
||||
int64_t &ready_to_seq_task_count,
|
||||
int64_t &seq_trans_count,
|
||||
int64_t &part_trans_task_resuable_count);
|
||||
int64_t &part_trans_task_resuable_count,
|
||||
int64_t &ddl_part_trans_count);
|
||||
|
||||
// next record
|
||||
void do_drc_consume_tps_stat_();
|
||||
|
@ -131,6 +131,7 @@ ObLogSysLsTaskHandler::ObLogSysLsTaskHandler() :
|
||||
handle_pid_(0),
|
||||
stop_flag_(true),
|
||||
sys_ls_fetch_queue_(),
|
||||
ddl_part_trans_count_(0),
|
||||
wait_formatted_cond_()
|
||||
{}
|
||||
|
||||
@ -181,6 +182,7 @@ void ObLogSysLsTaskHandler::destroy()
|
||||
err_handler_ = NULL;
|
||||
schema_getter_ = NULL;
|
||||
handle_pid_ = 0;
|
||||
ddl_part_trans_count_ = 0;
|
||||
stop_flag_ = true;
|
||||
}
|
||||
|
||||
@ -233,10 +235,14 @@ void ObLogSysLsTaskHandler::stop()
|
||||
int ObLogSysLsTaskHandler::push(PartTransTask *task, const int64_t timeout)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_ddl_trans = false;
|
||||
|
||||
if (OB_ISNULL(ddl_parser_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_ERROR("invalid DDL parser", KR(ret), K(ddl_parser_));
|
||||
} else if (OB_ISNULL(task)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("invalid argument", KR(ret));
|
||||
} else if (OB_UNLIKELY(! task->is_ddl_trans()
|
||||
&& ! task->is_ls_op_trans()
|
||||
&& ! task->is_sys_ls_heartbeat()
|
||||
@ -246,7 +252,7 @@ int ObLogSysLsTaskHandler::push(PartTransTask *task, const int64_t timeout)
|
||||
}
|
||||
// DDL task have to push to the DDL parser first, because the task will retry after the task push DDL parser times out.
|
||||
// that is, the same task may be pushed multiple times.To avoid the same task being added to the queue more than once, the DDL parser is pushed first
|
||||
else if (task->is_ddl_trans() && OB_FAIL(ddl_parser_->push(*task, timeout))) {
|
||||
else if ((is_ddl_trans = task->is_ddl_trans()) && OB_FAIL(ddl_parser_->push(*task, timeout))) {
|
||||
if (OB_IN_STOP_STATE != ret && OB_TIMEOUT != ret) {
|
||||
LOG_ERROR("push task into DDL parser fail", KR(ret), K(task));
|
||||
}
|
||||
@ -254,7 +260,8 @@ int ObLogSysLsTaskHandler::push(PartTransTask *task, const int64_t timeout)
|
||||
// Add the task to the Fetch queue without timeout failure, ensuring that it will only be pushed once in the Parser
|
||||
else if (OB_FAIL(sys_ls_fetch_queue_.push(task))) {
|
||||
LOG_ERROR("push task into fetch queue fail", KR(ret), KPC(task));
|
||||
} else {
|
||||
} else if (is_ddl_trans) {
|
||||
inc_ddl_part_trans_count_();
|
||||
// success
|
||||
}
|
||||
|
||||
@ -292,9 +299,10 @@ int ObLogSysLsTaskHandler::get_progress(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObLogSysLsTaskHandler::get_part_trans_task_count() const
|
||||
void ObLogSysLsTaskHandler::get_task_count(int64_t &total_part_trans_count, int64_t &ddl_part_trans_count) const
|
||||
{
|
||||
return sys_ls_fetch_queue_.size();
|
||||
total_part_trans_count = sys_ls_fetch_queue_.size();
|
||||
ddl_part_trans_count = ATOMIC_LOAD(&ddl_part_trans_count_);
|
||||
}
|
||||
|
||||
void ObLogSysLsTaskHandler::configure(const ObLogConfig &config)
|
||||
@ -369,9 +377,10 @@ int ObLogSysLsTaskHandler::handle_task_(PartTransTask &task,
|
||||
K(ddl_tenant_id), K(is_tenant_served));
|
||||
} else {
|
||||
const bool is_using_online_schema = is_online_refresh_mode(TCTX.refresh_mode_);
|
||||
const bool is_ddl_trans = task.is_ddl_trans();
|
||||
// The following handles DDL transaction tasks and DDL heartbeat tasks
|
||||
// NOTICE: handle_ddl_trans before sequencer when using online_schmea, otherwise(using data_dict) handle_ddl_trans in sequencer.
|
||||
if (task.is_ddl_trans() && is_using_online_schema && OB_FAIL(ddl_processor_->handle_ddl_trans(task, *tenant, stop_flag_))) {
|
||||
if (is_ddl_trans && is_using_online_schema && OB_FAIL(ddl_processor_->handle_ddl_trans(task, *tenant, stop_flag_))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("ddl_processor_ handle_ddl_trans fail", KR(ret), K(task), K(ddl_tenant_id), K(tenant),
|
||||
K(is_tenant_served));
|
||||
@ -382,6 +391,8 @@ int ObLogSysLsTaskHandler::handle_task_(PartTransTask &task,
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("update_sys_ls_info_ fail", KR(ret), K(task), KPC(tenant));
|
||||
}
|
||||
} else if (is_ddl_trans) {
|
||||
dec_ddl_part_trans_count_();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,7 @@ public:
|
||||
int64_t &sys_ls_min_progress,
|
||||
palf::LSN &sys_ls_min_handle_log_lsn) = 0;
|
||||
|
||||
virtual int64_t get_part_trans_task_count() const = 0;
|
||||
virtual void get_task_count(int64_t &total_part_trans_count, int64_t &ddl_part_trans_count) const = 0;
|
||||
|
||||
virtual void configure(const ObLogConfig &config) = 0;
|
||||
};
|
||||
@ -99,7 +99,7 @@ public:
|
||||
uint64_t &sys_min_progress_tenant_id,
|
||||
int64_t &sys_ls_min_progress,
|
||||
palf::LSN &sys_ls_min_handle_log_lsn);
|
||||
virtual int64_t get_part_trans_task_count() const;
|
||||
virtual void get_task_count(int64_t &total_part_trans_count, int64_t &ddl_part_trans_count) const;
|
||||
virtual void configure(const ObLogConfig &config);
|
||||
|
||||
public:
|
||||
@ -140,6 +140,8 @@ private:
|
||||
PartTransTask *task,
|
||||
ObLogTenant *tenant,
|
||||
const bool is_tenant_served);
|
||||
void inc_ddl_part_trans_count_() { ATOMIC_INC(&ddl_part_trans_count_); }
|
||||
void dec_ddl_part_trans_count_() { ATOMIC_DEC(&ddl_part_trans_count_); }
|
||||
|
||||
public:
|
||||
// Task queue
|
||||
@ -180,6 +182,7 @@ private:
|
||||
|
||||
// Queue of pending tasks exported from Fetcher
|
||||
TaskQueue sys_ls_fetch_queue_;
|
||||
int64_t ddl_part_trans_count_;
|
||||
common::ObCond wait_formatted_cond_;
|
||||
|
||||
private:
|
||||
|
Loading…
x
Reference in New Issue
Block a user