From 5801165afd183f1625548222d5d2d8a895dcbc01 Mon Sep 17 00:00:00 2001 From: suz-yang Date: Fri, 21 Jun 2024 06:50:49 +0000 Subject: [PATCH] fix incremental direct load update sql stats --- .../table_load/ob_table_load_client_task.cpp | 14 ++- .../table_load/ob_table_load_coordinator.cpp | 86 +++++++------------ .../engine/cmd/ob_table_direct_insert_ctx.cpp | 3 +- 3 files changed, 44 insertions(+), 59 deletions(-) diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index aba41e6fe..b0a320520 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -611,8 +611,9 @@ int ObTableLoadClientTask::init_instance() ObSchemaGetterGuard schema_guard; ObArray column_ids; ObCompressorType compressor_type = INVALID_COMPRESSOR; - if (OB_FAIL(GCTX.omt_->get_tenant(param_.get_tenant_id(), tenant))) { - LOG_WARN("fail to get tenant handle", KR(ret), K(param_.get_tenant_id())); + bool online_opt_stat_gather = false; + if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) { + LOG_WARN("fail to get tenant handle", KR(ret), K(tenant_id)); } else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) { LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id), K(table_id)); } else if (OB_FAIL(ObTableLoadService::check_support_direct_load(schema_guard, @@ -626,8 +627,11 @@ int ObTableLoadClientTask::init_instance() table_id, column_ids))) { LOG_WARN("fail to get user column ids", KR(ret)); - } else if (OB_FAIL(get_compressor_type(param_.get_tenant_id(), param_.get_table_id(), session_count_, compressor_type))) { + } else if (OB_FAIL(get_compressor_type(tenant_id, table_id, session_count_, compressor_type))) { LOG_WARN("fail to get compressor type", KR(ret)); + } else if (OB_FAIL(ObTableLoadSchema::get_tenant_optimizer_gather_stats_on_load( + tenant_id, online_opt_stat_gather))) { + LOG_WARN("fail to get tenant optimizer gather stats on load", KR(ret), K(tenant_id)); } else { ObTableLoadParam load_param; load_param.tenant_id_ = tenant_id; @@ -639,7 +643,9 @@ int ObTableLoadClientTask::init_instance() load_param.column_count_ = column_ids.count(); load_param.need_sort_ = true; load_param.px_mode_ = false; - load_param.online_opt_stat_gather_ = false; // 支持统计信息收集需要构造ObExecContext + load_param.online_opt_stat_gather_ = + (online_opt_stat_gather && + !ObDirectLoadMethod::is_incremental(method)); // 增量统计信息收集需要构造ObExecContext load_param.dup_action_ = param_.get_dup_action(); load_param.method_ = method; load_param.insert_mode_ = insert_mode; diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index d4bcc45ad..79598529b 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -28,6 +28,7 @@ #include "observer/table_load/ob_table_load_utils.h" #include "share/ob_share_util.h" #include "share/stat/ob_incremental_stat_estimator.h" +#include "share/stat/ob_dbms_stats_executor.h" #include "share/stat/ob_dbms_stats_utils.h" #include "observer/table_load/ob_table_load_index_long_wait.h" #include "observer/omt/ob_tenant.h" @@ -1052,32 +1053,38 @@ int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statist int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); const uint64_t table_id = ctx_->ddl_param_.dest_table_id_; - ObArenaAllocator allocator("TLD_Temp"); - ObArray global_column_stats; - ObArray global_table_stats; - allocator.set_tenant_id(MTL_ID()); - global_column_stats.set_tenant_id(MTL_ID()); - global_table_stats.set_tenant_id(MTL_ID()); + ObSchemaGetterGuard schema_guard; if (OB_UNLIKELY(sql_statistics.is_empty())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(sql_statistics)); - } else if (ObDirectLoadMethod::is_full(ctx_->param_.method_)) { + } else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) { + LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id)); + } else if (ObDirectLoadMethod::is_full(ctx_->param_.method_)) { // full direct load + ObArray global_column_stats; + ObArray global_table_stats; + global_column_stats.set_tenant_id(MTL_ID()); + global_table_stats.set_tenant_id(MTL_ID()); if (OB_FAIL(sql_statistics.get_table_stat_array(global_table_stats))) { LOG_WARN("fail to get table stat array", KR(ret)); } else if (OB_FAIL(sql_statistics.get_col_stat_array(global_column_stats))) { LOG_WARN("fail to get column stat array", KR(ret)); + } else if (OB_UNLIKELY(global_table_stats.empty() || global_column_stats.empty())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected empty sql stats", KR(ret), K(global_table_stats), K(global_column_stats)); + } else if (OB_FAIL(ObDbmsStatsUtils::split_batch_write(&schema_guard, + ctx_->session_info_, + GCTX.sql_proxy_, + global_table_stats, + global_column_stats))) { + LOG_WARN("fail to split batch write", KR(ret), K(sql_statistics), K(global_table_stats), K(global_column_stats)); } - } else { + } else { // inc direct load + ObExecContext *exec_ctx = nullptr; + ObArenaAllocator allocator("TLD_Temp"); ObTableStatParam param; - ObMySQLTransaction trans; // for acquire inner conn - sqlclient::ObISQLConnection *conn = nullptr; - ObArray base_table_stats; - ObArray base_column_stats; TabStatIndMap inc_table_stats; ColStatIndMap inc_column_stats; - const int64_t map_size = ctx_->param_.column_count_; - base_table_stats.set_tenant_id(MTL_ID()); - base_column_stats.set_tenant_id(MTL_ID()); + allocator.set_tenant_id(MTL_ID()); param.tenant_id_ = tenant_id; param.table_id_ = table_id; param.part_level_ = ctx_->schema_.part_level_; @@ -1094,63 +1101,36 @@ int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statist ObColumnStatParam col_param; col_param.column_id_ = col_desc.col_id_; col_param.cs_type_ = col_desc.col_type_.get_collation_type(); - if (0 == i && ctx_->schema_.is_heap_table_) { + if (OB_HIDDEN_PK_INCREMENT_COLUMN_ID == col_param.column_id_) { // skip hidden pk } else if (OB_FAIL(param.column_params_.push_back(col_param))) { LOG_WARN("fail to push back column param", KR(ret)); } } if (OB_FAIL(ret)) { - } else if (OB_FAIL(trans.start(GCTX.sql_proxy_, tenant_id))) { - LOG_WARN("fail to start transaction", KR(ret)); - } else if (OB_ISNULL(conn = trans.get_connection())) { + } else if (OB_ISNULL(exec_ctx = coordinator_ctx_->exec_ctx_->get_exec_ctx())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected conn is null", KR(ret)); - } else if (OB_FAIL(inc_table_stats.create(map_size, + LOG_WARN("unexpected exec ctx is null", KR(ret)); + } else if (OB_FAIL(inc_table_stats.create(1, "TLD_TabStatBkt", "TLD_TabStatNode", tenant_id))) { LOG_WARN("fail to create table stats map", KR(ret)); - } else if (OB_FAIL(inc_column_stats.create(map_size, + } else if (OB_FAIL(inc_column_stats.create(ctx_->param_.column_count_, "TLD_ColStatBkt", "TLD_ColStatNode", tenant_id))) { LOG_WARN("fail to create column stats map", KR(ret)); - } else if (OB_FAIL(ObDbmsStatsUtils::get_current_opt_stats(allocator, - conn, - param, - base_table_stats, - base_column_stats))) { - LOG_WARN("failed to get current opt stats", KR(ret)); } else if (OB_FAIL(sql_statistics.get_table_stats(inc_table_stats))) { LOG_WARN("fail to get table stat array", KR(ret)); } else if (OB_FAIL(sql_statistics.get_col_stats(inc_column_stats))) { LOG_WARN("fail to get column stat array", KR(ret)); - } else if (OB_FAIL(ObDbmsStatsUtils::merge_tab_stats(param, - inc_table_stats, - base_table_stats, - global_table_stats))) { - LOG_WARN("fail to merge tab stats", KR(ret), K(base_table_stats)); - } else if (OB_FAIL(ObDbmsStatsUtils::merge_col_stats(param, - inc_column_stats, - base_column_stats, - global_column_stats))) { - LOG_WARN("fail to merge col stats", KR(ret), K(base_column_stats)); - } - } - if (OB_SUCC(ret)) { - ObSchemaGetterGuard schema_guard; - if (OB_UNLIKELY(global_table_stats.empty() || global_column_stats.empty())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected empty sql stats", KR(ret), K(global_table_stats), K(global_column_stats)); - } else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) { - LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id)); - } else if (OB_FAIL(ObDbmsStatsUtils::split_batch_write(&schema_guard, - ctx_->session_info_, - GCTX.sql_proxy_, - global_table_stats, - global_column_stats))) { - LOG_WARN("fail to split batch write", KR(ret), K(sql_statistics), K(global_table_stats), K(global_column_stats)); + } else if (OB_FAIL(ObDbmsStatsExecutor::update_online_stat(*exec_ctx, + param, + &schema_guard, + inc_table_stats, + inc_column_stats))) { + LOG_WARN("fail to update online stat", KR(ret)); } } return ret; diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp index d77d4a307..34bf58861 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp @@ -115,12 +115,11 @@ int ObTableDirectInsertCtx::init( param.session_count_ = parallel; param.column_count_ = column_ids.count(); param.px_mode_ = true; - param.online_opt_stat_gather_ = true; + param.online_opt_stat_gather_ = is_online_gather_statistics_; param.need_sort_ = true; param.max_error_row_count_ = 0; param.dup_action_ = (enable_inc_replace ? sql::ObLoadDupActionType::LOAD_REPLACE : sql::ObLoadDupActionType::LOAD_STOP_ON_DUP); - param.online_opt_stat_gather_ = is_online_gather_statistics_; param.method_ = method; param.insert_mode_ = insert_mode; param.load_mode_ = load_mode;