Fix SQL statistics update for incremental direct load

This commit is contained in:
suz-yang 2024-06-21 06:50:49 +00:00 committed by ob-robot
parent cc59066e45
commit 5801165afd
3 changed files with 44 additions and 59 deletions

View File

@ -611,8 +611,9 @@ int ObTableLoadClientTask::init_instance()
ObSchemaGetterGuard schema_guard;
ObArray<uint64_t> column_ids;
ObCompressorType compressor_type = INVALID_COMPRESSOR;
if (OB_FAIL(GCTX.omt_->get_tenant(param_.get_tenant_id(), tenant))) {
LOG_WARN("fail to get tenant handle", KR(ret), K(param_.get_tenant_id()));
bool online_opt_stat_gather = false;
if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) {
LOG_WARN("fail to get tenant handle", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(ObTableLoadService::check_support_direct_load(schema_guard,
@ -626,8 +627,11 @@ int ObTableLoadClientTask::init_instance()
table_id,
column_ids))) {
LOG_WARN("fail to get user column ids", KR(ret));
} else if (OB_FAIL(get_compressor_type(param_.get_tenant_id(), param_.get_table_id(), session_count_, compressor_type))) {
} else if (OB_FAIL(get_compressor_type(tenant_id, table_id, session_count_, compressor_type))) {
LOG_WARN("fail to get compressor type", KR(ret));
} else if (OB_FAIL(ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load(
tenant_id, online_opt_stat_gather))) {
LOG_WARN("fail to get tenant optimizer gather stats on load", KR(ret), K(tenant_id));
} else {
ObTableLoadParam load_param;
load_param.tenant_id_ = tenant_id;
@ -639,7 +643,9 @@ int ObTableLoadClientTask::init_instance()
load_param.column_count_ = column_ids.count();
load_param.need_sort_ = true;
load_param.px_mode_ = false;
load_param.online_opt_stat_gather_ = false; // 支持统计信息收集需要构造ObExecContext
load_param.online_opt_stat_gather_ =
(online_opt_stat_gather &&
!ObDirectLoadMethod::is_incremental(method)); // 增量统计信息收集需要构造ObExecContext
load_param.dup_action_ = param_.get_dup_action();
load_param.method_ = method;
load_param.insert_mode_ = insert_mode;

View File

@ -28,6 +28,7 @@
#include "observer/table_load/ob_table_load_utils.h"
#include "share/ob_share_util.h"
#include "share/stat/ob_incremental_stat_estimator.h"
#include "share/stat/ob_dbms_stats_executor.h"
#include "share/stat/ob_dbms_stats_utils.h"
#include "observer/table_load/ob_table_load_index_long_wait.h"
#include "observer/omt/ob_tenant.h"
@ -1052,32 +1053,38 @@ int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statist
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
const uint64_t table_id = ctx_->ddl_param_.dest_table_id_;
ObArenaAllocator allocator("TLD_Temp");
ObArray<ObOptColumnStat *> global_column_stats;
ObArray<ObOptTableStat *> global_table_stats;
allocator.set_tenant_id(MTL_ID());
global_column_stats.set_tenant_id(MTL_ID());
global_table_stats.set_tenant_id(MTL_ID());
ObSchemaGetterGuard schema_guard;
if (OB_UNLIKELY(sql_statistics.is_empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(sql_statistics));
} else if (ObDirectLoadMethod::is_full(ctx_->param_.method_)) {
} else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (ObDirectLoadMethod::is_full(ctx_->param_.method_)) { // full direct load
ObArray<ObOptColumnStat *> global_column_stats;
ObArray<ObOptTableStat *> global_table_stats;
global_column_stats.set_tenant_id(MTL_ID());
global_table_stats.set_tenant_id(MTL_ID());
if (OB_FAIL(sql_statistics.get_table_stat_array(global_table_stats))) {
LOG_WARN("fail to get table stat array", KR(ret));
} else if (OB_FAIL(sql_statistics.get_col_stat_array(global_column_stats))) {
LOG_WARN("fail to get column stat array", KR(ret));
} else if (OB_UNLIKELY(global_table_stats.empty() || global_column_stats.empty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected empty sql stats", KR(ret), K(global_table_stats), K(global_column_stats));
} else if (OB_FAIL(ObDbmsStatsUtils::split_batch_write(&schema_guard,
ctx_->session_info_,
GCTX.sql_proxy_,
global_table_stats,
global_column_stats))) {
LOG_WARN("fail to split batch write", KR(ret), K(sql_statistics), K(global_table_stats), K(global_column_stats));
}
} else {
} else { // inc direct load
ObExecContext *exec_ctx = nullptr;
ObArenaAllocator allocator("TLD_Temp");
ObTableStatParam param;
ObMySQLTransaction trans; // for acquire inner conn
sqlclient::ObISQLConnection *conn = nullptr;
ObArray<ObOptTableStat *> base_table_stats;
ObArray<ObOptColumnStat *> base_column_stats;
TabStatIndMap inc_table_stats;
ColStatIndMap inc_column_stats;
const int64_t map_size = ctx_->param_.column_count_;
base_table_stats.set_tenant_id(MTL_ID());
base_column_stats.set_tenant_id(MTL_ID());
allocator.set_tenant_id(MTL_ID());
param.tenant_id_ = tenant_id;
param.table_id_ = table_id;
param.part_level_ = ctx_->schema_.part_level_;
@ -1094,63 +1101,36 @@ int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statist
ObColumnStatParam col_param;
col_param.column_id_ = col_desc.col_id_;
col_param.cs_type_ = col_desc.col_type_.get_collation_type();
if (0 == i && ctx_->schema_.is_heap_table_) {
if (OB_HIDDEN_PK_INCREMENT_COLUMN_ID == col_param.column_id_) {
// skip hidden pk
} else if (OB_FAIL(param.column_params_.push_back(col_param))) {
LOG_WARN("fail to push back column param", KR(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(trans.start(GCTX.sql_proxy_, tenant_id))) {
LOG_WARN("fail to start transaction", KR(ret));
} else if (OB_ISNULL(conn = trans.get_connection())) {
} else if (OB_ISNULL(exec_ctx = coordinator_ctx_->exec_ctx_->get_exec_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected conn is null", KR(ret));
} else if (OB_FAIL(inc_table_stats.create(map_size,
LOG_WARN("unexpected exec ctx is null", KR(ret));
} else if (OB_FAIL(inc_table_stats.create(1,
"TLD_TabStatBkt",
"TLD_TabStatNode",
tenant_id))) {
LOG_WARN("fail to create table stats map", KR(ret));
} else if (OB_FAIL(inc_column_stats.create(map_size,
} else if (OB_FAIL(inc_column_stats.create(ctx_->param_.column_count_,
"TLD_ColStatBkt",
"TLD_ColStatNode",
tenant_id))) {
LOG_WARN("fail to create column stats map", KR(ret));
} else if (OB_FAIL(ObDbmsStatsUtils::get_current_opt_stats(allocator,
conn,
param,
base_table_stats,
base_column_stats))) {
LOG_WARN("failed to get current opt stats", KR(ret));
} else if (OB_FAIL(sql_statistics.get_table_stats(inc_table_stats))) {
LOG_WARN("fail to get table stat array", KR(ret));
} else if (OB_FAIL(sql_statistics.get_col_stats(inc_column_stats))) {
LOG_WARN("fail to get column stat array", KR(ret));
} else if (OB_FAIL(ObDbmsStatsUtils::merge_tab_stats(param,
inc_table_stats,
base_table_stats,
global_table_stats))) {
LOG_WARN("fail to merge tab stats", KR(ret), K(base_table_stats));
} else if (OB_FAIL(ObDbmsStatsUtils::merge_col_stats(param,
inc_column_stats,
base_column_stats,
global_column_stats))) {
LOG_WARN("fail to merge col stats", KR(ret), K(base_column_stats));
}
}
if (OB_SUCC(ret)) {
ObSchemaGetterGuard schema_guard;
if (OB_UNLIKELY(global_table_stats.empty() || global_column_stats.empty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected empty sql stats", KR(ret), K(global_table_stats), K(global_column_stats));
} else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObDbmsStatsUtils::split_batch_write(&schema_guard,
ctx_->session_info_,
GCTX.sql_proxy_,
global_table_stats,
global_column_stats))) {
LOG_WARN("fail to split batch write", KR(ret), K(sql_statistics), K(global_table_stats), K(global_column_stats));
} else if (OB_FAIL(ObDbmsStatsExecutor::update_online_stat(*exec_ctx,
param,
&schema_guard,
inc_table_stats,
inc_column_stats))) {
LOG_WARN("fail to update online stat", KR(ret));
}
}
return ret;

View File

@ -115,12 +115,11 @@ int ObTableDirectInsertCtx::init(
param.session_count_ = parallel;
param.column_count_ = column_ids.count();
param.px_mode_ = true;
param.online_opt_stat_gather_ = true;
param.online_opt_stat_gather_ = is_online_gather_statistics_;
param.need_sort_ = true;
param.max_error_row_count_ = 0;
param.dup_action_ = (enable_inc_replace ? sql::ObLoadDupActionType::LOAD_REPLACE
: sql::ObLoadDupActionType::LOAD_STOP_ON_DUP);
param.online_opt_stat_gather_ = is_online_gather_statistics_;
param.method_ = method;
param.insert_mode_ = insert_mode;
param.load_mode_ = load_mode;