[LogFetcher][OBCDC] fix the wrong statistic in LogFetcher

This commit is contained in:
zxlzxlzxlzxlzxl
2023-06-09 05:18:01 +00:00
committed by ob-robot
parent 7d5809c2ba
commit 8c33711e48
7 changed files with 30 additions and 23 deletions

View File

@ -747,7 +747,7 @@ void ObLogFetcher::print_fetcher_stat_()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t min_progress = OB_INVALID_TIMESTAMP; int64_t min_progress = OB_INVALID_TIMESTAMP;
int64_t upper_limit_us = OB_INVALID_TIMESTAMP; int64_t upper_limit_ns = OB_INVALID_TIMESTAMP;
int64_t fetcher_delay = OB_INVALID_TIMESTAMP; int64_t fetcher_delay = OB_INVALID_TIMESTAMP;
int64_t dml_progress_limit = 0; int64_t dml_progress_limit = 0;
@ -759,12 +759,12 @@ void ObLogFetcher::print_fetcher_stat_()
ret = OB_INVALID_ERROR; ret = OB_INVALID_ERROR;
} else { } else {
dml_progress_limit = ATOMIC_LOAD(&FetchStream::g_dml_progress_limit); dml_progress_limit = ATOMIC_LOAD(&FetchStream::g_dml_progress_limit);
upper_limit_us = min_progress + dml_progress_limit; upper_limit_ns = min_progress + dml_progress_limit * NS_CONVERSION;
fetcher_delay = get_timestamp() - min_progress; fetcher_delay = get_timestamp() - min_progress / NS_CONVERSION;
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
LOG_INFO("[STAT] [FETCHER]", "upper_limit", TS_TO_STR(upper_limit_us), LOG_INFO("[STAT] [FETCHER]", "upper_limit", NTS_TO_STR(upper_limit_ns),
"dml_progress_limit_sec", dml_progress_limit / _SEC_, "dml_progress_limit_sec", dml_progress_limit / _SEC_,
"fetcher_delay", TVAL_TO_STR(fetcher_delay)); "fetcher_delay", TVAL_TO_STR(fetcher_delay));
} }

View File

@ -435,7 +435,7 @@ int FetchStream::dispatch_fetch_task_(LSFetchCtx &task,
return ret; return ret;
} }
int FetchStream::get_upper_limit(int64_t &upper_limit_us) int FetchStream::get_upper_limit(int64_t &upper_limit_ns)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t min_progress = OB_INVALID_TIMESTAMP; int64_t min_progress = OB_INVALID_TIMESTAMP;
@ -453,10 +453,10 @@ int FetchStream::get_upper_limit(int64_t &upper_limit_us)
} else { } else {
// DDL partition is not limited by progress limit, here upper limit is set to a future value // DDL partition is not limited by progress limit, here upper limit is set to a future value
if (FETCH_STREAM_TYPE_SYS_LS == stype_) { if (FETCH_STREAM_TYPE_SYS_LS == stype_) {
upper_limit_us = min_progress + ATOMIC_LOAD(&g_ddl_progress_limit) * NS_CONVERSION; upper_limit_ns = min_progress + ATOMIC_LOAD(&g_ddl_progress_limit) * NS_CONVERSION;
} else { } else {
// Other partition are limited by progress limit // Other partition are limited by progress limit
upper_limit_us = min_progress + ATOMIC_LOAD(&g_dml_progress_limit) * NS_CONVERSION; upper_limit_ns = min_progress + ATOMIC_LOAD(&g_dml_progress_limit) * NS_CONVERSION;
} }
} }

View File

@ -116,7 +116,7 @@ public:
void switch_state(State state) { ATOMIC_STORE(&state_, state); } void switch_state(State state) { ATOMIC_STORE(&state_, state); }
int get_upper_limit(int64_t &upper_limit_us); int get_upper_limit(int64_t &upper_limit_ns);
// Execution Statistics // Execution Statistics
void do_stat(); void do_stat();

