From 20cba75ccffccba2808886f4f8084743cefdf574 Mon Sep 17 00:00:00 2001 From: suz-yang Date: Fri, 28 Jun 2024 06:11:27 +0000 Subject: [PATCH] Fix incremental direct load update dml stats --- .../ob_table_load_control_rpc_executor.cpp | 1 + .../ob_table_load_control_rpc_struct.cpp | 3 +- .../ob_table_load_control_rpc_struct.h | 7 +- .../table_load/ob_table_load_coordinator.cpp | 86 ++++++++++++------- .../table_load/ob_table_load_coordinator.h | 11 ++- .../table_load/ob_table_load_store.cpp | 13 ++- src/observer/table_load/ob_table_load_store.h | 2 + src/share/CMakeLists.txt | 1 + .../compaction/ob_schedule_batch_size_mgr.cpp | 1 + src/share/stat/ob_dbms_stats_executor.cpp | 5 +- src/share/stat/ob_dbms_stats_executor.h | 3 +- .../stat/ob_opt_stat_monitor_manager.cpp | 23 +++-- src/share/stat/ob_opt_stat_monitor_manager.h | 9 +- src/share/stat/ob_stat_define.cpp | 8 ++ src/share/stat/ob_stat_define.h | 2 + src/share/table/ob_table_load_dml_stat.cpp | 73 ++++++++++++++++ src/share/table/ob_table_load_dml_stat.h | 20 ++++- .../table/ob_table_load_sql_statistics.cpp | 30 +++++-- 18 files changed, 236 insertions(+), 62 deletions(-) create mode 100644 src/share/table/ob_table_load_dml_stat.cpp diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp index 6e84dd68f1..3e966a4af4 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp @@ -275,6 +275,7 @@ int ObDirectLoadControlCommitExecutor::process() LOG_WARN("fail to init store", KR(ret)); } else if (OB_FAIL(store.commit(res_.result_info_, res_.sql_statistics_, + res_.dml_stats_, res_.trans_result_))) { LOG_WARN("fail to store commit", KR(ret)); } else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) { diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp index 9a4894c8a9..7c3a554154 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp @@ -203,7 +203,8 @@ OB_SERIALIZE_MEMBER(ObDirectLoadControlCommitArg, OB_SERIALIZE_MEMBER(ObDirectLoadControlCommitRes, result_info_, sql_statistics_, - trans_result_); + trans_result_, + dml_stats_); // abort OB_SERIALIZE_MEMBER(ObDirectLoadControlAbortArg, diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_struct.h b/src/observer/table_load/control/ob_table_load_control_rpc_struct.h index 9f8a972e73..48d8849a07 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_struct.h +++ b/src/observer/table_load/control/ob_table_load_control_rpc_struct.h @@ -18,6 +18,7 @@ #include "observer/table_load/ob_table_load_struct.h" #include "share/table/ob_table_load_array.h" #include "share/table/ob_table_load_define.h" +#include "share/table/ob_table_load_dml_stat.h" #include "share/table/ob_table_load_sql_statistics.h" #include "sql/session/ob_sql_session_mgr.h" #include "storage/direct_load/ob_direct_load_struct.h" @@ -254,11 +255,15 @@ class ObDirectLoadControlCommitRes final public: ObDirectLoadControlCommitRes() {} - TO_STRING_KV(K_(result_info), K_(sql_statistics)) + TO_STRING_KV(K_(result_info), + K_(sql_statistics), + K_(trans_result), + K_(dml_stats)); public: table::ObTableLoadResultInfo result_info_; table::ObTableLoadSqlStatistics sql_statistics_; transaction::ObTxExecResult trans_result_; + table::ObTableLoadDmlStat dml_stats_; }; class ObDirectLoadControlAbortArg final diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index edc6671f3b..6e79f8f995 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -977,7 +977,8 @@ int ObTableLoadCoordinator::add_check_merge_result_task() * commit */ -int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistics) +int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistics, + ObTableLoadDmlStat &dml_stats) { int ret = OB_SUCCESS; ObTransService *txs = nullptr; @@ -1001,6 +1002,7 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic LOG_WARN("fail to init store", KR(ret)); } else if (OB_FAIL(store.commit(res.result_info_, res.sql_statistics_, + res.dml_stats_, res.trans_result_))) { LOG_WARN("fail to commit store", KR(ret)); } @@ -1014,6 +1016,8 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic ATOMIC_AAF(&coordinator_ctx_->result_info_.warnings_, res.result_info_.warnings_); if (OB_FAIL(sql_statistics.merge(res.sql_statistics_))) { LOG_WARN("fail to add result sql stats", KR(ret), K(addr), K(res)); + } else if (OB_FAIL(dml_stats.merge(res.dml_stats_))) { + LOG_WARN("fail to add result dml stats", KR(ret), K(addr), K(res)); } else if (ObDirectLoadMethod::is_incremental(param_.method_) && txs->add_tx_exec_result(*ctx_->session_info_->get_tx_desc(), res.trans_result_)) { LOG_WARN("fail to add tx exec result", KR(ret)); @@ -1024,23 +1028,54 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic return ret; } -int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statistics) +int ObTableLoadCoordinator::build_table_stat_param(ObTableStatParam ¶m, ObIAllocator &allocator) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + const uint64_t table_id = ctx_->ddl_param_.dest_table_id_; + param.tenant_id_ = tenant_id; + param.table_id_ = table_id; + param.part_level_ = ctx_->schema_.part_level_; + param.allocator_ = &allocator; + param.global_stat_param_.need_modify_ = true; + param.part_stat_param_.need_modify_ = false; + param.subpart_stat_param_.need_modify_ = false; + if (!ctx_->schema_.is_partitioned_table_) { + param.global_part_id_ = table_id; + param.global_tablet_id_ = table_id; + } + for (int64_t i = 0; OB_SUCC(ret) && i < ctx_->schema_.column_descs_.count(); ++i) { + const ObColDesc &col_desc = ctx_->schema_.column_descs_.at(i); + ObColumnStatParam col_param; + col_param.column_id_ = col_desc.col_id_; + col_param.cs_type_ = col_desc.col_type_.get_collation_type(); + if (OB_HIDDEN_PK_INCREMENT_COLUMN_ID == col_param.column_id_) { + // skip hidden pk + } else if (OB_FAIL(param.column_params_.push_back(col_param))) { + LOG_WARN("fail to push back column param", KR(ret)); + } + } + return ret; +} + +int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statistics, + ObTableLoadDmlStat &dml_stats) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); const uint64_t table_id = ctx_->ddl_param_.dest_table_id_; ObSchemaGetterGuard schema_guard; - if (OB_UNLIKELY(sql_statistics.is_empty())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(sql_statistics)); - } else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) { + if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) { LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id)); } else if (ObDirectLoadMethod::is_full(ctx_->param_.method_)) { // full direct load ObArray global_column_stats; ObArray global_table_stats; global_column_stats.set_tenant_id(MTL_ID()); global_table_stats.set_tenant_id(MTL_ID()); - if (OB_FAIL(sql_statistics.get_table_stat_array(global_table_stats))) { + if (OB_UNLIKELY(sql_statistics.is_empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(sql_statistics)); + } else if (OB_FAIL(sql_statistics.get_table_stat_array(global_table_stats))) { LOG_WARN("fail to get table stat array", KR(ret)); } else if (OB_FAIL(sql_statistics.get_col_stat_array(global_column_stats))) { LOG_WARN("fail to get column stat array", KR(ret)); @@ -1061,29 +1096,9 @@ int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statist TabStatIndMap inc_table_stats; ColStatIndMap inc_column_stats; allocator.set_tenant_id(MTL_ID()); - param.tenant_id_ = tenant_id; - param.table_id_ = table_id; - param.part_level_ = ctx_->schema_.part_level_; - param.allocator_ = &allocator; - param.global_stat_param_.need_modify_ = true; - param.part_stat_param_.need_modify_ = false; - param.subpart_stat_param_.need_modify_ = false; - if (!ctx_->schema_.is_partitioned_table_) { - param.global_part_id_ = table_id; - param.global_tablet_id_ = table_id; - } - for (int64_t i = 0; OB_SUCC(ret) && i < ctx_->schema_.column_descs_.count(); ++i) { - const ObColDesc &col_desc = ctx_->schema_.column_descs_.at(i); - ObColumnStatParam col_param; - col_param.column_id_ = col_desc.col_id_; - col_param.cs_type_ = col_desc.col_type_.get_collation_type(); - if (OB_HIDDEN_PK_INCREMENT_COLUMN_ID == col_param.column_id_) { - // skip hidden pk - } else if (OB_FAIL(param.column_params_.push_back(col_param))) { - LOG_WARN("fail to push back column param", KR(ret)); - } - } - if (OB_FAIL(ret)) { + if (OB_UNLIKELY(sql_statistics.is_empty() || dml_stats.is_empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(sql_statistics), K(dml_stats)); } else if (OB_ISNULL(exec_ctx = coordinator_ctx_->exec_ctx_->get_exec_ctx())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected exec ctx is null", KR(ret)); @@ -1101,11 +1116,14 @@ int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statist LOG_WARN("fail to get table stat array", KR(ret)); } else if (OB_FAIL(sql_statistics.get_col_stats(inc_column_stats))) { LOG_WARN("fail to get column stat array", KR(ret)); + } else if (OB_FAIL(build_table_stat_param(param, allocator))) { + LOG_WARN("fail to build table stat param", KR(ret)); } else if (OB_FAIL(ObDbmsStatsExecutor::update_online_stat(*exec_ctx, param, &schema_guard, inc_table_stats, - inc_column_stats))) { + inc_column_stats, + &dml_stats.dml_stat_array_))) { LOG_WARN("fail to update online stat", KR(ret)); } } @@ -1122,12 +1140,14 @@ int ObTableLoadCoordinator::commit(ObTableLoadResultInfo &result_info) LOG_INFO("coordinator commit"); ObMutexGuard guard(coordinator_ctx_->get_op_lock()); ObTableLoadSqlStatistics sql_statistics; + ObTableLoadDmlStat dml_stats; if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check coordinator status", KR(ret)); - } else if (OB_FAIL(commit_peers(sql_statistics))) { + } else if (OB_FAIL(commit_peers(sql_statistics, dml_stats))) { LOG_WARN("fail to commit peers", KR(ret)); } else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) { - } else if (param_.online_opt_stat_gather_ && OB_FAIL(write_sql_stat(sql_statistics))) { + } else if (param_.online_opt_stat_gather_ && + OB_FAIL(write_sql_stat(sql_statistics, dml_stats))) { LOG_WARN("fail to write sql stat", KR(ret)); } else if (OB_FAIL(coordinator_ctx_->set_status_commit())) { LOG_WARN("fail to set coordinator status commit", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_coordinator.h b/src/observer/table_load/ob_table_load_coordinator.h index 352f48c40c..aba76ffd0b 100644 --- a/src/observer/table_load/ob_table_load_coordinator.h +++ b/src/observer/table_load/ob_table_load_coordinator.h @@ -16,6 +16,7 @@ #include "observer/table_load/ob_table_load_struct.h" #include "share/table/ob_table_load_array.h" #include "share/table/ob_table_load_define.h" +#include "share/table/ob_table_load_dml_stat.h" #include "share/table/ob_table_load_sql_statistics.h" #include "share/table/ob_table_load_row_array.h" #include "observer/table_load/resource/ob_table_load_resource_rpc_struct.h" @@ -65,6 +66,13 @@ private: int gen_apply_arg(ObDirectLoadResourceApplyArg &apply_arg); int pre_begin_peers(ObDirectLoadResourceApplyArg &apply_arg); int confirm_begin_peers(); + int commit_peers(table::ObTableLoadSqlStatistics &sql_statistics, + table::ObTableLoadDmlStat &dml_stats); + int build_table_stat_param(ObTableStatParam ¶m, + common::ObIAllocator &allocator); + int write_sql_stat(table::ObTableLoadSqlStatistics &sql_statistics, + table::ObTableLoadDmlStat &dml_stats); + int heart_beat_peer(); private: int add_check_begin_result_task(); int check_peers_begin_result(bool &is_finish); @@ -73,9 +81,6 @@ private: public: int pre_merge_peers(); int start_merge_peers(); - int commit_peers(table::ObTableLoadSqlStatistics &sql_statistics); - int write_sql_stat(table::ObTableLoadSqlStatistics &sql_statistics); - int heart_beat_peer(); private: int add_check_merge_result_task(); int check_peers_merge_result(bool &is_finish); diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index ce60a25bea..b5f85f7580 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -420,10 +420,12 @@ int ObTableLoadStore::start_merge() int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlStatistics &sql_statistics, + ObTableLoadDmlStat &dml_stats, ObTxExecResult &trans_result) { int ret = OB_SUCCESS; sql_statistics.reset(); + dml_stats.reset(); trans_result.reset(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -432,7 +434,6 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, LOG_INFO("store commit"); ObTransService *txs = nullptr; ObMutexGuard guard(store_ctx_->get_op_lock()); - ObTableLoadDmlStat dml_stats; if (OB_ISNULL(MTL(ObTransService *))) { ret = OB_ERR_SYS; LOG_WARN("trans service is null", KR(ret)); @@ -442,11 +443,15 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, LOG_WARN("fail to commit insert table", KR(ret)); } else if (ctx_->schema_.has_autoinc_column_ && OB_FAIL(store_ctx_->commit_autoinc_value())) { LOG_WARN("fail to commit sync auto increment value", KR(ret)); - } else if (OB_FAIL(ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(dml_stats.dml_stat_array_))) { + } + // 全量旁路导入的dml_stat在执行节点更新 + // 增量旁路导入的dml_stat收集到协调节点在事务中更新 + else if (ObDirectLoadMethod::is_full(param_.method_) && + OB_FAIL(ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(dml_stats.dml_stat_array_))) { LOG_WARN("fail to update dml stat info", KR(ret)); + } else if (ObDirectLoadMethod::is_full(param_.method_) && FALSE_IT(dml_stats.reset())) { } else if (ObDirectLoadMethod::is_incremental(param_.method_) && - txs->get_tx_exec_result(*ctx_->session_info_->get_tx_desc(), trans_result)) { - LOG_WARN("fail to get tx exec result", KR(ret)); + OB_FAIL(txs->get_tx_exec_result(*ctx_->session_info_->get_tx_desc(), trans_result))) { } else if (OB_FAIL(store_ctx_->set_status_commit())) { LOG_WARN("fail to set store status commit", KR(ret)); } else { diff --git a/src/observer/table_load/ob_table_load_store.h b/src/observer/table_load/ob_table_load_store.h index 283f52cb45..c49ed444b2 100644 --- a/src/observer/table_load/ob_table_load_store.h +++ b/src/observer/table_load/ob_table_load_store.h @@ -23,6 +23,7 @@ namespace oceanbase namespace table { class ObTableLoadSqlStatistics; +class ObTableLoadDmlStat; } // namespace table namespace observer { @@ -57,6 +58,7 @@ public: int start_merge(); int commit(table::ObTableLoadResultInfo &result_info, table::ObTableLoadSqlStatistics &sql_statistics, + table::ObTableLoadDmlStat &dml_stats, transaction::ObTxExecResult &trans_result); int get_status(table::ObTableLoadStatusType &status, int &error_code); int heart_beat(); diff --git a/src/share/CMakeLists.txt b/src/share/CMakeLists.txt index 364199c56f..329448d767 100644 --- a/src/share/CMakeLists.txt +++ b/src/share/CMakeLists.txt @@ -259,6 +259,7 @@ ob_set_subtarget(ob_share common_mixed table/ob_table.cpp table/ob_table_rpc_struct.cpp table/ob_table_load_define.cpp + table/ob_table_load_dml_stat.cpp table/ob_table_load_row.cpp transfer/ob_transfer_info.cpp transfer/ob_transfer_task_operator.cpp diff --git a/src/share/compaction/ob_schedule_batch_size_mgr.cpp b/src/share/compaction/ob_schedule_batch_size_mgr.cpp index 754a086c0f..9cb1752943 100644 --- a/src/share/compaction/ob_schedule_batch_size_mgr.cpp +++ b/src/share/compaction/ob_schedule_batch_size_mgr.cpp @@ -9,6 +9,7 @@ // See the Mulan PubL v2 for more details. #define USING_LOG_PREFIX STORAGE_COMPACTION #include "share/compaction/ob_schedule_batch_size_mgr.h" +#include "lib/oblog/ob_log.h" #include "lib/oblog/ob_log_module.h" namespace oceanbase diff --git a/src/share/stat/ob_dbms_stats_executor.cpp b/src/share/stat/ob_dbms_stats_executor.cpp index 9ece10b782..e6d474b4aa 100644 --- a/src/share/stat/ob_dbms_stats_executor.cpp +++ b/src/share/stat/ob_dbms_stats_executor.cpp @@ -1321,7 +1321,8 @@ int ObDbmsStatsExecutor::update_online_stat(ObExecContext &ctx, ObTableStatParam ¶m, share::schema::ObSchemaGetterGuard *schema_guard, const TabStatIndMap &online_table_stats, - const ColStatIndMap &online_column_stats) + const ColStatIndMap &online_column_stats, + const ObIArray *dml_stats) { int ret = OB_SUCCESS; ObArenaAllocator allocator("ObOnlineStat", OB_MALLOC_NORMAL_BLOCK_SIZE, param.tenant_id_); @@ -1363,6 +1364,8 @@ int ObDbmsStatsExecutor::update_online_stat(ObExecContext &ctx, need_reset_trx_lock_timeout, conn))) { LOG_WARN("failed to prepare conn and store session for online stats", K(ret)); + } else if (nullptr != dml_stats && ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(*dml_stats, conn)) { + LOG_WARN("fail to update dml stat info", K(ret)); } else if (OB_FAIL(ObDbmsStatsUtils::get_current_opt_stats(allocator, conn, param, diff --git a/src/share/stat/ob_dbms_stats_executor.h b/src/share/stat/ob_dbms_stats_executor.h index 08ef7a92db..b74b72f3fc 100644 --- a/src/share/stat/ob_dbms_stats_executor.h +++ b/src/share/stat/ob_dbms_stats_executor.h @@ -86,7 +86,8 @@ public: ObTableStatParam ¶m, share::schema::ObSchemaGetterGuard *schema_guard, const TabStatIndMap &online_table_stats, - const ColStatIndMap &online_column_stats); + const ColStatIndMap &online_column_stats, + const ObIArray *dml_stats = nullptr /*for_direct_load*/); static int cancel_gather_stats(ObExecContext &ctx, ObString &task_id); diff --git a/src/share/stat/ob_opt_stat_monitor_manager.cpp b/src/share/stat/ob_opt_stat_monitor_manager.cpp index ead2f44574..e2648ddc29 100644 --- a/src/share/stat/ob_opt_stat_monitor_manager.cpp +++ b/src/share/stat/ob_opt_stat_monitor_manager.cpp @@ -30,6 +30,7 @@ namespace oceanbase { using namespace observer; +using namespace sqlclient; namespace common { @@ -572,7 +573,8 @@ int ObOptStatMonitorManager::UpdateValueAtomicOp::operator() (common::hash::Hash return ret; } -int ObOptStatMonitorManager::exec_insert_monitor_modified_sql(ObSqlString &values_sql) +int ObOptStatMonitorManager::exec_insert_monitor_modified_sql(ObSqlString &values_sql, + ObISQLConnection *conn) { int ret = OB_SUCCESS; ObSqlString insert_sql; @@ -583,7 +585,11 @@ int ObOptStatMonitorManager::exec_insert_monitor_modified_sql(ObSqlString &value LOG_WARN("failed to append format", K(ret)); } else if (OB_FAIL(insert_sql.append(ON_DUPLICATE_UPDATE_MONITOR_MODIFIED))) { LOG_WARN("failed to append string", K(ret)); - } else if (OB_FAIL(mysql_proxy_->write(tenant_id_, insert_sql.ptr(), affected_rows))) { + } else if (nullptr != conn && + OB_FAIL(conn->execute_write(tenant_id_, insert_sql.ptr(), affected_rows))) { + LOG_WARN("fail to exec sql", K(insert_sql), K(ret)); + } else if (nullptr == conn && + OB_FAIL(mysql_proxy_->write(tenant_id_, insert_sql.ptr(), affected_rows))) { LOG_WARN("fail to exec sql", K(insert_sql), K(ret)); } else { LOG_TRACE("succeed to exec insert monitor modified sql", K(tenant_id_), K(values_sql)); @@ -667,7 +673,9 @@ int ObOptStatMonitorManager::clean_useless_dml_stat_info() return ret; } -int ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(const ObIArray &dml_stats) +int ObOptStatMonitorManager::update_dml_stat_info_from_direct_load( + const ObIArray &dml_stats, + ObISQLConnection *conn) { int ret = OB_SUCCESS; ObOptStatMonitorManager *optstat_monitor_mgr = NULL; @@ -680,13 +688,14 @@ int ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(const ObIArra } else if (OB_ISNULL(optstat_monitor_mgr = MTL(ObOptStatMonitorManager*))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null", K(ret), K(optstat_monitor_mgr)); - } else if (OB_FAIL(optstat_monitor_mgr->update_dml_stat_info(dml_stats))) { + } else if (OB_FAIL(optstat_monitor_mgr->update_dml_stat_info(dml_stats, conn))) { LOG_WARN("failed to update dml stat info", K(ret)); } else {/*do nothing*/} return ret; } -int ObOptStatMonitorManager::update_dml_stat_info(const ObIArray &dml_stats) +int ObOptStatMonitorManager::update_dml_stat_info(const ObIArray &dml_stats, + ObISQLConnection *conn) { int ret = OB_SUCCESS; ObSqlString value_sql; @@ -700,7 +709,7 @@ int ObOptStatMonitorManager::update_dml_stat_info(const ObIArray if (OB_FAIL(get_dml_stat_sql(*dml_stats.at(i), 0 != count, value_sql))) { LOG_WARN("failed to get dml stat sql", K(ret)); } else if (UPDATE_OPT_STAT_BATCH_CNT == ++count) { - if (OB_FAIL(exec_insert_monitor_modified_sql(value_sql))) { + if (OB_FAIL(exec_insert_monitor_modified_sql(value_sql, conn))) { LOG_WARN("failed to exec insert sql", K(ret)); } else { count = 0; @@ -710,7 +719,7 @@ int ObOptStatMonitorManager::update_dml_stat_info(const ObIArray } } if (OB_SUCC(ret) && count != 0) { - if (OB_FAIL(exec_insert_monitor_modified_sql(value_sql))) { + if (OB_FAIL(exec_insert_monitor_modified_sql(value_sql, conn))) { LOG_WARN("failed to exec insert sql", K(ret)); } } diff --git a/src/share/stat/ob_opt_stat_monitor_manager.h b/src/share/stat/ob_opt_stat_monitor_manager.h index ecb47062cd..df1e48ac04 100644 --- a/src/share/stat/ob_opt_stat_monitor_manager.h +++ b/src/share/stat/ob_opt_stat_monitor_manager.h @@ -109,7 +109,8 @@ public: int update_local_cache(ObOptDmlStat &dml_stat); int update_column_usage_info(const bool with_check); int update_dml_stat_info(); - int update_dml_stat_info(const ObIArray &dml_stats); + int update_dml_stat_info(const ObIArray &dml_stats, + common::sqlclient::ObISQLConnection *conn = nullptr); int get_column_usage_sql(const StatKey &col_key, const int64_t flags, const bool need_add_comma, @@ -118,7 +119,8 @@ public: const bool need_add_comma, ObSqlString &sql_string); int exec_insert_column_usage_sql(ObSqlString &values_sql); - int exec_insert_monitor_modified_sql(ObSqlString &values_sql); + int exec_insert_monitor_modified_sql(ObSqlString &values_sql, + common::sqlclient::ObISQLConnection *conn = nullptr); static int get_column_usage_from_table(sql::ObExecContext &ctx, ObIArray &column_params, uint64_t tenant_id, @@ -131,7 +133,8 @@ public: int check_table_writeable(bool &is_writeable); int generate_opt_stat_monitoring_info_rows(observer::ObOptDmlStatMapGetter &getter); int clean_useless_dml_stat_info(); - static int update_dml_stat_info_from_direct_load(const ObIArray &dml_stats); + static int update_dml_stat_info_from_direct_load(const ObIArray &dml_stats, + common::sqlclient::ObISQLConnection *conn = nullptr); int get_col_usage_info(const bool with_check, ObIArray &col_stat_keys, ObIArray &col_flags); diff --git a/src/share/stat/ob_stat_define.cpp b/src/share/stat/ob_stat_define.cpp index abcabb477d..5e6e271fcd 100644 --- a/src/share/stat/ob_stat_define.cpp +++ b/src/share/stat/ob_stat_define.cpp @@ -306,5 +306,13 @@ int64_t ObOptStatGatherParam::get_need_gather_column() const return valid_column; } +OB_SERIALIZE_MEMBER(ObOptDmlStat, + tenant_id_, + table_id_, + tablet_id_, + insert_row_count_, + update_row_count_, + delete_row_count_); + } } diff --git a/src/share/stat/ob_stat_define.h b/src/share/stat/ob_stat_define.h index 8695be95fc..b77a0962d2 100644 --- a/src/share/stat/ob_stat_define.h +++ b/src/share/stat/ob_stat_define.h @@ -827,6 +827,8 @@ enum ObOptDmlStatType { struct ObOptDmlStat { + OB_UNIS_VERSION(1); +public: ObOptDmlStat (): tenant_id_(0), table_id_(common::OB_INVALID_ID), diff --git a/src/share/table/ob_table_load_dml_stat.cpp b/src/share/table/ob_table_load_dml_stat.cpp new file mode 100644 index 0000000000..b256b347d4 --- /dev/null +++ b/src/share/table/ob_table_load_dml_stat.cpp @@ -0,0 +1,73 @@ +/** + * Copyright (c) 2024 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX CLIENT + +#include "ob_table_load_dml_stat.h" + +namespace oceanbase +{ +namespace table +{ + +OB_DEF_SERIALIZE(ObTableLoadDmlStat) +{ + int ret = OB_SUCCESS; + OB_UNIS_ENCODE(dml_stat_array_.count()); + for (int64_t i = 0; OB_SUCC(ret) && i < dml_stat_array_.count(); i++) { + ObOptDmlStat *dml_stat = dml_stat_array_.at(i); + if (OB_ISNULL(dml_stat)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected dml stat is null", KR(ret)); + } else { + OB_UNIS_ENCODE(*dml_stat); + } + } + return ret; +} + +OB_DEF_DESERIALIZE(ObTableLoadDmlStat) +{ + int ret = OB_SUCCESS; + int64_t size = 0; + reset(); + OB_UNIS_DECODE(size); + for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) { + ObOptDmlStat *dml_stat = nullptr; + if (OB_FAIL(allocate_dml_stat(dml_stat))) { + LOG_WARN("fail to allocate dml stat", KR(ret)); + } else { + OB_UNIS_DECODE(*dml_stat); + } + } + return ret; +} + +OB_DEF_SERIALIZE_SIZE(ObTableLoadDmlStat) +{ + int ret = OB_SUCCESS; + int64_t len = 0; + OB_UNIS_ADD_LEN(dml_stat_array_.count()); + for (int64_t i = 0; OB_SUCC(ret) && i < dml_stat_array_.count(); i++) { + ObOptDmlStat *dml_stat = dml_stat_array_.at(i); + if (OB_ISNULL(dml_stat)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected dml stat is null", KR(ret)); + } else { + OB_UNIS_ADD_LEN(*dml_stat); + } + } + return len; +} + +} // namespace table +} // namespace oceanbase diff --git a/src/share/table/ob_table_load_dml_stat.h b/src/share/table/ob_table_load_dml_stat.h index 07e45f0396..302811e983 100644 --- a/src/share/table/ob_table_load_dml_stat.h +++ b/src/share/table/ob_table_load_dml_stat.h @@ -24,6 +24,7 @@ namespace table { struct ObTableLoadDmlStat { + OB_UNIS_VERSION(1); public: ObTableLoadDmlStat() : allocator_("TLD_Dmlstat") { @@ -42,7 +43,7 @@ public: dml_stat_array_.reset(); allocator_.reset(); } - bool is_empty() const { return dml_stat_array_.count() == 0; } + bool is_empty() const { return dml_stat_array_.empty(); } int allocate_dml_stat(ObOptDmlStat *&dml_stat) { int ret = OB_SUCCESS; @@ -64,6 +65,23 @@ public: } return ret; } + int merge(const ObTableLoadDmlStat &other) + { + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < other.dml_stat_array_.count(); i++) { + ObOptDmlStat *dml_stat = other.dml_stat_array_.at(i); + ObOptDmlStat *new_dml_stat = nullptr; + if (OB_ISNULL(dml_stat)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "unexpected dml stat is null", KR(ret)); + } else if (OB_FAIL(allocate_dml_stat(new_dml_stat))) { + OB_LOG(WARN, "fail to allocate dml stat", KR(ret)); + } else { + *new_dml_stat = *dml_stat; + } + } + return ret; + } TO_STRING_KV(K_(dml_stat_array)); public: common::ObArray dml_stat_array_; diff --git a/src/share/table/ob_table_load_sql_statistics.cpp b/src/share/table/ob_table_load_sql_statistics.cpp index 5aaa0fdee2..93ef1986d4 100644 --- a/src/share/table/ob_table_load_sql_statistics.cpp +++ b/src/share/table/ob_table_load_sql_statistics.cpp @@ -301,13 +301,21 @@ OB_DEF_SERIALIZE(ObTableLoadSqlStatistics) int ret = OB_SUCCESS; OB_UNIS_ENCODE(table_stat_array_.count()); for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) { - if (table_stat_array_.at(i) != nullptr) { - OB_UNIS_ENCODE(*table_stat_array_.at(i)); + ObOptTableStat *table_stat = table_stat_array_.at(i); + if (OB_ISNULL(table_stat)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "unexpected table stat is null", KR(ret)); + } else { + OB_UNIS_ENCODE(*table_stat); } } OB_UNIS_ENCODE(col_stat_array_.count()); for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) { - if (col_stat_array_.at(i) != nullptr) { + ObOptOSGColumnStat *col_stat = col_stat_array_.at(i); + if (OB_ISNULL(col_stat)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "unexpected col stat is null", KR(ret)); + } else { OB_UNIS_ENCODE(*col_stat_array_.at(i)); } } @@ -379,14 +387,22 @@ OB_DEF_SERIALIZE_SIZE(ObTableLoadSqlStatistics) int64_t len = 0; OB_UNIS_ADD_LEN(table_stat_array_.count()); for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) { - if (table_stat_array_.at(i) != nullptr) { - OB_UNIS_ADD_LEN(*table_stat_array_.at(i)); + ObOptTableStat *table_stat = table_stat_array_.at(i); + if (OB_ISNULL(table_stat)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "unexpected table stat is null", KR(ret)); + } else { + OB_UNIS_ADD_LEN(*table_stat); } } OB_UNIS_ADD_LEN(col_stat_array_.count()); for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) { - if (col_stat_array_.at(i) != nullptr) { - OB_UNIS_ADD_LEN(*col_stat_array_.at(i)); + ObOptOSGColumnStat *col_stat = col_stat_array_.at(i); + if (OB_ISNULL(col_stat)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "unexpected col stat is null", KR(ret)); + } else { + OB_UNIS_ADD_LEN(*col_stat); } } return len;