From d89f2c4133d65f55ea5deca22f59a9a96bea8fd3 Mon Sep 17 00:00:00 2001 From: fkuner <784819644@qq.com> Date: Fri, 29 Mar 2024 10:45:45 +0000 Subject: [PATCH] [CP] Add statistics about parser and tenant traffic --- .../src/ob_log_fetch_stream_container.cpp | 4 +- .../src/ob_log_fetch_stream_container.h | 2 +- .../src/ob_log_fetch_stream_container_mgr.cpp | 56 +++++++++++++++++-- .../src/ob_log_fetch_stream_container_mgr.h | 30 +++++++--- .../libobcdc/src/ob_log_instance.cpp | 1 + .../libobcdc/src/ob_log_ls_fetch_stream.cpp | 4 +- .../libobcdc/src/ob_log_ls_fetch_stream.h | 2 +- .../libobcdc/src/ob_log_part_trans_parser.cpp | 35 +++++++++++- .../libobcdc/src/ob_log_part_trans_parser.h | 8 +++ .../logfetcher/ob_log_fetch_stat_info.cpp | 9 +++ .../logfetcher/ob_log_fetch_stat_info.h | 2 + .../ob_log_fetch_stream_container.cpp | 4 +- .../ob_log_fetch_stream_container.h | 2 +- .../ob_log_fetch_stream_container_mgr.cpp | 15 ++++- .../ob_log_fetch_stream_container_mgr.h | 10 +++- .../logfetcher/ob_log_ls_fetch_stream.cpp | 4 +- .../logfetcher/ob_log_ls_fetch_stream.h | 2 +- 17 files changed, 160 insertions(+), 30 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_log_fetch_stream_container.cpp b/src/logservice/libobcdc/src/ob_log_fetch_stream_container.cpp index c7c14ee9a7..e5315a716b 100644 --- a/src/logservice/libobcdc/src/ob_log_fetch_stream_container.cpp +++ b/src/logservice/libobcdc/src/ob_log_fetch_stream_container.cpp @@ -89,7 +89,7 @@ int FetchStreamContainer::dispatch(LSFetchCtx &task, return ret; } -void FetchStreamContainer::do_stat() +void FetchStreamContainer::do_stat(int64_t &traffic) { // Add read locks to allow concurrent lookups and inserts SpinRLockGuard guard(lock_); @@ -97,7 +97,7 @@ void FetchStreamContainer::do_stat() FetchStream *fs = fs_list_.head(); while (NULL != fs) { if (fs->get_fetch_task_count() > 0) { - fs->do_stat(); + fs->do_stat(traffic); } fs = fs->get_next(); } diff --git a/src/logservice/libobcdc/src/ob_log_fetch_stream_container.h b/src/logservice/libobcdc/src/ob_log_fetch_stream_container.h index e363f9eab8..2b980446da 100644 --- a/src/logservice/libobcdc/src/ob_log_fetch_stream_container.h +++ b/src/logservice/libobcdc/src/ob_log_fetch_stream_container.h @@ -52,7 +52,7 @@ public: const common::ObAddr &request_svr); // print stat info - void do_stat(); + void do_stat(int64_t &traffic); public: TO_STRING_KV("stype", print_fetch_stream_type(stype_), diff --git a/src/logservice/libobcdc/src/ob_log_fetch_stream_container_mgr.cpp b/src/logservice/libobcdc/src/ob_log_fetch_stream_container_mgr.cpp index 3300f143c9..18a238ac55 100644 --- a/src/logservice/libobcdc/src/ob_log_fetch_stream_container_mgr.cpp +++ b/src/logservice/libobcdc/src/ob_log_fetch_stream_container_mgr.cpp @@ -26,7 +26,8 @@ ObFsContainerMgr::ObFsContainerMgr() : fsc_map_(), fsc_pool_(), fs_pool_(), - rpc_result_pool_() + rpc_result_pool_(), + tenant_fetch_traffic_map_() { } @@ -60,6 +61,8 @@ int ObFsContainerMgr::init(const int64_t svr_stream_cached_count, LOG_ERROR("init fetch stream pool fail", KR(ret), K(fetch_stream_cached_count)); } else if (OB_FAIL(rpc_result_pool_.init(rpc_result_cached_count))) { LOG_ERROR("init rpc result pool fail", KR(ret), K(rpc_result_cached_count)); + } else if (OB_FAIL(tenant_fetch_traffic_map_.init("TrafficMap"))) { + LOG_ERROR("tenant_fetch_traffic_map_ init fail", KR(ret)); } else { rpc_ = &rpc; stream_worker_ = &stream_worker; @@ -81,6 +84,7 @@ void ObFsContainerMgr::destroy() (void)fsc_map_.destroy(); fsc_pool_.destroy(); fs_pool_.destroy(); + tenant_fetch_traffic_map_.destroy(); } } @@ -166,7 +170,6 @@ int ObFsContainerMgr::get_fsc(const logservice::TenantLSID &tls_id, void ObFsContainerMgr::print_stat() { int ret = OB_SUCCESS; - SvrStreamStatFunc svr_stream_stat_func; int64_t alloc_count = fsc_pool_.get_alloc_count(); int64_t free_count = fsc_pool_.get_free_count(); @@ -180,11 +183,54 @@ void ObFsContainerMgr::print_stat() fs_pool_.print_stat(); rpc_result_pool_.print_stat(); - // Statistics every FetchStreamContainer - if (OB_FAIL(fsc_map_.for_each(svr_stream_stat_func))) { - LOG_ERROR("for each FetchStreamContainer map fail", KR(ret)); + tenant_fetch_traffic_map_.clear(); + TenantStreamStatFunc tenant_stream_stat_func(&tenant_fetch_traffic_map_); + TenantStreamStatPrinter tenant_stream_stat_printer; + + if (OB_FAIL(fsc_map_.for_each(tenant_stream_stat_func))) { + LOG_ERROR("TenantStreamStatFunc for each FetchStreamContainer map fail", KR(ret)); + } else if (OB_FAIL(tenant_fetch_traffic_map_.for_each(tenant_stream_stat_printer))) { + LOG_ERROR("TenantStreamStatPrinter for each tenant fetch traffic map fail", KR(ret)); + } else { } } +bool ObFsContainerMgr::TenantStreamStatFunc::operator() (const logservice::TenantLSID &key, FetchStreamContainer *value) +{ + int ret = OB_SUCCESS; + int64_t tenant_traffic = 0; + int64_t ls_traffic = 0; + TenantID tenant_id(key.get_tenant_id()); + + if (OB_ISNULL(tenant_fetch_traffic_map_)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tenant_fetch_traffic_map_ is NULL", KR(ret)); + } else if (OB_FAIL(tenant_fetch_traffic_map_->get(tenant_id, tenant_traffic))) { + if (OB_ENTRY_NOT_EXIST != ret) { + LOG_ERROR("tenant_fetch_traffic_map_ get fail", KR(ret), K(tenant_id)); + } else { + ret = OB_SUCCESS; + } + } + + if (OB_SUCC(ret)) { + (void) value->do_stat(ls_traffic); + tenant_traffic += ls_traffic; + if (OB_FAIL(tenant_fetch_traffic_map_->insert_or_update(tenant_id, tenant_traffic))) { + LOG_ERROR("tenant_fetch_traffic_map_ insert_or_update fail", KR(ret), K(tenant_id), K(tenant_traffic)); + } + } + + return OB_SUCCESS == ret; +} + + +bool ObFsContainerMgr::TenantStreamStatPrinter::operator() (const TenantID &tenant_id, const int64_t traffic) +{ + _LOG_INFO("[STAT] [FETCH_STREAM] TENANT=%lu, TRAFFIC=%s/sec", tenant_id.tenant_id_, SIZE_TO_STR(traffic)); + return true; +} + + } // namespace libobcdc } // namespace oceanbase diff --git a/src/logservice/libobcdc/src/ob_log_fetch_stream_container_mgr.h b/src/logservice/libobcdc/src/ob_log_fetch_stream_container_mgr.h index b620734f36..ca072e341e 100644 --- a/src/logservice/libobcdc/src/ob_log_fetch_stream_container_mgr.h +++ b/src/logservice/libobcdc/src/ob_log_fetch_stream_container_mgr.h @@ -19,6 +19,7 @@ #include "ob_log_fetch_stream_container.h" // FetchStreamContainer #include "ob_log_fetch_stream_pool.h" // FetchStreamPool #include "ob_log_fetch_log_rpc.h" // FetchLogARpcResultPool +#include "ob_log_tenant.h" namespace oceanbase { @@ -72,8 +73,9 @@ private: bool operator() (const logservice::TenantLSID &key, FetchStreamContainer* value) { UNUSED(key); + int64_t traffic = 0; if (NULL != value) { - value->do_stat(); + value->do_stat(traffic); } return true; } @@ -81,21 +83,35 @@ private: typedef common::ObLinearHashMap FscMap; typedef common::ObSmallObjPool FscPool; + typedef common::ObLinearHashMap TenantFetchInfoMap; // Map of tenant_id -> fetch log traffic // TODO static const int64_t SVR_STREAM_POOL_BLOCK_SIZE = 1 << 22; + struct TenantStreamStatFunc + { + TenantStreamStatFunc(TenantFetchInfoMap *tenant_fetch_traffic_map) : tenant_fetch_traffic_map_(tenant_fetch_traffic_map) {} + bool operator() (const logservice::TenantLSID &key, FetchStreamContainer *value); + TenantFetchInfoMap *tenant_fetch_traffic_map_; + }; + + struct TenantStreamStatPrinter + { + bool operator() (const TenantID &tenant_id, const int64_t traffic); + }; + private: bool is_inited_; // External modules - IObLogRpc *rpc_; // RPC handler - IObLSWorker *stream_worker_; // Stream master - PartProgressController *progress_controller_; // progress controller + IObLogRpc *rpc_; // RPC handler + IObLSWorker *stream_worker_; // Stream master + PartProgressController *progress_controller_; // progress controller FscMap fsc_map_; - FscPool fsc_pool_; // Supports multi-threaded alloc/release - FetchStreamPool fs_pool_; // FetchStream object pool - FetchLogARpcResultPool rpc_result_pool_; // RPC resujt object pool + FscPool fsc_pool_; // Supports multi-threaded alloc/release + FetchStreamPool fs_pool_; // FetchStream object pool + FetchLogARpcResultPool rpc_result_pool_; // RPC resujt object pool + TenantFetchInfoMap tenant_fetch_traffic_map_; // Tenant fetch log traffic map }; } // namespace libobcdc diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index fc6b6ef843..ccd2f4eac5 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -2198,6 +2198,7 @@ void ObLogInstance::timer_routine() resource_collector_->print_stat_info(); reader_->print_stat_info(); lob_aux_meta_storager_.print_stat_info(); + part_trans_parser_->print_stat_info(); } // Periodic memory recycling diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp index 6336e84c34..3e1feaf030 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp @@ -276,7 +276,7 @@ void FetchStream::configure(const ObLogConfig &config) LOG_INFO("[CONFIG]", K(print_stream_dispatch_info)); } -void FetchStream::do_stat() +void FetchStream::do_stat(int64_t &traffic) { ObByteLockGuard lock_guard(stat_lock_); @@ -305,7 +305,7 @@ void FetchStream::do_stat() ret = OB_ERR_UNEXPECTED; LOG_ERROR("ls_fetch_ctx_ is NULL", KR(ret), "fs", *this); } - + traffic = fsi_printer.get_traffic(); last_stat_time_ = cur_time; last_stat_info_ = cur_stat_info_; } diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h index 5a81a3ecc6..c4f107350d 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.h @@ -119,7 +119,7 @@ public: int get_upper_limit(int64_t &upper_limit_ns); // Execution Statistics - void do_stat(); + void do_stat(int64_t &traffic); int64_t get_fetch_task_count() const { return 1; } diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp index fdc3a6ba18..30be1aa8e3 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp @@ -38,7 +38,10 @@ ObLogPartTransParser::ObLogPartTransParser() : inited_(false), br_pool_(NULL), meta_manager_(NULL), - cluster_id_(OB_INVALID_CLUSTER_ID) + cluster_id_(OB_INVALID_CLUSTER_ID), + total_log_size_(0), + remaining_log_size_(0), + last_stat_time_(OB_INVALID_TIMESTAMP) {} ObLogPartTransParser::~ObLogPartTransParser() @@ -71,12 +74,36 @@ int ObLogPartTransParser::init( } else { cluster_id_ = cluster_id; inited_ = true; - + last_stat_time_ = get_timestamp(); LOG_INFO("init PartTransParser succ", K(cluster_id)); } return ret; } +void ObLogPartTransParser::print_stat_info() +{ + int64_t current_timestamp = get_timestamp(); + int64_t local_last_stat_time = last_stat_time_; + int64_t delta_time = current_timestamp - local_last_stat_time; + double delta_second = static_cast(delta_time) / static_cast(_SEC_); + int64_t total_traffic = 0; + int64_t remaining_traffic = 0; + int64_t filtered_out_traffic = 0; + if (0 < delta_second) { + total_traffic = static_cast(static_cast(total_log_size_) / delta_second); + remaining_traffic = static_cast(static_cast(remaining_log_size_) / delta_second); + filtered_out_traffic = static_cast(static_cast(total_log_size_ - remaining_log_size_) / delta_second); + } + + // Update last statistic value + last_stat_time_ = current_timestamp; + total_log_size_ = 0; + remaining_log_size_ = 0; + + _LOG_INFO("[STAT] [PARSER] TOTAL_TRAFFIC=%s/sec, REMAINING_TRAFFIC=%s/sec, FILTERED_OUT_TRAFFIC=%s/sec", + SIZE_TO_STR(total_traffic), SIZE_TO_STR(remaining_traffic), SIZE_TO_STR(filtered_out_traffic)); +} + int ObLogPartTransParser::parse(PartTransTask &task, volatile bool &stop_flag) { int ret = OB_SUCCESS; @@ -156,6 +183,7 @@ int ObLogPartTransParser::parse(ObLogEntryTask &task, volatile bool &stop_flag) task, *part_trans_task, row_index, stop_flag))) { LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), KPC(redo_node), K(task), K(row_index)); } else { + ATOMIC_AAF(&total_log_size_, redo_node->get_data_len()); LOG_DEBUG("[PARSE] LogEntryTask parse succ", K(task)); } } @@ -246,6 +274,7 @@ int ObLogPartTransParser::parse_stmts_( while (OB_SUCC(ret) && pos < redo_data_len) { bool need_filter_row = false; int32_t row_size = 0; + int64_t begin_pos = pos; MutatorType mutator_type = MutatorType::MUTATOR_ROW; // default type to mutator_row common::ObTabletID tablet_id; @@ -304,6 +333,8 @@ int ObLogPartTransParser::parse_stmts_( redo_log_entry_task, task))) { LOG_ERROR("parse_dml_stmts_ fail", KR(ret), K(row_index), K(*row), K(redo_log_entry_task), K(task)); + } else { + ATOMIC_AAF(&remaining_log_size_, pos - begin_pos); } if (OB_SUCC(ret)) { diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_parser.h b/src/logservice/libobcdc/src/ob_log_part_trans_parser.h index 2f8ab17d54..a1279427d8 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_parser.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_parser.h @@ -37,6 +37,8 @@ public: virtual int parse(PartTransTask &task, volatile bool &stop_flag) = 0; virtual int parse(ObLogEntryTask &task, volatile bool &stop_flag) = 0; + + virtual void print_stat_info() = 0; }; @@ -62,6 +64,7 @@ public: IObLogMetaManager *meta_manager, const int64_t cluster_id); void destroy(); + void print_stat_info(); private: int check_row_need_rollback_( @@ -160,6 +163,11 @@ private: // Set as the unique ID of the DDL int64_t cluster_id_; + // Stat for White Black List + int64_t total_log_size_; + int64_t remaining_log_size_; + int64_t last_stat_time_; + private: DISALLOW_COPY_AND_ASSIGN(ObLogPartTransParser); }; diff --git a/src/logservice/logfetcher/ob_log_fetch_stat_info.cpp b/src/logservice/logfetcher/ob_log_fetch_stat_info.cpp index 56b480f2be..7a8171b892 100644 --- a/src/logservice/logfetcher/ob_log_fetch_stat_info.cpp +++ b/src/logservice/logfetcher/ob_log_fetch_stat_info.cpp @@ -474,5 +474,14 @@ int64_t FetchStatInfoPrinter::to_string(char* buf, const int64_t buf_len) const return pos; } +int64_t FetchStatInfoPrinter::get_traffic() const +{ + int64_t traffic = 0; + if (0 < delta_second_) { + traffic = static_cast(static_cast(delta_fsi_.fetch_log_size_) / delta_second_); + } + return traffic; +} + } } diff --git a/src/logservice/logfetcher/ob_log_fetch_stat_info.h b/src/logservice/logfetcher/ob_log_fetch_stat_info.h index e360a38255..048957f849 100644 --- a/src/logservice/logfetcher/ob_log_fetch_stat_info.h +++ b/src/logservice/logfetcher/ob_log_fetch_stat_info.h @@ -207,6 +207,8 @@ struct FetchStatInfoPrinter int64_t to_string(char* buf, const int64_t buf_len) const; + int64_t get_traffic() const; + FetchStatInfo delta_fsi_; const double delta_second_; diff --git a/src/logservice/logfetcher/ob_log_fetch_stream_container.cpp b/src/logservice/logfetcher/ob_log_fetch_stream_container.cpp index b7abf4c403..25b8f46df3 100644 --- a/src/logservice/logfetcher/ob_log_fetch_stream_container.cpp +++ b/src/logservice/logfetcher/ob_log_fetch_stream_container.cpp @@ -95,7 +95,7 @@ int FetchStreamContainer::dispatch(LSFetchCtx &task, return ret; } -void FetchStreamContainer::do_stat() +void FetchStreamContainer::do_stat(int64_t &traffic) { // Add read locks to allow concurrent lookups and inserts SpinRLockGuard guard(lock_); @@ -103,7 +103,7 @@ void FetchStreamContainer::do_stat() FetchStream *fs = fs_list_.head(); while (NULL != fs) { if (fs->get_fetch_task_count() > 0) { - fs->do_stat(); + fs->do_stat(traffic); } fs = fs->get_next(); } diff --git a/src/logservice/logfetcher/ob_log_fetch_stream_container.h b/src/logservice/logfetcher/ob_log_fetch_stream_container.h index 1159a5bc1c..56bef5653c 100644 --- a/src/logservice/logfetcher/ob_log_fetch_stream_container.h +++ b/src/logservice/logfetcher/ob_log_fetch_stream_container.h @@ -55,7 +55,7 @@ public: const common::ObAddr &request_svr); // print stat info - void do_stat(); + void do_stat(int64_t &traffic); public: TO_STRING_KV("stype", print_fetch_stream_type(stype_), diff --git a/src/logservice/logfetcher/ob_log_fetch_stream_container_mgr.cpp b/src/logservice/logfetcher/ob_log_fetch_stream_container_mgr.cpp index 653f3b52b4..80df3e476d 100644 --- a/src/logservice/logfetcher/ob_log_fetch_stream_container_mgr.cpp +++ b/src/logservice/logfetcher/ob_log_fetch_stream_container_mgr.cpp @@ -198,7 +198,6 @@ int ObFsContainerMgr::get_fsc(const logservice::TenantLSID &tls_id, void ObFsContainerMgr::print_stat() { int ret = OB_SUCCESS; - SvrStreamStatFunc svr_stream_stat_func; int64_t alloc_count = fsc_pool_.get_alloc_count(); int64_t free_count = fsc_pool_.get_free_count(); @@ -212,11 +211,21 @@ void ObFsContainerMgr::print_stat() fs_pool_.print_stat(); rpc_result_pool_.print_stat(); - // Statistics every FetchStreamContainer - if (OB_FAIL(fsc_map_.for_each(svr_stream_stat_func))) { + TenantStreamStatFunc tenant_stream_stat_func; + if (OB_FAIL(fsc_map_.for_each(tenant_stream_stat_func))) { LOG_ERROR("for each FetchStreamContainer map fail", KR(ret)); + } else { + _LOG_INFO("[STAT] [FETCH_STREAM] TENANT=%lu/sec, TRAFFIC=%s/sec", self_tenant_id_, SIZE_TO_STR(tenant_stream_stat_func.total_traffic_)); } } +bool ObFsContainerMgr::TenantStreamStatFunc::operator() (const logservice::TenantLSID &key, FetchStreamContainer *value) +{ + int64_t ls_traffic = 0; + (void) value->do_stat(ls_traffic); + total_traffic_ += ls_traffic; + return true; +} + } // namespace logfetcher } // namespace oceanbase diff --git a/src/logservice/logfetcher/ob_log_fetch_stream_container_mgr.h b/src/logservice/logfetcher/ob_log_fetch_stream_container_mgr.h index cea9bfc6f9..2fcb218bd9 100644 --- a/src/logservice/logfetcher/ob_log_fetch_stream_container_mgr.h +++ b/src/logservice/logfetcher/ob_log_fetch_stream_container_mgr.h @@ -75,8 +75,9 @@ private: bool operator() (const logservice::TenantLSID &key, FetchStreamContainer *value) { UNUSED(key); + int64_t traffic = 0; if (NULL != value) { - value->do_stat(); + value->do_stat(traffic); } return true; } @@ -86,6 +87,13 @@ private: typedef common::ObSmallObjPool FscPool; static const int64_t SVR_STREAM_POOL_BLOCK_SIZE = 1 << 22; + struct TenantStreamStatFunc + { + TenantStreamStatFunc() : total_traffic_(0) {} + bool operator() (const logservice::TenantLSID &key, FetchStreamContainer *value); + int64_t total_traffic_; + }; + private: bool is_inited_; diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp index 7b947da7f4..acc8740abd 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp @@ -320,7 +320,7 @@ void FetchStream::configure(const ObLogFetcherConfig &config) LOG_INFO("[CONFIG]", K(timer_task_wait_time_msec)); } -void FetchStream::do_stat() +void FetchStream::do_stat(int64_t &traffic) { ObByteLockGuard lock_guard(stat_lock_); @@ -349,7 +349,7 @@ void FetchStream::do_stat() ret = OB_ERR_UNEXPECTED; LOG_ERROR("ls_fetch_ctx_ is NULL", KR(ret), "fs", *this); } - + traffic = fsi_printer.get_traffic(); last_stat_time_ = cur_time; last_stat_info_ = cur_stat_info_; } diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_stream.h b/src/logservice/logfetcher/ob_log_ls_fetch_stream.h index 1713e8de9f..a92f8c08bb 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_stream.h +++ b/src/logservice/logfetcher/ob_log_ls_fetch_stream.h @@ -124,7 +124,7 @@ public: int get_upper_limit(int64_t &upper_limit_ns); // Execution Statistics - void do_stat(); + void do_stat(int64_t &traffic); int64_t get_fetch_task_count() const { return 1; }