diff --git a/src/logservice/libobcdc/src/ob_log_fetcher.cpp b/src/logservice/libobcdc/src/ob_log_fetcher.cpp index 092c5dda9b..a45ce91039 100644 --- a/src/logservice/libobcdc/src/ob_log_fetcher.cpp +++ b/src/logservice/libobcdc/src/ob_log_fetcher.cpp @@ -747,7 +747,7 @@ void ObLogFetcher::print_fetcher_stat_() { int ret = OB_SUCCESS; int64_t min_progress = OB_INVALID_TIMESTAMP; - int64_t upper_limit_us = OB_INVALID_TIMESTAMP; + int64_t upper_limit_ns = OB_INVALID_TIMESTAMP; int64_t fetcher_delay = OB_INVALID_TIMESTAMP; int64_t dml_progress_limit = 0; @@ -759,12 +759,12 @@ void ObLogFetcher::print_fetcher_stat_() ret = OB_INVALID_ERROR; } else { dml_progress_limit = ATOMIC_LOAD(&FetchStream::g_dml_progress_limit); - upper_limit_us = min_progress + dml_progress_limit; - fetcher_delay = get_timestamp() - min_progress; + upper_limit_ns = min_progress + dml_progress_limit * NS_CONVERSION; + fetcher_delay = get_timestamp() - min_progress / NS_CONVERSION; } if (OB_SUCC(ret)) { - LOG_INFO("[STAT] [FETCHER]", "upper_limit", TS_TO_STR(upper_limit_us), + LOG_INFO("[STAT] [FETCHER]", "upper_limit", NTS_TO_STR(upper_limit_ns), "dml_progress_limit_sec", dml_progress_limit / _SEC_, "fetcher_delay", TVAL_TO_STR(fetcher_delay)); } diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp index dae0b87bc3..0e6c8d1ea7 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp @@ -435,7 +435,7 @@ int FetchStream::dispatch_fetch_task_(LSFetchCtx &task, return ret; } -int FetchStream::get_upper_limit(int64_t &upper_limit_us) +int FetchStream::get_upper_limit(int64_t &upper_limit_ns) { int ret = OB_SUCCESS; int64_t min_progress = OB_INVALID_TIMESTAMP; @@ -453,10 +453,10 @@ int FetchStream::get_upper_limit(int64_t &upper_limit_us) } else { // DDL partition is not limited by progress limit, here upper limit is set to a future value if (FETCH_STREAM_TYPE_SYS_LS == stype_) { - upper_limit_us = min_progress + ATOMIC_LOAD(&g_ddl_progress_limit) * NS_CONVERSION; + upper_limit_ns = min_progress + ATOMIC_LOAD(&g_ddl_progress_limit) * NS_CONVERSION; } else { // Other partition are limited by progress limit - upper_limit_us = min_progress + ATOMIC_LOAD(&g_dml_progress_limit) * NS_CONVERSION; + upper_limit_ns = min_progress + ATOMIC_LOAD(&g_dml_progress_limit) * NS_CONVERSION; } } diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h index acb570b133..1efcd3767e 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h @@ -116,7 +116,7 @@ public: void switch_state(State state) { ATOMIC_STORE(&state_, state); } - int get_upper_limit(int64_t &upper_limit_us); + int get_upper_limit(int64_t &upper_limit_ns); // Execution Statistics void do_stat(); diff --git a/src/logservice/logfetcher/ob_log_fetch_stat_info.cpp b/src/logservice/logfetcher/ob_log_fetch_stat_info.cpp index 2f34063a07..56b480f2be 100644 --- a/src/logservice/logfetcher/ob_log_fetch_stat_info.cpp +++ b/src/logservice/logfetcher/ob_log_fetch_stat_info.cpp @@ -454,13 +454,17 @@ int64_t FetchStatInfoPrinter::to_string(char* buf, const int64_t buf_len) const (void)databuff_printf(buf, buf_len, pos, - "traffic=%s/sec size/rpc=%s log_cnt/rpc=%ld rpc_cnt/sec=%ld " - "single_rpc/sec=%ld(upper_limit=%ld,max_log=%ld,no_log=%ld,max_result=%ld) " + "traffic=%s/sec log_size=%ld size/rpc=%s log_cnt/rpc=%ld " + "rpc_cnt=%ld(%ld/sec) single_rpc=%ld(%ld/sec)" + "(upper_limit=%ld(%ld/sec),max_log=%ld(%ld/sec),no_log=%ld(%ld/sec),max_result=%ld(%ld/sec)) " "rpc_time=%ld svr_time=(queue=%ld,process=%ld) net_time=(l2s=%ld,s2l=%ld) cb_time=%ld " "handle_rpc_time=%ld flush_time=%ld read_log_time=%ld(log_entry=%ld,trans=%ld) %s", - SIZE_TO_STR(traffic), SIZE_TO_STR(log_size_per_rpc), log_cnt_per_rpc, rpc_cnt_per_sec, - single_rpc_cnt_per_sec, reach_upper_limit_rpc_cnt_per_sec, - reach_max_log_id_rpc_cnt_per_sec, no_log_rpc_cnt_per_sec, reach_max_result_rpc_cnt_per_sec, + SIZE_TO_STR(traffic), log_size, SIZE_TO_STR(log_size_per_rpc), log_cnt_per_rpc, + rpc_cnt, rpc_cnt_per_sec, single_rpc_cnt, single_rpc_cnt_per_sec, + reach_upper_limit_rpc_cnt, reach_upper_limit_rpc_cnt_per_sec, + reach_max_log_id_rpc_cnt, reach_max_log_id_rpc_cnt_per_sec, + no_log_rpc_cnt, no_log_rpc_cnt_per_sec, + reach_max_result_rpc_cnt, reach_max_result_rpc_cnt_per_sec, rpc_time_per_rpc, svr_queue_time_per_rpc, svr_process_time_per_rpc, l2s_net_time_per_rpc, s2l_net_time_per_rpc, callback_time_per_rpc, handle_rpc_time_per_rpc, flush_time_per_rpc, read_log_time_per_rpc, diff --git a/src/logservice/logfetcher/ob_log_fetcher.cpp b/src/logservice/logfetcher/ob_log_fetcher.cpp index b06a884469..4d767726e6 100644 --- a/src/logservice/logfetcher/ob_log_fetcher.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher.cpp @@ -734,8 +734,9 @@ void ObLogFetcher::print_fetcher_stat_() { int ret = OB_SUCCESS; int64_t min_progress = OB_INVALID_TIMESTAMP; - int64_t upper_limit_us = OB_INVALID_TIMESTAMP; + int64_t upper_limit_ns = OB_INVALID_TIMESTAMP; int64_t fetcher_delay = OB_INVALID_TIMESTAMP; + int64_t global_upper_limit = OB_INVALID_TIMESTAMP; int64_t dml_progress_limit = 0; // Get global minimum progress @@ -746,13 +747,15 @@ void ObLogFetcher::print_fetcher_stat_() ret = OB_INVALID_ERROR; } else { dml_progress_limit = ATOMIC_LOAD(&FetchStream::g_dml_progress_limit); - upper_limit_us = min_progress + dml_progress_limit; + upper_limit_ns = min_progress + dml_progress_limit * NS_CONVERSION; fetcher_delay = get_timestamp() - min_progress / NS_CONVERSION; + global_upper_limit = progress_controller_.get_global_upper_limit(); } if (OB_SUCC(ret)) { - LOG_INFO("[STAT] [LOG_FETCHER]", "upper_limit", NTS_TO_STR(upper_limit_us), - "dml_progress_limit_sec", dml_progress_limit / NS_CONVERSION, + LOG_INFO("[STAT] [LOG_FETCHER]", "upper_limit", NTS_TO_STR(upper_limit_ns), + "global_upper_limit", NTS_TO_STR(global_upper_limit), + "dml_progress_limit_sec", dml_progress_limit / _SEC_, "fetcher_delay", TVAL_TO_STR(fetcher_delay)); } } diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp index 5b64ef3ff9..f1d2d18ceb 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp @@ -458,7 +458,7 @@ int FetchStream::dispatch_fetch_task_(LSFetchCtx &task, return ret; } -int FetchStream::get_upper_limit(int64_t &upper_limit_us) +int FetchStream::get_upper_limit(int64_t &upper_limit_ns) { int ret = OB_SUCCESS; int64_t min_progress = OB_INVALID_TIMESTAMP; @@ -477,19 +477,19 @@ int FetchStream::get_upper_limit(int64_t &upper_limit_us) } else { // DDL partition is not limited by progress limit, here upper limit is set to a future value if (FETCH_STREAM_TYPE_SYS_LS == stype_) { - upper_limit_us = min_progress + ATOMIC_LOAD(&g_ddl_progress_limit) * NS_CONVERSION; + upper_limit_ns = min_progress + ATOMIC_LOAD(&g_ddl_progress_limit) * NS_CONVERSION; } else { // Other partition are limited by progress limit - upper_limit_us = min_progress + ATOMIC_LOAD(&g_dml_progress_limit) * NS_CONVERSION; + upper_limit_ns = min_progress + ATOMIC_LOAD(&g_dml_progress_limit) * NS_CONVERSION; } global_upper_limit = progress_controller_->get_global_upper_limit(); if (OB_INVALID_TIMESTAMP != global_upper_limit) { const int64_t log_progress = ls_fetch_ctx_->get_progress(); if (log_progress < global_upper_limit) { - upper_limit_us = INT64_MAX - 1; + upper_limit_ns = INT64_MAX - 1; } else { - upper_limit_us = std::min(upper_limit_us, global_upper_limit); + upper_limit_ns = std::min(upper_limit_ns, global_upper_limit); } } } diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_stream.h b/src/logservice/logfetcher/ob_log_ls_fetch_stream.h index c621f2ae8a..b8e13cd493 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_stream.h +++ b/src/logservice/logfetcher/ob_log_ls_fetch_stream.h @@ -120,7 +120,7 @@ public: void switch_state(State state) { ATOMIC_STORE(&state_, state); } - int get_upper_limit(int64_t &upper_limit_us); + int get_upper_limit(int64_t &upper_limit_ns); // Execution Statistics void do_stat();