View File

@ -454,13 +454,17 @@ int64_t FetchStatInfoPrinter::to_string(char* buf, const int64_t buf_len) const
(void)databuff_printf(buf, buf_len, pos, (void)databuff_printf(buf, buf_len, pos,
"traffic=%s/sec size/rpc=%s log_cnt/rpc=%ld rpc_cnt/sec=%ld " "traffic=%s/sec log_size=%ld size/rpc=%s log_cnt/rpc=%ld "
"single_rpc/sec=%ld(upper_limit=%ld,max_log=%ld,no_log=%ld,max_result=%ld) " "rpc_cnt=%ld(%ld/sec) single_rpc=%ld(%ld/sec)"
"(upper_limit=%ld(%ld/sec),max_log=%ld(%ld/sec),no_log=%ld(%ld/sec),max_result=%ld(%ld/sec)) "
"rpc_time=%ld svr_time=(queue=%ld,process=%ld) net_time=(l2s=%ld,s2l=%ld) cb_time=%ld " "rpc_time=%ld svr_time=(queue=%ld,process=%ld) net_time=(l2s=%ld,s2l=%ld) cb_time=%ld "
"handle_rpc_time=%ld flush_time=%ld read_log_time=%ld(log_entry=%ld,trans=%ld) %s", "handle_rpc_time=%ld flush_time=%ld read_log_time=%ld(log_entry=%ld,trans=%ld) %s",
SIZE_TO_STR(traffic), SIZE_TO_STR(log_size_per_rpc), log_cnt_per_rpc, rpc_cnt_per_sec, SIZE_TO_STR(traffic), log_size, SIZE_TO_STR(log_size_per_rpc), log_cnt_per_rpc,
single_rpc_cnt_per_sec, reach_upper_limit_rpc_cnt_per_sec, rpc_cnt, rpc_cnt_per_sec, single_rpc_cnt, single_rpc_cnt_per_sec,
reach_max_log_id_rpc_cnt_per_sec, no_log_rpc_cnt_per_sec, reach_max_result_rpc_cnt_per_sec, reach_upper_limit_rpc_cnt, reach_upper_limit_rpc_cnt_per_sec,
reach_max_log_id_rpc_cnt, reach_max_log_id_rpc_cnt_per_sec,
no_log_rpc_cnt, no_log_rpc_cnt_per_sec,
reach_max_result_rpc_cnt, reach_max_result_rpc_cnt_per_sec,
rpc_time_per_rpc, svr_queue_time_per_rpc, svr_process_time_per_rpc, rpc_time_per_rpc, svr_queue_time_per_rpc, svr_process_time_per_rpc,
l2s_net_time_per_rpc, s2l_net_time_per_rpc, callback_time_per_rpc, l2s_net_time_per_rpc, s2l_net_time_per_rpc, callback_time_per_rpc,
handle_rpc_time_per_rpc, flush_time_per_rpc, read_log_time_per_rpc, handle_rpc_time_per_rpc, flush_time_per_rpc, read_log_time_per_rpc,

View File

@ -734,8 +734,9 @@ void ObLogFetcher::print_fetcher_stat_()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t min_progress = OB_INVALID_TIMESTAMP; int64_t min_progress = OB_INVALID_TIMESTAMP;
int64_t upper_limit_us = OB_INVALID_TIMESTAMP; int64_t upper_limit_ns = OB_INVALID_TIMESTAMP;
int64_t fetcher_delay = OB_INVALID_TIMESTAMP; int64_t fetcher_delay = OB_INVALID_TIMESTAMP;
int64_t global_upper_limit = OB_INVALID_TIMESTAMP;
int64_t dml_progress_limit = 0; int64_t dml_progress_limit = 0;
// Get global minimum progress // Get global minimum progress
@ -746,13 +747,15 @@ void ObLogFetcher::print_fetcher_stat_()
ret = OB_INVALID_ERROR; ret = OB_INVALID_ERROR;
} else { } else {
dml_progress_limit = ATOMIC_LOAD(&FetchStream::g_dml_progress_limit); dml_progress_limit = ATOMIC_LOAD(&FetchStream::g_dml_progress_limit);
upper_limit_us = min_progress + dml_progress_limit; upper_limit_ns = min_progress + dml_progress_limit * NS_CONVERSION;
fetcher_delay = get_timestamp() - min_progress / NS_CONVERSION; fetcher_delay = get_timestamp() - min_progress / NS_CONVERSION;
global_upper_limit = progress_controller_.get_global_upper_limit();
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
LOG_INFO("[STAT] [LOG_FETCHER]", "upper_limit", NTS_TO_STR(upper_limit_us), LOG_INFO("[STAT] [LOG_FETCHER]", "upper_limit", NTS_TO_STR(upper_limit_ns),
"dml_progress_limit_sec", dml_progress_limit / NS_CONVERSION, "global_upper_limit", NTS_TO_STR(global_upper_limit),
"dml_progress_limit_sec", dml_progress_limit / _SEC_,
"fetcher_delay", TVAL_TO_STR(fetcher_delay)); "fetcher_delay", TVAL_TO_STR(fetcher_delay));
} }
} }

