Fix incremental direct load update dml stats

This commit is contained in:
suz-yang
2024-06-28 06:11:27 +00:00
committed by ob-robot
parent a7bdd6c86b
commit 20cba75ccf
18 changed files with 236 additions and 62 deletions

View File

@ -275,6 +275,7 @@ int ObDirectLoadControlCommitExecutor::process()
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.commit(res_.result_info_,
res_.sql_statistics_,
res_.dml_stats_,
res_.trans_result_))) {
LOG_WARN("fail to store commit", KR(ret));
} else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {

View File

@ -203,7 +203,8 @@ OB_SERIALIZE_MEMBER(ObDirectLoadControlCommitArg,
OB_SERIALIZE_MEMBER(ObDirectLoadControlCommitRes,
result_info_,
sql_statistics_,
trans_result_);
trans_result_,
dml_stats_);
// abort
OB_SERIALIZE_MEMBER(ObDirectLoadControlAbortArg,

View File

@ -18,6 +18,7 @@
#include "observer/table_load/ob_table_load_struct.h"
#include "share/table/ob_table_load_array.h"
#include "share/table/ob_table_load_define.h"
#include "share/table/ob_table_load_dml_stat.h"
#include "share/table/ob_table_load_sql_statistics.h"
#include "sql/session/ob_sql_session_mgr.h"
#include "storage/direct_load/ob_direct_load_struct.h"
@ -254,11 +255,15 @@ class ObDirectLoadControlCommitRes final
public:
ObDirectLoadControlCommitRes() {}
TO_STRING_KV(K_(result_info), K_(sql_statistics))
TO_STRING_KV(K_(result_info),
K_(sql_statistics),
K_(trans_result),
K_(dml_stats));
public:
table::ObTableLoadResultInfo result_info_;
table::ObTableLoadSqlStatistics sql_statistics_;
transaction::ObTxExecResult trans_result_;
table::ObTableLoadDmlStat dml_stats_;
};
class ObDirectLoadControlAbortArg final

View File

@ -977,7 +977,8 @@ int ObTableLoadCoordinator::add_check_merge_result_task()
* commit
*/
int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistics)
int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistics,
ObTableLoadDmlStat &dml_stats)
{
int ret = OB_SUCCESS;
ObTransService *txs = nullptr;
@ -1001,6 +1002,7 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.commit(res.result_info_,
res.sql_statistics_,
res.dml_stats_,
res.trans_result_))) {
LOG_WARN("fail to commit store", KR(ret));
}
@ -1014,6 +1016,8 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic
ATOMIC_AAF(&coordinator_ctx_->result_info_.warnings_, res.result_info_.warnings_);
if (OB_FAIL(sql_statistics.merge(res.sql_statistics_))) {
LOG_WARN("fail to add result sql stats", KR(ret), K(addr), K(res));
} else if (OB_FAIL(dml_stats.merge(res.dml_stats_))) {
LOG_WARN("fail to add result dml stats", KR(ret), K(addr), K(res));
} else if (ObDirectLoadMethod::is_incremental(param_.method_) &&
txs->add_tx_exec_result(*ctx_->session_info_->get_tx_desc(), res.trans_result_)) {
LOG_WARN("fail to add tx exec result", KR(ret));
@ -1024,23 +1028,54 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic
return ret;
}
int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statistics)
int ObTableLoadCoordinator::build_table_stat_param(ObTableStatParam &param, ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
const uint64_t table_id = ctx_->ddl_param_.dest_table_id_;
param.tenant_id_ = tenant_id;
param.table_id_ = table_id;
param.part_level_ = ctx_->schema_.part_level_;
param.allocator_ = &allocator;
param.global_stat_param_.need_modify_ = true;
param.part_stat_param_.need_modify_ = false;
param.subpart_stat_param_.need_modify_ = false;
if (!ctx_->schema_.is_partitioned_table_) {
param.global_part_id_ = table_id;
param.global_tablet_id_ = table_id;
}
for (int64_t i = 0; OB_SUCC(ret) && i < ctx_->schema_.column_descs_.count(); ++i) {
const ObColDesc &col_desc = ctx_->schema_.column_descs_.at(i);
ObColumnStatParam col_param;
col_param.column_id_ = col_desc.col_id_;
col_param.cs_type_ = col_desc.col_type_.get_collation_type();
if (OB_HIDDEN_PK_INCREMENT_COLUMN_ID == col_param.column_id_) {
// skip hidden pk
} else if (OB_FAIL(param.column_params_.push_back(col_param))) {
LOG_WARN("fail to push back column param", KR(ret));
}
}
return ret;
}
int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statistics,
ObTableLoadDmlStat &dml_stats)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
const uint64_t table_id = ctx_->ddl_param_.dest_table_id_;
ObSchemaGetterGuard schema_guard;
if (OB_UNLIKELY(sql_statistics.is_empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(sql_statistics));
} else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) {
if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (ObDirectLoadMethod::is_full(ctx_->param_.method_)) { // full direct load
ObArray<ObOptColumnStat *> global_column_stats;
ObArray<ObOptTableStat *> global_table_stats;
global_column_stats.set_tenant_id(MTL_ID());
global_table_stats.set_tenant_id(MTL_ID());
if (OB_FAIL(sql_statistics.get_table_stat_array(global_table_stats))) {
if (OB_UNLIKELY(sql_statistics.is_empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(sql_statistics));
} else if (OB_FAIL(sql_statistics.get_table_stat_array(global_table_stats))) {
LOG_WARN("fail to get table stat array", KR(ret));
} else if (OB_FAIL(sql_statistics.get_col_stat_array(global_column_stats))) {
LOG_WARN("fail to get column stat array", KR(ret));
@ -1061,29 +1096,9 @@ int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statist
TabStatIndMap inc_table_stats;
ColStatIndMap inc_column_stats;
allocator.set_tenant_id(MTL_ID());
param.tenant_id_ = tenant_id;
param.table_id_ = table_id;
param.part_level_ = ctx_->schema_.part_level_;
param.allocator_ = &allocator;
param.global_stat_param_.need_modify_ = true;
param.part_stat_param_.need_modify_ = false;
param.subpart_stat_param_.need_modify_ = false;
if (!ctx_->schema_.is_partitioned_table_) {
param.global_part_id_ = table_id;
param.global_tablet_id_ = table_id;
}
for (int64_t i = 0; OB_SUCC(ret) && i < ctx_->schema_.column_descs_.count(); ++i) {
const ObColDesc &col_desc = ctx_->schema_.column_descs_.at(i);
ObColumnStatParam col_param;
col_param.column_id_ = col_desc.col_id_;
col_param.cs_type_ = col_desc.col_type_.get_collation_type();
if (OB_HIDDEN_PK_INCREMENT_COLUMN_ID == col_param.column_id_) {
// skip hidden pk
} else if (OB_FAIL(param.column_params_.push_back(col_param))) {
LOG_WARN("fail to push back column param", KR(ret));
}
}
if (OB_FAIL(ret)) {
if (OB_UNLIKELY(sql_statistics.is_empty() || dml_stats.is_empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(sql_statistics), K(dml_stats));
} else if (OB_ISNULL(exec_ctx = coordinator_ctx_->exec_ctx_->get_exec_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected exec ctx is null", KR(ret));
@ -1101,11 +1116,14 @@ int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statist
LOG_WARN("fail to get table stat array", KR(ret));
} else if (OB_FAIL(sql_statistics.get_col_stats(inc_column_stats))) {
LOG_WARN("fail to get column stat array", KR(ret));
} else if (OB_FAIL(build_table_stat_param(param, allocator))) {
LOG_WARN("fail to build table stat param", KR(ret));
} else if (OB_FAIL(ObDbmsStatsExecutor::update_online_stat(*exec_ctx,
param,
&schema_guard,
inc_table_stats,
inc_column_stats))) {
inc_column_stats,
&dml_stats.dml_stat_array_))) {
LOG_WARN("fail to update online stat", KR(ret));
}
}
@ -1122,12 +1140,14 @@ int ObTableLoadCoordinator::commit(ObTableLoadResultInfo &result_info)
LOG_INFO("coordinator commit");
ObMutexGuard guard(coordinator_ctx_->get_op_lock());
ObTableLoadSqlStatistics sql_statistics;
ObTableLoadDmlStat dml_stats;
if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) {
LOG_WARN("fail to check coordinator status", KR(ret));
} else if (OB_FAIL(commit_peers(sql_statistics))) {
} else if (OB_FAIL(commit_peers(sql_statistics, dml_stats))) {
LOG_WARN("fail to commit peers", KR(ret));
} else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) {
} else if (param_.online_opt_stat_gather_ && OB_FAIL(write_sql_stat(sql_statistics))) {
} else if (param_.online_opt_stat_gather_ &&
OB_FAIL(write_sql_stat(sql_statistics, dml_stats))) {
LOG_WARN("fail to write sql stat", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->set_status_commit())) {
LOG_WARN("fail to set coordinator status commit", KR(ret));

View File

@ -16,6 +16,7 @@
#include "observer/table_load/ob_table_load_struct.h"
#include "share/table/ob_table_load_array.h"
#include "share/table/ob_table_load_define.h"
#include "share/table/ob_table_load_dml_stat.h"
#include "share/table/ob_table_load_sql_statistics.h"
#include "share/table/ob_table_load_row_array.h"
#include "observer/table_load/resource/ob_table_load_resource_rpc_struct.h"
@ -65,6 +66,13 @@ private:
int gen_apply_arg(ObDirectLoadResourceApplyArg &apply_arg);
int pre_begin_peers(ObDirectLoadResourceApplyArg &apply_arg);
int confirm_begin_peers();
int commit_peers(table::ObTableLoadSqlStatistics &sql_statistics,
table::ObTableLoadDmlStat &dml_stats);
int build_table_stat_param(ObTableStatParam &param,
common::ObIAllocator &allocator);
int write_sql_stat(table::ObTableLoadSqlStatistics &sql_statistics,
table::ObTableLoadDmlStat &dml_stats);
int heart_beat_peer();
private:
int add_check_begin_result_task();
int check_peers_begin_result(bool &is_finish);
@ -73,9 +81,6 @@ private:
public:
int pre_merge_peers();
int start_merge_peers();
int commit_peers(table::ObTableLoadSqlStatistics &sql_statistics);
int write_sql_stat(table::ObTableLoadSqlStatistics &sql_statistics);
int heart_beat_peer();
private:
int add_check_merge_result_task();
int check_peers_merge_result(bool &is_finish);

View File

@ -420,10 +420,12 @@ int ObTableLoadStore::start_merge()
int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info,
ObTableLoadSqlStatistics &sql_statistics,
ObTableLoadDmlStat &dml_stats,
ObTxExecResult &trans_result)
{
int ret = OB_SUCCESS;
sql_statistics.reset();
dml_stats.reset();
trans_result.reset();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -432,7 +434,6 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info,
LOG_INFO("store commit");
ObTransService *txs = nullptr;
ObMutexGuard guard(store_ctx_->get_op_lock());
ObTableLoadDmlStat dml_stats;
if (OB_ISNULL(MTL(ObTransService *))) {
ret = OB_ERR_SYS;
LOG_WARN("trans service is null", KR(ret));
@ -442,11 +443,15 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info,
LOG_WARN("fail to commit insert table", KR(ret));
} else if (ctx_->schema_.has_autoinc_column_ && OB_FAIL(store_ctx_->commit_autoinc_value())) {
LOG_WARN("fail to commit sync auto increment value", KR(ret));
} else if (OB_FAIL(ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(dml_stats.dml_stat_array_))) {
}
// 全量旁路导入的dml_stat在执行节点更新
// 增量旁路导入的dml_stat收集到协调节点在事务中更新
else if (ObDirectLoadMethod::is_full(param_.method_) &&
OB_FAIL(ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(dml_stats.dml_stat_array_))) {
LOG_WARN("fail to update dml stat info", KR(ret));
} else if (ObDirectLoadMethod::is_full(param_.method_) && FALSE_IT(dml_stats.reset())) {
} else if (ObDirectLoadMethod::is_incremental(param_.method_) &&
txs->get_tx_exec_result(*ctx_->session_info_->get_tx_desc(), trans_result)) {
LOG_WARN("fail to get tx exec result", KR(ret));
OB_FAIL(txs->get_tx_exec_result(*ctx_->session_info_->get_tx_desc(), trans_result))) {
} else if (OB_FAIL(store_ctx_->set_status_commit())) {
LOG_WARN("fail to set store status commit", KR(ret));
} else {

View File

@ -23,6 +23,7 @@ namespace oceanbase
namespace table
{
class ObTableLoadSqlStatistics;
class ObTableLoadDmlStat;
} // namespace table
namespace observer
{
@ -57,6 +58,7 @@ public:
int start_merge();
int commit(table::ObTableLoadResultInfo &result_info,
table::ObTableLoadSqlStatistics &sql_statistics,
table::ObTableLoadDmlStat &dml_stats,
transaction::ObTxExecResult &trans_result);
int get_status(table::ObTableLoadStatusType &status, int &error_code);
int heart_beat();

View File

@ -259,6 +259,7 @@ ob_set_subtarget(ob_share common_mixed
table/ob_table.cpp
table/ob_table_rpc_struct.cpp
table/ob_table_load_define.cpp
table/ob_table_load_dml_stat.cpp
table/ob_table_load_row.cpp
transfer/ob_transfer_info.cpp
transfer/ob_transfer_task_operator.cpp

View File

@ -9,6 +9,7 @@
// See the Mulan PubL v2 for more details.
#define USING_LOG_PREFIX STORAGE_COMPACTION
#include "share/compaction/ob_schedule_batch_size_mgr.h"
#include "lib/oblog/ob_log.h"
#include "lib/oblog/ob_log_module.h"
namespace oceanbase

View File

@ -1321,7 +1321,8 @@ int ObDbmsStatsExecutor::update_online_stat(ObExecContext &ctx,
ObTableStatParam &param,
share::schema::ObSchemaGetterGuard *schema_guard,
const TabStatIndMap &online_table_stats,
const ColStatIndMap &online_column_stats)
const ColStatIndMap &online_column_stats,
const ObIArray<ObOptDmlStat *> *dml_stats)
{
int ret = OB_SUCCESS;
ObArenaAllocator allocator("ObOnlineStat", OB_MALLOC_NORMAL_BLOCK_SIZE, param.tenant_id_);
@ -1363,6 +1364,8 @@ int ObDbmsStatsExecutor::update_online_stat(ObExecContext &ctx,
need_reset_trx_lock_timeout,
conn))) {
LOG_WARN("failed to prepare conn and store session for online stats", K(ret));
} else if (nullptr != dml_stats && ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(*dml_stats, conn)) {
LOG_WARN("fail to update dml stat info", K(ret));
} else if (OB_FAIL(ObDbmsStatsUtils::get_current_opt_stats(allocator,
conn,
param,

View File

@ -86,7 +86,8 @@ public:
ObTableStatParam &param,
share::schema::ObSchemaGetterGuard *schema_guard,
const TabStatIndMap &online_table_stats,
const ColStatIndMap &online_column_stats);
const ColStatIndMap &online_column_stats,
const ObIArray<ObOptDmlStat *> *dml_stats = nullptr /*for_direct_load*/);
static int cancel_gather_stats(ObExecContext &ctx, ObString &task_id);

View File

@ -30,6 +30,7 @@
namespace oceanbase
{
using namespace observer;
using namespace sqlclient;
namespace common
{
@ -572,7 +573,8 @@ int ObOptStatMonitorManager::UpdateValueAtomicOp::operator() (common::hash::Hash
return ret;
}
int ObOptStatMonitorManager::exec_insert_monitor_modified_sql(ObSqlString &values_sql)
int ObOptStatMonitorManager::exec_insert_monitor_modified_sql(ObSqlString &values_sql,
ObISQLConnection *conn)
{
int ret = OB_SUCCESS;
ObSqlString insert_sql;
@ -583,7 +585,11 @@ int ObOptStatMonitorManager::exec_insert_monitor_modified_sql(ObSqlString &value
LOG_WARN("failed to append format", K(ret));
} else if (OB_FAIL(insert_sql.append(ON_DUPLICATE_UPDATE_MONITOR_MODIFIED))) {
LOG_WARN("failed to append string", K(ret));
} else if (OB_FAIL(mysql_proxy_->write(tenant_id_, insert_sql.ptr(), affected_rows))) {
} else if (nullptr != conn &&
OB_FAIL(conn->execute_write(tenant_id_, insert_sql.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(insert_sql), K(ret));
} else if (nullptr == conn &&
OB_FAIL(mysql_proxy_->write(tenant_id_, insert_sql.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(insert_sql), K(ret));
} else {
LOG_TRACE("succeed to exec insert monitor modified sql", K(tenant_id_), K(values_sql));
@ -667,7 +673,9 @@ int ObOptStatMonitorManager::clean_useless_dml_stat_info()
return ret;
}
int ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(const ObIArray<ObOptDmlStat *> &dml_stats)
int ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(
const ObIArray<ObOptDmlStat *> &dml_stats,
ObISQLConnection *conn)
{
int ret = OB_SUCCESS;
ObOptStatMonitorManager *optstat_monitor_mgr = NULL;
@ -680,13 +688,14 @@ int ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(const ObIArra
} else if (OB_ISNULL(optstat_monitor_mgr = MTL(ObOptStatMonitorManager*))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret), K(optstat_monitor_mgr));
} else if (OB_FAIL(optstat_monitor_mgr->update_dml_stat_info(dml_stats))) {
} else if (OB_FAIL(optstat_monitor_mgr->update_dml_stat_info(dml_stats, conn))) {
LOG_WARN("failed to update dml stat info", K(ret));
} else {/*do nothing*/}
return ret;
}
int ObOptStatMonitorManager::update_dml_stat_info(const ObIArray<ObOptDmlStat *> &dml_stats)
int ObOptStatMonitorManager::update_dml_stat_info(const ObIArray<ObOptDmlStat *> &dml_stats,
ObISQLConnection *conn)
{
int ret = OB_SUCCESS;
ObSqlString value_sql;
@ -700,7 +709,7 @@ int ObOptStatMonitorManager::update_dml_stat_info(const ObIArray<ObOptDmlStat *>
if (OB_FAIL(get_dml_stat_sql(*dml_stats.at(i), 0 != count, value_sql))) {
LOG_WARN("failed to get dml stat sql", K(ret));
} else if (UPDATE_OPT_STAT_BATCH_CNT == ++count) {
if (OB_FAIL(exec_insert_monitor_modified_sql(value_sql))) {
if (OB_FAIL(exec_insert_monitor_modified_sql(value_sql, conn))) {
LOG_WARN("failed to exec insert sql", K(ret));
} else {
count = 0;
@ -710,7 +719,7 @@ int ObOptStatMonitorManager::update_dml_stat_info(const ObIArray<ObOptDmlStat *>
}
}
if (OB_SUCC(ret) && count != 0) {
if (OB_FAIL(exec_insert_monitor_modified_sql(value_sql))) {
if (OB_FAIL(exec_insert_monitor_modified_sql(value_sql, conn))) {
LOG_WARN("failed to exec insert sql", K(ret));
}
}

View File

@ -109,7 +109,8 @@ public:
int update_local_cache(ObOptDmlStat &dml_stat);
int update_column_usage_info(const bool with_check);
int update_dml_stat_info();
int update_dml_stat_info(const ObIArray<ObOptDmlStat *> &dml_stats);
int update_dml_stat_info(const ObIArray<ObOptDmlStat *> &dml_stats,
common::sqlclient::ObISQLConnection *conn = nullptr);
int get_column_usage_sql(const StatKey &col_key,
const int64_t flags,
const bool need_add_comma,
@ -118,7 +119,8 @@ public:
const bool need_add_comma,
ObSqlString &sql_string);
int exec_insert_column_usage_sql(ObSqlString &values_sql);
int exec_insert_monitor_modified_sql(ObSqlString &values_sql);
int exec_insert_monitor_modified_sql(ObSqlString &values_sql,
common::sqlclient::ObISQLConnection *conn = nullptr);
static int get_column_usage_from_table(sql::ObExecContext &ctx,
ObIArray<ObColumnStatParam *> &column_params,
uint64_t tenant_id,
@ -131,7 +133,8 @@ public:
int check_table_writeable(bool &is_writeable);
int generate_opt_stat_monitoring_info_rows(observer::ObOptDmlStatMapGetter &getter);
int clean_useless_dml_stat_info();
static int update_dml_stat_info_from_direct_load(const ObIArray<ObOptDmlStat *> &dml_stats);
static int update_dml_stat_info_from_direct_load(const ObIArray<ObOptDmlStat *> &dml_stats,
common::sqlclient::ObISQLConnection *conn = nullptr);
int get_col_usage_info(const bool with_check,
ObIArray<StatKey> &col_stat_keys,
ObIArray<int64_t> &col_flags);

View File

@ -306,5 +306,13 @@ int64_t ObOptStatGatherParam::get_need_gather_column() const
return valid_column;
}
OB_SERIALIZE_MEMBER(ObOptDmlStat,
tenant_id_,
table_id_,
tablet_id_,
insert_row_count_,
update_row_count_,
delete_row_count_);
}
}

View File

@ -827,6 +827,8 @@ enum ObOptDmlStatType {
struct ObOptDmlStat
{
OB_UNIS_VERSION(1);
public:
ObOptDmlStat ():
tenant_id_(0),
table_id_(common::OB_INVALID_ID),

View File

@ -0,0 +1,73 @@
/**
* Copyright (c) 2024 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX CLIENT
#include "ob_table_load_dml_stat.h"
namespace oceanbase
{
namespace table
{
OB_DEF_SERIALIZE(ObTableLoadDmlStat)
{
int ret = OB_SUCCESS;
OB_UNIS_ENCODE(dml_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < dml_stat_array_.count(); i++) {
ObOptDmlStat *dml_stat = dml_stat_array_.at(i);
if (OB_ISNULL(dml_stat)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dml stat is null", KR(ret));
} else {
OB_UNIS_ENCODE(*dml_stat);
}
}
return ret;
}
OB_DEF_DESERIALIZE(ObTableLoadDmlStat)
{
int ret = OB_SUCCESS;
int64_t size = 0;
reset();
OB_UNIS_DECODE(size);
for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) {
ObOptDmlStat *dml_stat = nullptr;
if (OB_FAIL(allocate_dml_stat(dml_stat))) {
LOG_WARN("fail to allocate dml stat", KR(ret));
} else {
OB_UNIS_DECODE(*dml_stat);
}
}
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObTableLoadDmlStat)
{
int ret = OB_SUCCESS;
int64_t len = 0;
OB_UNIS_ADD_LEN(dml_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < dml_stat_array_.count(); i++) {
ObOptDmlStat *dml_stat = dml_stat_array_.at(i);
if (OB_ISNULL(dml_stat)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dml stat is null", KR(ret));
} else {
OB_UNIS_ADD_LEN(*dml_stat);
}
}
return len;
}
} // namespace table
} // namespace oceanbase

View File

@ -24,6 +24,7 @@ namespace table
{
struct ObTableLoadDmlStat
{
OB_UNIS_VERSION(1);
public:
ObTableLoadDmlStat() : allocator_("TLD_Dmlstat")
{
@ -42,7 +43,7 @@ public:
dml_stat_array_.reset();
allocator_.reset();
}
bool is_empty() const { return dml_stat_array_.count() == 0; }
bool is_empty() const { return dml_stat_array_.empty(); }
int allocate_dml_stat(ObOptDmlStat *&dml_stat)
{
int ret = OB_SUCCESS;
@ -64,6 +65,23 @@ public:
}
return ret;
}
int merge(const ObTableLoadDmlStat &other)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < other.dml_stat_array_.count(); i++) {
ObOptDmlStat *dml_stat = other.dml_stat_array_.at(i);
ObOptDmlStat *new_dml_stat = nullptr;
if (OB_ISNULL(dml_stat)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unexpected dml stat is null", KR(ret));
} else if (OB_FAIL(allocate_dml_stat(new_dml_stat))) {
OB_LOG(WARN, "fail to allocate dml stat", KR(ret));
} else {
*new_dml_stat = *dml_stat;
}
}
return ret;
}
TO_STRING_KV(K_(dml_stat_array));
public:
common::ObArray<ObOptDmlStat *> dml_stat_array_;

View File

@ -301,13 +301,21 @@ OB_DEF_SERIALIZE(ObTableLoadSqlStatistics)
int ret = OB_SUCCESS;
OB_UNIS_ENCODE(table_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) {
if (table_stat_array_.at(i) != nullptr) {
OB_UNIS_ENCODE(*table_stat_array_.at(i));
ObOptTableStat *table_stat = table_stat_array_.at(i);
if (OB_ISNULL(table_stat)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unexpected table stat is null", KR(ret));
} else {
OB_UNIS_ENCODE(*table_stat);
}
}
OB_UNIS_ENCODE(col_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) {
if (col_stat_array_.at(i) != nullptr) {
ObOptOSGColumnStat *col_stat = col_stat_array_.at(i);
if (OB_ISNULL(col_stat)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unexpected col stat is null", KR(ret));
} else {
OB_UNIS_ENCODE(*col_stat_array_.at(i));
}
}
@ -379,14 +387,22 @@ OB_DEF_SERIALIZE_SIZE(ObTableLoadSqlStatistics)
int64_t len = 0;
OB_UNIS_ADD_LEN(table_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) {
if (table_stat_array_.at(i) != nullptr) {
OB_UNIS_ADD_LEN(*table_stat_array_.at(i));
ObOptTableStat *table_stat = table_stat_array_.at(i);
if (OB_ISNULL(table_stat)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unexpected table stat is null", KR(ret));
} else {
OB_UNIS_ADD_LEN(*table_stat);
}
}
OB_UNIS_ADD_LEN(col_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) {
if (col_stat_array_.at(i) != nullptr) {
OB_UNIS_ADD_LEN(*col_stat_array_.at(i));
ObOptOSGColumnStat *col_stat = col_stat_array_.at(i);
if (OB_ISNULL(col_stat)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unexpected col stat is null", KR(ret));
} else {
OB_UNIS_ADD_LEN(*col_stat);
}
}
return len;