View File

@ -458,7 +458,7 @@ int FetchStream::dispatch_fetch_task_(LSFetchCtx &task,
return ret; return ret;
} }
int FetchStream::get_upper_limit(int64_t &upper_limit_us) int FetchStream::get_upper_limit(int64_t &upper_limit_ns)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t min_progress = OB_INVALID_TIMESTAMP; int64_t min_progress = OB_INVALID_TIMESTAMP;
@ -477,19 +477,19 @@ int FetchStream::get_upper_limit(int64_t &upper_limit_us)
} else { } else {
// DDL partition is not limited by progress limit, here upper limit is set to a future value // DDL partition is not limited by progress limit, here upper limit is set to a future value
if (FETCH_STREAM_TYPE_SYS_LS == stype_) { if (FETCH_STREAM_TYPE_SYS_LS == stype_) {
upper_limit_us = min_progress + ATOMIC_LOAD(&g_ddl_progress_limit) * NS_CONVERSION; upper_limit_ns = min_progress + ATOMIC_LOAD(&g_ddl_progress_limit) * NS_CONVERSION;
} else { } else {
// Other partition are limited by progress limit // Other partition are limited by progress limit
upper_limit_us = min_progress + ATOMIC_LOAD(&g_dml_progress_limit) * NS_CONVERSION; upper_limit_ns = min_progress + ATOMIC_LOAD(&g_dml_progress_limit) * NS_CONVERSION;
} }
global_upper_limit = progress_controller_->get_global_upper_limit(); global_upper_limit = progress_controller_->get_global_upper_limit();
if (OB_INVALID_TIMESTAMP != global_upper_limit) { if (OB_INVALID_TIMESTAMP != global_upper_limit) {
const int64_t log_progress = ls_fetch_ctx_->get_progress(); const int64_t log_progress = ls_fetch_ctx_->get_progress();
if (log_progress < global_upper_limit) { if (log_progress < global_upper_limit) {
upper_limit_us = INT64_MAX - 1; upper_limit_ns = INT64_MAX - 1;
} else { } else {
upper_limit_us = std::min(upper_limit_us, global_upper_limit); upper_limit_ns = std::min(upper_limit_ns, global_upper_limit);
} }
} }
} }

View File

@ -120,7 +120,7 @@ public:
void switch_state(State state) { ATOMIC_STORE(&state_, state); } void switch_state(State state) { ATOMIC_STORE(&state_, state); }
int get_upper_limit(int64_t &upper_limit_us); int get_upper_limit(int64_t &upper_limit_ns);
// Execution Statistics // Execution Statistics
void do_stat(); void do_stat();