From f1d9377aad93d2927de1f23e9c39bf10a135c81c Mon Sep 17 00:00:00 2001 From: yongshige <598633031@qq.com> Date: Wed, 1 Nov 2023 10:09:07 +0000 Subject: [PATCH] change sql_statistics collect mode --- .../ob_table_direct_load_rpc_executor.cpp | 44 +++- .../ob_table_direct_load_rpc_executor.h | 2 +- .../ob_table_load_control_rpc_executor.cpp | 2 +- .../table_load/ob_table_load_coordinator.cpp | 50 ++--- .../table_load/ob_table_load_coordinator.h | 4 +- .../ob_table_load_coordinator_ctx.cpp | 3 +- .../table_load/ob_table_load_merger.cpp | 63 +----- .../table_load/ob_table_load_merger.h | 1 - .../table_load/ob_table_load_store.cpp | 40 +--- src/observer/table_load/ob_table_load_store.h | 4 +- .../table_load/ob_table_load_store_ctx.cpp | 8 + .../table_load/ob_table_load_trans_store.cpp | 3 +- .../table/ob_table_load_sql_statistics.cpp | 97 +++++---- .../table/ob_table_load_sql_statistics.h | 8 +- .../engine/cmd/ob_load_data_direct_impl.cpp | 26 +-- .../ob_direct_load_fast_heap_table.cpp | 44 +--- .../ob_direct_load_fast_heap_table.h | 9 - ...ob_direct_load_fast_heap_table_builder.cpp | 67 +----- .../ob_direct_load_fast_heap_table_builder.h | 12 +- .../ob_direct_load_insert_table_ctx.cpp | 206 +++++++++++++++++- .../ob_direct_load_insert_table_ctx.h | 27 ++- .../direct_load/ob_direct_load_merge_ctx.cpp | 95 +------- .../direct_load/ob_direct_load_merge_ctx.h | 10 +- .../ob_direct_load_partition_merge_task.cpp | 87 +------- .../ob_direct_load_partition_merge_task.h | 10 +- .../ob_direct_load_table_store.cpp | 9 +- .../direct_load/ob_direct_load_table_store.h | 7 +- 27 files changed, 413 insertions(+), 525 deletions(-) diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index bc6de6eaf9..cb78e60983 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -23,6 +23,7 @@ #include "observer/table_load/ob_table_load_schema.h" #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_table_ctx.h" +#include "lib/mysqlclient/ob_mysql_result.h" namespace oceanbase { @@ -33,6 +34,7 @@ using namespace omt; using namespace share::schema; using namespace sql; using namespace table; +using namespace common; // begin ObTableDirectLoadBeginExecutor::ObTableDirectLoadBeginExecutor( @@ -207,11 +209,49 @@ int ObTableDirectLoadBeginExecutor::process() return ret; } +int ObTableDirectLoadBeginExecutor::check_need_online_gather(const uint64_t tenant_id, bool &online_opt_stat_gather) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + sqlclient::ObMySQLResult *result = nullptr; + if (OB_FAIL(sql.assign_fmt("SELECT value FROM %s WHERE (zone, name, schema_version) in (select zone, name, max(schema_version) FROM %s" + " group by zone, name) and name = '%s'", + OB_ALL_SYS_VARIABLE_HISTORY_TNAME, OB_ALL_SYS_VARIABLE_HISTORY_TNAME, OB_SV__OPTIMIZER_GATHER_STATS_ON_LOAD))) { + LOG_WARN("fail to append sql", KR(ret), K(tenant_id)); + } else if (OB_FAIL(GCTX.sql_proxy_->read(res, tenant_id, sql.ptr()))) { + LOG_WARN("fail to execute sql", KR(ret), K(sql), K(tenant_id)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", KR(ret), K(sql), K(tenant_id)); + } else { + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END != ret) { + LOG_WARN("fail to get next row", KR(ret), K(tenant_id)); + } + } else { + ObString data; + EXTRACT_VARCHAR_FIELD_MYSQL(*result, "value", data); + if(0 == strcmp(data.ptr(), "1")) { + online_opt_stat_gather = true; + } + } + } + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } + } + } + return ret; +} + int ObTableDirectLoadBeginExecutor::create_table_ctx() { int ret = OB_SUCCESS; const uint64_t tenant_id = client_task_->tenant_id_; const uint64_t table_id = client_task_->table_id_; + bool online_opt_stat_gather = false; ObTableLoadDDLParam ddl_param; ObTableLoadParam param; // start redef table @@ -240,6 +280,8 @@ int ObTableDirectLoadBeginExecutor::create_table_ctx() ObTenant *tenant = nullptr; if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) { LOG_WARN("fail to get tenant", KR(ret), K(tenant_id)); + } else if (OB_FAIL(check_need_online_gather(tenant_id, online_opt_stat_gather))) { + LOG_WARN("fail to get tenant", KR(ret), K(tenant_id)); } else { param.tenant_id_ = tenant_id; param.table_id_ = table_id; @@ -250,7 +292,7 @@ int ObTableDirectLoadBeginExecutor::create_table_ctx() param.column_count_ = client_task_->column_names_.count(); param.need_sort_ = true; param.px_mode_ = false; - param.online_opt_stat_gather_ = false; + param.online_opt_stat_gather_ = online_opt_stat_gather; param.dup_action_ = arg_.dup_action_; if (OB_FAIL(param.normalize())) { LOG_WARN("fail to normalize param", KR(ret)); diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h index 743e0f0319..c86feb0c4e 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h @@ -71,7 +71,7 @@ protected: private: int create_table_ctx(); int do_begin(); - + int check_need_online_gather(const uint64_t tenant_id, bool &online_opt_stat_gather); private: ObTableLoadClientTask *client_task_; ObTableLoadTableCtx *table_ctx_; diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp index e69e774114..f60ddd11e3 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp @@ -264,7 +264,7 @@ int ObDirectLoadControlCommitExecutor::process() ObTableLoadStore store(table_ctx); if (OB_FAIL(store.init())) { LOG_WARN("fail to init store", KR(ret)); - } else if (OB_FAIL(store.commit(res_.result_info_))) { + } else if (OB_FAIL(store.commit(res_.result_info_, res_.sql_statistics_))) { LOG_WARN("fail to store commit", KR(ret)); } else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) { LOG_WARN("fail to remove table ctx", KR(ret), K(key)); diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 231a6a9322..3700154038 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -28,6 +28,7 @@ #include "observer/table_load/ob_table_load_utils.h" #include "share/ob_share_util.h" #include "share/stat/ob_incremental_stat_estimator.h" +#include "share/stat/ob_dbms_stats_utils.h" namespace oceanbase { @@ -628,7 +629,7 @@ int ObTableLoadCoordinator::add_check_merge_result_task() * commit */ -int ObTableLoadCoordinator::commit_peers() +int ObTableLoadCoordinator::commit_peers(table::ObTableLoadSqlStatistics &sql_statistics) { int ret = OB_SUCCESS; ObTableLoadArray all_addr_array; @@ -646,7 +647,7 @@ int ObTableLoadCoordinator::commit_peers() ObTableLoadStore store(ctx_); if (OB_FAIL(store.init())) { LOG_WARN("fail to init store", KR(ret)); - } else if (OB_FAIL(store.commit(res.result_info_))) { + } else if (OB_FAIL(store.commit(res.result_info_, res.sql_statistics_))) { LOG_WARN("fail to commit store", KR(ret)); } } else { // 远端, 发送rpc @@ -657,6 +658,9 @@ int ObTableLoadCoordinator::commit_peers() ATOMIC_AAF(&coordinator_ctx_->result_info_.deleted_, res.result_info_.deleted_); ATOMIC_AAF(&coordinator_ctx_->result_info_.skipped_, res.result_info_.skipped_); ATOMIC_AAF(&coordinator_ctx_->result_info_.warnings_, res.result_info_.warnings_); + if (OB_FAIL(sql_statistics.merge(res.sql_statistics_))) { + LOG_WARN("fail to add result sql stats", KR(ret), K(addr), K(res)); + } } } } @@ -693,12 +697,10 @@ int ObTableLoadCoordinator::commit(ObTableLoadResultInfo &result_info) ObTableLoadSqlStatistics sql_statistics; if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check coordinator status", KR(ret)); - } else if (OB_FAIL(commit_peers())) { + } else if (OB_FAIL(commit_peers(sql_statistics))) { LOG_WARN("fail to commit peers", KR(ret)); } else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) { - } else if (param_.online_opt_stat_gather_ && - OB_FAIL( - drive_sql_stat(coordinator_ctx_->exec_ctx_->get_exec_ctx()))) { + } else if (param_.online_opt_stat_gather_ && OB_FAIL(write_sql_stat(sql_statistics))) { LOG_WARN("fail to drive sql stat", KR(ret)); } else if (OB_FAIL(commit_redef_table())) { LOG_WARN("fail to commit redef table", KR(ret)); @@ -725,12 +727,10 @@ int ObTableLoadCoordinator::px_commit_data() ObTableLoadSqlStatistics sql_statistics; if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check coordinator status", KR(ret)); - } else if (OB_FAIL(commit_peers())) { + } else if (OB_FAIL(commit_peers(sql_statistics))) { LOG_WARN("fail to commit peers", KR(ret)); } else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) { - } else if (param_.online_opt_stat_gather_ && - OB_FAIL( - drive_sql_stat(coordinator_ctx_->exec_ctx_->get_exec_ctx()))) { + } else if (param_.online_opt_stat_gather_ && OB_FAIL(write_sql_stat(sql_statistics))) { LOG_WARN("fail to drive sql stat", KR(ret)); } } @@ -758,32 +758,28 @@ int ObTableLoadCoordinator::px_commit_ddl() return ret; } -int ObTableLoadCoordinator::drive_sql_stat(ObExecContext *ctx) +int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statistics) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); const uint64_t table_id = ctx_->ddl_param_.dest_table_id_; + ObSEArray global_column_stats; + ObSEArray global_table_stats; ObSchemaGetterGuard schema_guard; - ObSchemaGetterGuard *tmp_schema_guard = nullptr; const ObTableSchema *table_schema = nullptr; - if (OB_UNLIKELY(nullptr == ctx)) { + if (OB_UNLIKELY(sql_statistics.is_empty())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), KPC(ctx)); - } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, + LOG_WARN("invalid args", KR(ret), K(sql_statistics)); + } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) { LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } else { - tmp_schema_guard = ctx->get_virtual_table_ctx().schema_guard_; - ctx->get_sql_ctx()->schema_guard_ = &schema_guard; - ctx->get_das_ctx().set_sql_ctx(ctx->get_sql_ctx()); - } - if (OB_SUCC(ret)) { - if (OB_FAIL(ObIncrementalStatEstimator::derive_global_stat_by_direct_load( - *ctx, table_id))) { - LOG_WARN("fail to drive global stat by direct load", KR(ret)); - } - ctx->get_sql_ctx()->schema_guard_ = tmp_schema_guard; - ctx->get_das_ctx().set_sql_ctx(ctx->get_sql_ctx()); + } else if (OB_FAIL(sql_statistics.get_col_stat_array(global_column_stats))) { + LOG_WARN("failed to get column stat array"); + } else if (OB_FAIL(sql_statistics.get_table_stat_array(global_table_stats))) { + LOG_WARN("failed to get table stat array"); + } else if (OB_FAIL(ObDbmsStatsUtils::split_batch_write(&schema_guard, ctx_->session_info_, GCTX.sql_proxy_, global_table_stats, global_column_stats))) { + LOG_WARN("failed to batch write stats", K(ret), K(sql_statistics.table_stat_array_), + K(sql_statistics.col_stat_array_)); } return ret; } diff --git a/src/observer/table_load/ob_table_load_coordinator.h b/src/observer/table_load/ob_table_load_coordinator.h index f6b7383b93..dfc03b1b40 100644 --- a/src/observer/table_load/ob_table_load_coordinator.h +++ b/src/observer/table_load/ob_table_load_coordinator.h @@ -60,9 +60,9 @@ private: int confirm_begin_peers(); int pre_merge_peers(); int start_merge_peers(); - int commit_peers(); + int commit_peers(table::ObTableLoadSqlStatistics &sql_statistics); int commit_redef_table(); - int drive_sql_stat(sql::ObExecContext *ctx); + int write_sql_stat(table::ObTableLoadSqlStatistics &sql_statistics); int heart_beat_peer(); private: int add_check_merge_result_task(); diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index b63bc952f8..07abe1aecb 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -100,8 +100,7 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray &idx_array, LOG_WARN("ObTableLoadCoordinatorCtx init twice", KR(ret), KP(this)); } else if (OB_UNLIKELY( idx_array.count() != ctx_->param_.column_count_ || nullptr == exec_ctx || - !exec_ctx->is_valid() || - (ctx_->param_.online_opt_stat_gather_ && nullptr == exec_ctx->get_exec_ctx()))) { + !exec_ctx->is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(ctx_->param_), K(idx_array.count()), KPC(exec_ctx)); } else { diff --git a/src/observer/table_load/ob_table_load_merger.cpp b/src/observer/table_load/ob_table_load_merger.cpp index fec1500b96..53a9db05f2 100644 --- a/src/observer/table_load/ob_table_load_merger.cpp +++ b/src/observer/table_load/ob_table_load_merger.cpp @@ -40,8 +40,8 @@ using namespace table; class ObTableLoadMerger::MergeTaskProcessor : public ObITableLoadTaskProcessor { public: - MergeTaskProcessor(ObTableLoadTask &task, ObTableLoadTableCtx *ctx, ObTableLoadMerger *merger) - : ObITableLoadTaskProcessor(task), ctx_(ctx), merger_(merger) + MergeTaskProcessor(ObTableLoadTask &task, ObTableLoadTableCtx *ctx, ObTableLoadMerger *merger, int64_t thread_idx) + : ObITableLoadTaskProcessor(task), ctx_(ctx), merger_(merger), thread_idx_(thread_idx) { ctx_->inc_ref_count(); } @@ -63,7 +63,7 @@ public: ret = OB_SUCCESS; break; } - } else if (OB_FAIL(merge_task->process())) { + } else if (OB_FAIL(merge_task->process(thread_idx_))) { LOG_WARN("fail to process merge task", KR(ret)); } if (nullptr != merge_task) { @@ -75,6 +75,7 @@ public: private: ObTableLoadTableCtx *const ctx_; ObTableLoadMerger *const merger_; + int64_t thread_idx_; }; class ObTableLoadMerger::MergeTaskCallback : public ObITableLoadTaskCallback @@ -203,10 +204,8 @@ int ObTableLoadMerger::build_merge_ctx() merge_param.table_data_desc_ = store_ctx_->table_data_desc_; merge_param.datum_utils_ = &(store_ctx_->ctx_->schema_.datum_utils_); merge_param.col_descs_ = &(store_ctx_->ctx_->schema_.column_descs_); - merge_param.cmp_funcs_ = &(store_ctx_->ctx_->schema_.cmp_funcs_); merge_param.is_heap_table_ = store_ctx_->ctx_->schema_.is_heap_table_; merge_param.is_fast_heap_table_ = store_ctx_->is_fast_heap_table_; - merge_param.online_opt_stat_gather_ = param_.online_opt_stat_gather_; merge_param.insert_table_ctx_ = store_ctx_->insert_table_ctx_; merge_param.dml_row_handler_ = store_ctx_->error_row_handler_; if (OB_FAIL(merge_ctx_.init(merge_param, store_ctx_->ls_partition_ids_, @@ -365,58 +364,6 @@ int ObTableLoadMerger::collect_dml_stat(ObTableLoadDmlStat &dml_stats) return ret; } -int ObTableLoadMerger::collect_sql_statistics(ObTableLoadSqlStatistics &sql_statistics) -{ - int ret = OB_SUCCESS; - if (store_ctx_->is_fast_heap_table_) { - ObDirectLoadMultiMap tables; - ObArray trans_store_array; - if (OB_FAIL(tables.init())) { - LOG_WARN("fail to init table", KR(ret)); - } else if (OB_FAIL(store_ctx_->get_committed_trans_stores(trans_store_array))) { - LOG_WARN("fail to get trans store", KR(ret)); - } else { - for (int i = 0; OB_SUCC(ret) && i < trans_store_array.count(); ++i) { - ObTableLoadTransStore *trans_store = trans_store_array.at(i); - for (int j = 0; OB_SUCC(ret) && j < trans_store->session_store_array_.count(); ++j) { - ObTableLoadTransStore::SessionStore * session_store = trans_store->session_store_array_.at(j); - for (int k = 0 ; OB_SUCC(ret) && k < session_store->partition_table_array_.count(); ++k) { - ObIDirectLoadPartitionTable *table = session_store->partition_table_array_.at(k); - ObDirectLoadFastHeapTable *sstable = nullptr; - if (OB_ISNULL(sstable = dynamic_cast(table))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected not heap sstable", KR(ret), KPC(table)); - } else { - const ObTabletID &tablet_id = sstable->get_tablet_id(); - if (OB_FAIL(tables.add(tablet_id, sstable))) { - LOG_WARN("fail to add tables", KR(ret), KPC(sstable)); - } - } - } - } - } - for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) { - ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i); - ObArray heap_table_array ; - if (OB_FAIL(tables.get(tablet_ctx->get_tablet_id(), heap_table_array))) { - LOG_WARN("get heap sstable failed", KR(ret)); - } else if (OB_FAIL(tablet_ctx->collect_sql_statistics(heap_table_array, sql_statistics))) { - LOG_WARN("fail to collect sql statics", KR(ret)); - } - } - } - } else { - for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) { - ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i); - ObArray heap_table_array ; - if (OB_FAIL(tablet_ctx->collect_sql_statistics(heap_table_array, sql_statistics))) { - LOG_WARN("fail to collect sql statics", KR(ret)); - } - } - } - return ret; -} - int ObTableLoadMerger::start_merge() { int ret = OB_SUCCESS; @@ -429,7 +376,7 @@ int ObTableLoadMerger::start_merge() LOG_WARN("fail to alloc task", KR(ret)); } // 2. 设置processor - else if (OB_FAIL(task->set_processor(ctx, this))) { + else if (OB_FAIL(task->set_processor(ctx, this, thread_idx))) { LOG_WARN("fail to set merge task processor", KR(ret)); } // 3. 设置callback diff --git a/src/observer/table_load/ob_table_load_merger.h b/src/observer/table_load/ob_table_load_merger.h index 8d25d1c3f6..f7f1ab2da4 100644 --- a/src/observer/table_load/ob_table_load_merger.h +++ b/src/observer/table_load/ob_table_load_merger.h @@ -38,7 +38,6 @@ public: int start(); void stop(); int handle_table_compact_success(); - int collect_sql_statistics(table::ObTableLoadSqlStatistics &sql_statistics); int collect_dml_stat(table::ObTableLoadDmlStat &dml_stats); private: int build_merge_ctx(); diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index b4661fe450..957442d4af 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -25,7 +25,6 @@ #include "observer/table_load/ob_table_load_utils.h" #include "storage/direct_load/ob_direct_load_insert_table_ctx.h" #include "share/stat/ob_opt_stat_monitor_manager.h" -#include "share/stat/ob_dbms_stats_utils.h" #include "storage/blocksstable/ob_sstable.h" namespace oceanbase @@ -298,7 +297,7 @@ int ObTableLoadStore::start_merge() return ret; } -int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info) +int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlStatistics &sql_statistics) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -308,19 +307,12 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info) LOG_INFO("store commit"); ObMutexGuard guard(store_ctx_->get_op_lock()); ObTableLoadDmlStat dml_stats; - ObTableLoadSqlStatistics sql_statistics; if (OB_FAIL(store_ctx_->check_status(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check store status", KR(ret)); - } else if (OB_FAIL(store_ctx_->insert_table_ctx_->commit())) { + } else if (OB_FAIL(store_ctx_->insert_table_ctx_->commit(sql_statistics))) { LOG_WARN("fail to commit insert table", KR(ret)); } else if (ctx_->schema_.has_autoinc_column_ && OB_FAIL(store_ctx_->commit_autoinc_value())) { LOG_WARN("fail to commit sync auto increment value", KR(ret)); - } else if (param_.online_opt_stat_gather_ && - OB_FAIL(store_ctx_->merger_->collect_sql_statistics(sql_statistics))) { - LOG_WARN("fail to collect sql stats", KR(ret)); - } else if (param_.online_opt_stat_gather_ && - OB_FAIL(commit_sql_statistics(sql_statistics))) { - LOG_WARN("fail to commit sql stats", KR(ret)); } else if (OB_FAIL(store_ctx_->merger_->collect_dml_stat(dml_stats))) { LOG_WARN("fail to build dml stat", KR(ret)); } else if (OB_FAIL(ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(dml_stats.dml_stat_array_))) { @@ -335,34 +327,6 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info) return ret; } -int ObTableLoadStore::commit_sql_statistics(const ObTableLoadSqlStatistics &sql_statistics) -{ - int ret = OB_SUCCESS; - const uint64_t tenant_id = MTL_ID(); - const uint64_t table_id = param_.table_id_; - ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = nullptr; - ObSEArray part_column_stats; - ObSEArray part_table_stats; - if (sql_statistics.is_empty()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("sql statistics is empty", K(ret)); - } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, - table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_FAIL(sql_statistics.get_col_stat_array(part_column_stats))) { - LOG_WARN("failed to get column stat array"); - } else if (OB_FAIL(sql_statistics.get_table_stat_array(part_table_stats))) { - LOG_WARN("failed to get table stat array"); - } else if (OB_FAIL(ObDbmsStatsUtils::split_batch_write( - &schema_guard, store_ctx_->ctx_->session_info_, GCTX.sql_proxy_, part_table_stats, - part_column_stats))) { - LOG_WARN("failed to batch write stats", K(ret), K(sql_statistics.table_stat_array_), - K(sql_statistics.col_stat_array_)); - } - return ret; -} - int ObTableLoadStore::get_status(ObTableLoadStatusType &status, int &error_code) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_store.h b/src/observer/table_load/ob_table_load_store.h index 59d2cd28fe..83344bd1d2 100644 --- a/src/observer/table_load/ob_table_load_store.h +++ b/src/observer/table_load/ob_table_load_store.h @@ -46,11 +46,9 @@ public: int confirm_begin(); int pre_merge(const table::ObTableLoadArray &committed_trans_id_array); int start_merge(); - int commit(table::ObTableLoadResultInfo &result_info); + int commit(table::ObTableLoadResultInfo &result_info, table::ObTableLoadSqlStatistics &sql_statistics); int get_status(table::ObTableLoadStatusType &status, int &error_code); int heart_beat(); -private: - int commit_sql_statistics(const table::ObTableLoadSqlStatistics &sql_statistics); private: class MergeTaskProcessor; class MergeTaskCallback; diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index fe771087ac..91e85790b6 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -83,11 +83,19 @@ int ObTableLoadStoreCtx::init( } else { ObDirectLoadInsertTableParam insert_table_param; insert_table_param.table_id_ = ctx_->param_.table_id_; + insert_table_param.dest_table_id_= ctx_->ddl_param_.dest_table_id_; insert_table_param.schema_version_ = ctx_->ddl_param_.schema_version_; insert_table_param.snapshot_version_ = ctx_->ddl_param_.snapshot_version_; insert_table_param.ddl_task_id_ = ctx_->ddl_param_.task_id_; insert_table_param.execution_id_ = 1; //仓氐说暂时设置为1,不然后面检测过不了 insert_table_param.data_version_ = ctx_->ddl_param_.data_version_; + insert_table_param.session_cnt_ = ctx_->param_.session_count_; + insert_table_param.rowkey_column_count_ = (!ctx_->schema_.is_heap_table_ ? ctx_->schema_.rowkey_column_count_ : 0); + insert_table_param.column_count_ = ctx_->param_.column_count_; + insert_table_param.online_opt_stat_gather_ = ctx_->param_.online_opt_stat_gather_; + insert_table_param.is_heap_table_ = ctx_->schema_.is_heap_table_; + insert_table_param.col_descs_ = &(ctx_->schema_.column_descs_); + insert_table_param.cmp_funcs_ = &(ctx_->schema_.cmp_funcs_); for (int64_t i = 0; OB_SUCC(ret) && i < partition_id_array.count(); ++i) { const ObLSID &ls_id = partition_id_array[i].ls_id_; const ObTableLoadPartitionId &part_tablet_id = partition_id_array[i].part_tablet_id_; diff --git a/src/observer/table_load/ob_table_load_trans_store.cpp b/src/observer/table_load/ob_table_load_trans_store.cpp index 4c48976c8a..42cda58205 100644 --- a/src/observer/table_load/ob_table_load_trans_store.cpp +++ b/src/observer/table_load/ob_table_load_trans_store.cpp @@ -210,16 +210,15 @@ int ObTableLoadTransStoreWriter::init_session_ctx_array() param.table_data_desc_ = *table_data_desc_; param.datum_utils_ = &(trans_ctx_->ctx_->schema_.datum_utils_); param.col_descs_ = &(trans_ctx_->ctx_->schema_.column_descs_); - param.cmp_funcs_ = &(trans_ctx_->ctx_->schema_.cmp_funcs_); param.file_mgr_ = trans_ctx_->ctx_->store_ctx_->tmp_file_mgr_; param.is_multiple_mode_ = trans_ctx_->ctx_->store_ctx_->is_multiple_mode_; param.is_fast_heap_table_ = trans_ctx_->ctx_->store_ctx_->is_fast_heap_table_; - param.online_opt_stat_gather_ = trans_ctx_->ctx_->param_.online_opt_stat_gather_; param.insert_table_ctx_ = trans_ctx_->ctx_->store_ctx_->insert_table_ctx_; param.fast_heap_table_ctx_ = trans_ctx_->ctx_->store_ctx_->fast_heap_table_ctx_; param.dml_row_handler_ = trans_ctx_->ctx_->store_ctx_->error_row_handler_; for (int64_t i = 0; OB_SUCC(ret) && i < session_count; ++i) { SessionContext *session_ctx = session_ctx_array_ + i; + param.thread_idx_ = i; if (param_.px_mode_) { session_ctx->extra_buf_size_ = table_data_desc_->extra_buf_size_; if (OB_ISNULL(session_ctx->extra_buf_ = diff --git a/src/share/table/ob_table_load_sql_statistics.cpp b/src/share/table/ob_table_load_sql_statistics.cpp index 8f56ad9cc8..dc97ea86fc 100644 --- a/src/share/table/ob_table_load_sql_statistics.cpp +++ b/src/share/table/ob_table_load_sql_statistics.cpp @@ -21,6 +21,7 @@ namespace table void ObTableLoadSqlStatistics::reset() { + int ret = OB_SUCCESS; for (int64_t i = 0; i < col_stat_array_.count(); ++i) { ObOptOSGColumnStat *col_stat = col_stat_array_.at(i); if (col_stat != nullptr) { @@ -82,53 +83,73 @@ int ObTableLoadSqlStatistics::allocate_col_stat(ObOptOSGColumnStat *&col_stat) return ret; } -int ObTableLoadSqlStatistics::add(const ObTableLoadSqlStatistics& other) +int ObTableLoadSqlStatistics::merge(const ObTableLoadSqlStatistics& other) { int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) { - ObOptTableStat *table_stat = other.table_stat_array_.at(i); - if (table_stat != nullptr) { - ObOptTableStat *copied_table_stat = nullptr; - int64_t size = table_stat->size(); - char *new_buf = nullptr; - if (OB_ISNULL(new_buf = static_cast(allocator_.alloc(size)))) { + if (is_empty()) { + for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) { + ObOptTableStat *table_stat = other.table_stat_array_.at(i); + if (table_stat != nullptr) { + ObOptTableStat *copied_table_stat = nullptr; + int64_t size = table_stat->size(); + char *new_buf = nullptr; + if (OB_ISNULL(new_buf = static_cast(allocator_.alloc(size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size)); + } else if (OB_FAIL(table_stat->deep_copy(new_buf, size, copied_table_stat))) { + OB_LOG(WARN, "fail to copy table stat", KR(ret)); + } else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) { + OB_LOG(WARN, "fail to add table stat", KR(ret)); + } + if (OB_FAIL(ret)) { + if (copied_table_stat != nullptr) { + copied_table_stat->~ObOptTableStat(); + copied_table_stat = nullptr; + } + if(new_buf != nullptr) { + allocator_.free(new_buf); + new_buf = nullptr; + } + } + } + } + for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) { + ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i); + ObOptOSGColumnStat *copied_col_stat = nullptr; + if (OB_ISNULL(col_stat)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "get unexpected null"); + } else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) { ret = OB_ALLOCATE_MEMORY_FAILED; - OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size)); - } else if (OB_FAIL(table_stat->deep_copy(new_buf, size, copied_table_stat))) { - OB_LOG(WARN, "fail to copy table stat", KR(ret)); - } else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) { - OB_LOG(WARN, "fail to add table stat", KR(ret)); + OB_LOG(WARN, "failed to create new col stat"); + } else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) { + OB_LOG(WARN, "fail to copy col stat", KR(ret)); + } else if (OB_FAIL(col_stat_array_.push_back(copied_col_stat))) { + OB_LOG(WARN, "fail to add col stat", KR(ret)); } if (OB_FAIL(ret)) { - if (copied_table_stat != nullptr) { - copied_table_stat->~ObOptTableStat(); - copied_table_stat = nullptr; - } - if(new_buf != nullptr) { - allocator_.free(new_buf); - new_buf = nullptr; + if (copied_col_stat != nullptr) { + copied_col_stat->~ObOptOSGColumnStat(); + copied_col_stat = nullptr; } } } - } - for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) { - ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i); - ObOptOSGColumnStat *copied_col_stat = nullptr; - if (OB_ISNULL(col_stat)) { - ret = OB_ERR_UNEXPECTED; - OB_LOG(WARN, "get unexpected null"); - } else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - OB_LOG(WARN, "failed to create new col stat"); - } else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) { - OB_LOG(WARN, "fail to copy col stat", KR(ret)); - } else if (OB_FAIL(col_stat_array_.push_back(copied_col_stat))) { - OB_LOG(WARN, "fail to add col stat", KR(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) { + ObOptTableStat *table_stat = other.table_stat_array_.at(i); + if (OB_FAIL(table_stat_array_.at(i)->merge_table_stat(*table_stat))) { + OB_LOG(WARN, "failed to merge table stat"); + } } - if (OB_FAIL(ret)) { - if (copied_col_stat != nullptr) { - copied_col_stat->~ObOptOSGColumnStat(); - copied_col_stat = nullptr; + for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) { + ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i); + if (OB_ISNULL(col_stat)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "get unexpected null"); + } + // ObOptOSGColumnStat的序列化结果只序列化里面的ObOptColumnStat, 需要调用ObOptColumnStat的merge函数 + else if (OB_FAIL(col_stat_array_.at(i)->col_stat_->merge_column_stat(*col_stat->col_stat_))) { + OB_LOG(WARN, "failed to merge col stat"); } } } diff --git a/src/share/table/ob_table_load_sql_statistics.h b/src/share/table/ob_table_load_sql_statistics.h index 6cfd7d93e2..30b2ab5c23 100644 --- a/src/share/table/ob_table_load_sql_statistics.h +++ b/src/share/table/ob_table_load_sql_statistics.h @@ -34,14 +34,16 @@ public: } int allocate_table_stat(ObOptTableStat *&table_stat); int allocate_col_stat(ObOptOSGColumnStat *&col_stat); - int add(const ObTableLoadSqlStatistics& other); + int merge(const ObTableLoadSqlStatistics& other); int get_table_stat_array(ObIArray &table_stat_array) const; int get_col_stat_array(ObIArray &col_stat_array) const; int persistence_col_stats(); + ObIArray & get_table_stat_array() {return table_stat_array_; } + ObIArray & get_col_stat_array() {return col_stat_array_; } TO_STRING_KV(K_(col_stat_array), K_(table_stat_array)); public: - common::ObSEArray table_stat_array_; - common::ObSEArray col_stat_array_; + common::ObArray table_stat_array_; + common::ObArray col_stat_array_; common::ObArenaAllocator allocator_; }; diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index b5b980a037..0ae4221c2f 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -1989,12 +1989,9 @@ int ObLoadDataDirectImpl::init_execute_param() } // need_sort_ if (OB_SUCC(ret)) { - int64_t append = 0; int64_t enable_direct = 0; int64_t hint_need_sort = 0; - if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) { - LOG_WARN("fail to get value of APPEND", K(ret)); - } else if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) { + if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) { LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret)); } else if (OB_FAIL(hint.get_value(ObLoadDataHint::NEED_SORT, hint_need_sort))) { LOG_WARN("fail to get value of NEED_SORT", KR(ret), K(hint)); @@ -2019,8 +2016,7 @@ int ObLoadDataDirectImpl::init_execute_param() } // online_opt_stat_gather_ if (OB_SUCC(ret)) { - int64_t append = 0; - int64_t gather_optimizer_statistics = 0 ; + int64_t no_gather_optimizer_statistics = 0 ; ObSQLSessionInfo *session = nullptr; ObObj obj; if (OB_ISNULL(session = ctx_->get_my_session())) { @@ -2028,11 +2024,9 @@ int ObLoadDataDirectImpl::init_execute_param() LOG_WARN("session is null", KR(ret)); } else if (OB_FAIL(session->get_sys_variable(SYS_VAR__OPTIMIZER_GATHER_STATS_ON_LOAD, obj))) { LOG_WARN("fail to get sys variable", K(ret)); - } else if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) { + } else if (OB_FAIL(hint.get_value(ObLoadDataHint::NO_GATHER_OPTIMIZER_STATISTICS, no_gather_optimizer_statistics))) { LOG_WARN("fail to get value of APPEND", K(ret)); - } else if (OB_FAIL(hint.get_value(ObLoadDataHint::GATHER_OPTIMIZER_STATISTICS, gather_optimizer_statistics))) { - LOG_WARN("fail to get value of APPEND", K(ret)); - } else if (((append != 0) || (gather_optimizer_statistics != 0)) && obj.get_bool()) { + } else if ((no_gather_optimizer_statistics == 0) && obj.get_bool()) { execute_param_.online_opt_stat_gather_ = true; } else { execute_param_.online_opt_stat_gather_ = false; @@ -2040,19 +2034,11 @@ int ObLoadDataDirectImpl::init_execute_param() } // max_error_rows_ if (OB_SUCC(ret)) { - int64_t append = 0; - int64_t enable_direct = 0; int64_t hint_error_rows = 0; - if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) { - LOG_WARN("fail to get value of APPEND", K(ret)); - } else if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) { - LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret)); - } else if (OB_FAIL(hint.get_value(ObLoadDataHint::ERROR_ROWS, hint_error_rows))) { + if (OB_FAIL(hint.get_value(ObLoadDataHint::ERROR_ROWS, hint_error_rows))) { LOG_WARN("fail to get value of ERROR_ROWS", KR(ret), K(hint)); - } else if (enable_direct != 0) { - execute_param_.max_error_rows_ = hint_error_rows; } else { - execute_param_.max_error_rows_ = 0; + execute_param_.max_error_rows_ = hint_error_rows; } } // data_access_param_ diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp b/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp index 24ba72aef8..25a871d328 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp @@ -25,7 +25,7 @@ using namespace common; */ ObDirectLoadFastHeapTableCreateParam::ObDirectLoadFastHeapTableCreateParam() - : row_count_(0) , column_stat_array_(nullptr) + : row_count_(0) { } @@ -35,7 +35,7 @@ ObDirectLoadFastHeapTableCreateParam::~ObDirectLoadFastHeapTableCreateParam() bool ObDirectLoadFastHeapTableCreateParam::is_valid() const { - return tablet_id_.is_valid() && row_count_ >= 0 && nullptr != column_stat_array_ ; + return tablet_id_.is_valid() && row_count_ >= 0; } /** @@ -43,45 +43,14 @@ bool ObDirectLoadFastHeapTableCreateParam::is_valid() const */ ObDirectLoadFastHeapTable::ObDirectLoadFastHeapTable() - : allocator_("TLD_FastHTable"), is_inited_(false) + : is_inited_(false) { } ObDirectLoadFastHeapTable::~ObDirectLoadFastHeapTable() { - for (int64_t i = 0; i < column_stat_array_.count(); ++i) { - ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); - col_stat->~ObOptOSGColumnStat(); - col_stat = nullptr; - } } -int ObDirectLoadFastHeapTable::copy_col_stat(const ObDirectLoadFastHeapTableCreateParam ¶m) -{ - int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret)&& i < param.column_stat_array_->count(); ++i) { - ObOptOSGColumnStat *col_stat = param.column_stat_array_->at(i); - ObOptOSGColumnStat *copied_col_stat = NULL; - if (OB_ISNULL(col_stat)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null"); - } else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate memory"); - } else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) { - LOG_WARN("fail to copy colstat", KR(ret)); - } else if (OB_FAIL(column_stat_array_.push_back(copied_col_stat))) { - LOG_WARN("fail to add table", KR(ret)); - } - if (OB_FAIL(ret) && OB_NOT_NULL(copied_col_stat)) { - copied_col_stat->~ObOptOSGColumnStat(); - copied_col_stat = nullptr; - } - } - return ret; -} - - int ObDirectLoadFastHeapTable::init(const ObDirectLoadFastHeapTableCreateParam ¶m) { int ret = OB_SUCCESS; @@ -94,12 +63,7 @@ int ObDirectLoadFastHeapTable::init(const ObDirectLoadFastHeapTableCreateParam & } else { meta_.tablet_id_ = param.tablet_id_; meta_.row_count_ = param.row_count_; - allocator_.set_tenant_id(MTL_ID()); - if (OB_FAIL(copy_col_stat(param))){ - LOG_WARN("fail to inner init", KR(ret), K(param)); - } else { - is_inited_ = true; - } + is_inited_ = true; } return ret; } diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table.h b/src/storage/direct_load/ob_direct_load_fast_heap_table.h index 1c8c22bd9c..cc597bb893 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table.h +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table.h @@ -31,7 +31,6 @@ public: public: common::ObTabletID tablet_id_; int64_t row_count_; - common::ObArray *column_stat_array_; }; struct ObDirectLoadFastHeapTableMeta @@ -47,21 +46,13 @@ public: ObDirectLoadFastHeapTable(); virtual ~ObDirectLoadFastHeapTable(); int init(const ObDirectLoadFastHeapTableCreateParam ¶m); - const common::ObIArray &get_column_stat_array() const - { - return column_stat_array_; - } const common::ObTabletID &get_tablet_id() const override { return meta_.tablet_id_; } int64_t get_row_count() const override { return meta_.row_count_; } bool is_valid() const override { return is_inited_; } const ObDirectLoadFastHeapTableMeta &get_meta() const { return meta_; } TO_STRING_KV(K_(meta)); -private: - int copy_col_stat(const ObDirectLoadFastHeapTableCreateParam ¶m); private: ObDirectLoadFastHeapTableMeta meta_; - common::ObArenaAllocator allocator_; - common::ObArray column_stat_array_; bool is_inited_; DISABLE_COPY_ASSIGN(ObDirectLoadFastHeapTable); }; diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp index 21712a96be..38f53a74cd 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp @@ -34,13 +34,12 @@ using namespace share; ObDirectLoadFastHeapTableBuildParam::ObDirectLoadFastHeapTableBuildParam() : snapshot_version_(0), + thread_idx_(0), datum_utils_(nullptr), col_descs_(nullptr), - cmp_funcs_(nullptr), insert_table_ctx_(nullptr), fast_heap_table_ctx_(nullptr), - dml_row_handler_(nullptr), - online_opt_stat_gather_(false) + dml_row_handler_(nullptr) { } @@ -50,8 +49,8 @@ ObDirectLoadFastHeapTableBuildParam::~ObDirectLoadFastHeapTableBuildParam() bool ObDirectLoadFastHeapTableBuildParam::is_valid() const { - return tablet_id_.is_valid() && snapshot_version_ > 0 && table_data_desc_.is_valid() && - nullptr != col_descs_ && nullptr != cmp_funcs_ && nullptr != insert_table_ctx_ && + return tablet_id_.is_valid() && snapshot_version_ > 0 && thread_idx_ >= 0 && + table_data_desc_.is_valid() && nullptr != col_descs_ && nullptr != insert_table_ctx_ && nullptr != fast_heap_table_ctx_ && nullptr != dml_row_handler_ && nullptr != datum_utils_; } @@ -77,56 +76,6 @@ ObDirectLoadFastHeapTableBuilder::~ObDirectLoadFastHeapTableBuilder() slice_writer_allocator_.free(slice_writer_); slice_writer_ = nullptr; } - for (int64_t i = 0; i < column_stat_array_.count(); ++i) { - ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); - col_stat->~ObOptOSGColumnStat(); - allocator_.free(col_stat); - col_stat = nullptr; - } -} - -int ObDirectLoadFastHeapTableBuilder::init_sql_statistics() -{ - int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret) && i < param_.table_data_desc_.column_count_; ++i) { - ObOptOSGColumnStat *new_osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_); - if (OB_ISNULL(new_osg_col_stat)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate col stat"); - } else if (OB_FAIL(column_stat_array_.push_back(new_osg_col_stat))) { - LOG_WARN("fail to push back", KR(ret)); - } - if (OB_FAIL(ret)) { - if (new_osg_col_stat != nullptr) { - new_osg_col_stat->~ObOptOSGColumnStat(); - allocator_.free(new_osg_col_stat); - new_osg_col_stat = nullptr; - } - } - } - return ret; -} - -int ObDirectLoadFastHeapTableBuilder::collect_obj(const ObDatumRow &datum_row) -{ - int ret = OB_SUCCESS; - const int64_t extra_rowkey_cnt = ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); - for (int64_t i = 0; OB_SUCC(ret) && i < param_.table_data_desc_.column_count_; i++) { - const ObStorageDatum &datum = - datum_row.storage_datums_[i + extra_rowkey_cnt + 1]; - const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i + 1).get_cmp_func(); - const ObColDesc &col_desc = param_.col_descs_->at(i + 1); - ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); - bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); - if (col_stat != nullptr && is_valid) { - if (OB_FAIL(col_stat->update_column_stat_info(&datum, - col_desc.col_type_, - cmp_func.cmp_func_))) { - LOG_WARN("failed to update column stat info"); - } - } - } - return ret; } int ObDirectLoadFastHeapTableBuilder::init(const ObDirectLoadFastHeapTableBuildParam ¶m) @@ -142,9 +91,7 @@ int ObDirectLoadFastHeapTableBuilder::init(const ObDirectLoadFastHeapTableBuildP param_ = param; allocator_.set_tenant_id(MTL_ID()); slice_writer_allocator_.set_tenant_id(MTL_ID()); - if (param_.online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) { - LOG_WARN("fail to inner init sql statistics", KR(ret)); - } else if (OB_FAIL(param_.fast_heap_table_ctx_->get_tablet_context( + if (OB_FAIL(param_.fast_heap_table_ctx_->get_tablet_context( param_.tablet_id_, fast_heap_table_tablet_ctx_))) { LOG_WARN("fail to get tablet context", KR(ret)); } else if (OB_FAIL(init_sstable_slice_ctx())) { @@ -234,7 +181,7 @@ int ObDirectLoadFastHeapTableBuilder::append_row(const ObTabletID &tablet_id, } if (OB_FAIL(slice_writer_->append_row(datum_row_))) { LOG_WARN("fail to append row", KR(ret)); - } else if (param_.online_opt_stat_gather_ && OB_FAIL(collect_obj(datum_row_))) { + } else if (param_.insert_table_ctx_->collect_obj(param_.thread_idx_ ,datum_row_)) { LOG_WARN("fail to collect", KR(ret)); } else { ++row_count_; @@ -262,6 +209,7 @@ int ObDirectLoadFastHeapTableBuilder::close() if (OB_FAIL(slice_writer_->close())) { LOG_WARN("fail to close sstable slice writer", KR(ret)); } else { + param_.insert_table_ctx_->inc_row_count(row_count_); is_closed_ = true; } } @@ -282,7 +230,6 @@ int ObDirectLoadFastHeapTableBuilder::get_tables( ObDirectLoadFastHeapTableCreateParam create_param; create_param.tablet_id_ = param_.tablet_id_; create_param.row_count_ = row_count_; - create_param.column_stat_array_ = &column_stat_array_; ObDirectLoadFastHeapTable *fast_heap_table = nullptr; if (OB_ISNULL(fast_heap_table = OB_NEWx(ObDirectLoadFastHeapTable, (&allocator)))) { ret = OB_ALLOCATE_MEMORY_FAILED; diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h index c87f5bc018..3ad2cee9d2 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h @@ -39,20 +39,19 @@ public: ObDirectLoadFastHeapTableBuildParam(); ~ObDirectLoadFastHeapTableBuildParam(); bool is_valid() const; - TO_STRING_KV(K_(tablet_id), K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), - KP_(col_descs), KP_(cmp_funcs), KP_(insert_table_ctx), KP_(fast_heap_table_ctx), - KP_(dml_row_handler), K_(online_opt_stat_gather)); + TO_STRING_KV(K_(tablet_id), K_(snapshot_version), K_(thread_idx), K_(table_data_desc), + KP_(datum_utils), KP_(col_descs), KP_(insert_table_ctx), KP_(fast_heap_table_ctx), + KP_(dml_row_handler)); public: common::ObTabletID tablet_id_; int64_t snapshot_version_; + int64_t thread_idx_; ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; const common::ObIArray *col_descs_; - const blocksstable::ObStoreCmpFuncs *cmp_funcs_; ObDirectLoadInsertTableContext *insert_table_ctx_; ObDirectLoadFastHeapTableContext *fast_heap_table_ctx_; ObDirectLoadDMLRowHandler *dml_row_handler_; - bool online_opt_stat_gather_; }; class ObDirectLoadFastHeapTableBuilder : public ObIDirectLoadPartitionTableBuilder @@ -70,8 +69,6 @@ public: int get_tables(common::ObIArray &table_array, common::ObIAllocator &allocator) override; private: - int init_sql_statistics(); - int collect_obj(const blocksstable::ObDatumRow &datum_row); int init_sstable_slice_ctx(); int switch_sstable_slice(); private: @@ -82,7 +79,6 @@ private: ObSSTableInsertSliceWriter *slice_writer_; ObDirectLoadFastHeapTableTabletWriteCtx write_ctx_; blocksstable::ObDatumRow datum_row_; - common::ObArray column_stat_array_; int64_t row_count_; bool is_closed_; bool is_inited_; diff --git a/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp b/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp index 3abbdd57f1..440deddaba 100644 --- a/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp @@ -13,6 +13,10 @@ #include "storage/direct_load/ob_direct_load_insert_table_ctx.h" #include "storage/ddl/ob_direct_insert_sstable_ctx.h" +#include "share/stat/ob_opt_table_stat.h" +#include "share/stat/ob_opt_column_stat.h" +#include "share/stat/ob_stat_item.h" +#include "observer/table_load/ob_table_load_schema.h" namespace oceanbase { @@ -20,13 +24,26 @@ namespace storage { using namespace common; using namespace table; +using namespace sql; +using namespace observer; /** * ObDirectLoadInsertTableParam */ ObDirectLoadInsertTableParam::ObDirectLoadInsertTableParam() - : table_id_(OB_INVALID_ID), schema_version_(0), snapshot_version_(0), execution_id_(0), ddl_task_id_(0) + : table_id_(OB_INVALID_ID), + dest_table_id_(OB_INVALID_ID), + schema_version_(0), + snapshot_version_(0), + execution_id_(0), + ddl_task_id_(0), + data_version_(0), + session_cnt_(0), + rowkey_column_count_(0), + column_count_(0), + online_opt_stat_gather_(false), + is_heap_table_(false) { } @@ -36,16 +53,25 @@ ObDirectLoadInsertTableParam::~ObDirectLoadInsertTableParam() bool ObDirectLoadInsertTableParam::is_valid() const { - return OB_INVALID_ID != table_id_ && schema_version_ >= 0 && snapshot_version_ >= 0 && - ls_partition_ids_.count() > 0; + return OB_INVALID_ID != table_id_ && OB_INVALID_ID != dest_table_id_ && schema_version_ >= 0 && + snapshot_version_ >= 0 && ls_partition_ids_.count() > 0; } int ObDirectLoadInsertTableParam::assign(const ObDirectLoadInsertTableParam &other) { int ret = OB_SUCCESS; table_id_ = other.table_id_; + dest_table_id_ = other.dest_table_id_; schema_version_ = other.schema_version_; snapshot_version_ = other.snapshot_version_; + data_version_ = other.data_version_; + session_cnt_ = other.session_cnt_; + rowkey_column_count_ = other.rowkey_column_count_; + column_count_ = other.column_count_; + online_opt_stat_gather_ = other.online_opt_stat_gather_; + is_heap_table_ = other.is_heap_table_; + col_descs_ = other.col_descs_; + cmp_funcs_ = other.cmp_funcs_; if (OB_FAIL(ls_partition_ids_.assign(other.ls_partition_ids_))) { LOG_WARN("fail to assign ls tablet ids", KR(ret)); } @@ -57,7 +83,7 @@ int ObDirectLoadInsertTableParam::assign(const ObDirectLoadInsertTableParam &oth */ ObDirectLoadInsertTableContext::ObDirectLoadInsertTableContext() - : tablet_finish_count_(0), is_inited_(false) + : allocator_("TLD_SqlStat"), tablet_finish_count_(0), table_row_count_(0), is_inited_(false) { } @@ -76,6 +102,12 @@ void ObDirectLoadInsertTableContext::reset() } ddl_ctrl_.context_id_ = 0; } + for (int64_t i = 0; i < session_sql_ctx_array_.count(); ++i) { + ObTableLoadSqlStatistics *sql_statistics = session_sql_ctx_array_.at(i); + sql_statistics->~ObTableLoadSqlStatistics(); + allocator_.free(sql_statistics); + } + session_sql_ctx_array_.reset(); is_inited_ = false; } @@ -112,6 +144,8 @@ int ObDirectLoadInsertTableContext::init(const ObDirectLoadInsertTableParam &par } else if (OB_FAIL(sstable_insert_mgr.create_table_context(table_insert_param, ddl_ctrl_.context_id_))) { LOG_WARN("fail to create table context", KR(ret), K(table_insert_param)); + } else if (param_.online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) { + LOG_WARN("fail to init sql statistics", KR(ret)); } else { is_inited_ = true; } @@ -122,6 +156,163 @@ int ObDirectLoadInsertTableContext::init(const ObDirectLoadInsertTableParam &par return ret; } +int ObDirectLoadInsertTableContext::init_sql_statistics() +{ + int ret = OB_SUCCESS; + ObOptTableStat *table_stat = nullptr; + for (int64_t i = 0; OB_SUCC(ret) && i < param_.session_cnt_; ++i) { + ObTableLoadSqlStatistics *sql_statistics = nullptr; + ObOptTableStat *table_stat = nullptr; + if (OB_ISNULL(sql_statistics = OB_NEWx(ObTableLoadSqlStatistics, (&allocator_)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to new ObTableLoadSqlStatistics", KR(ret)); + } else if (OB_FAIL(sql_statistics->allocate_table_stat(table_stat))) { + LOG_WARN("fail to allocate table stat", KR(ret)); + } else if (OB_FAIL(session_sql_ctx_array_.push_back(sql_statistics))) { + LOG_WARN("fail to push back", KR(ret)); + } else { + for (int64_t j = 0; OB_SUCC(ret) && j < param_.column_count_; ++j) { + ObOptOSGColumnStat *osg_col_stat = nullptr; + if (OB_FAIL(sql_statistics->allocate_col_stat(osg_col_stat))) { + LOG_WARN("fail to allocate col stat", KR(ret)); + } + } + } + if (OB_FAIL(ret)) { + if (nullptr != sql_statistics) { + sql_statistics->~ObTableLoadSqlStatistics(); + allocator_.free(sql_statistics); + sql_statistics = nullptr; + } + } + } + return ret; +} + +int ObDirectLoadInsertTableContext::collect_sql_statistics(ObTableLoadSqlStatistics &sql_statistics) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + ObSchemaGetterGuard schema_guard; + const ObTableSchema *table_schema = nullptr; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObDirectLoadTabletMergeCtx not init", KR(ret), KP(this)); + } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, param_.dest_table_id_, + schema_guard, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(param_)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected error", K(ret)); + } else { + sql_statistics.reset(); + int64_t table_row_cnt = table_row_count_; + int64_t table_avg_len = 0; + int64_t col_cnt = param_.column_count_; + uint64_t partition_id = -1; + ObOptTableStat *table_stat = nullptr; + StatLevel stat_level = TABLE_LEVEL; + if (table_schema->get_part_level() == PARTITION_LEVEL_ZERO) { + partition_id = param_.dest_table_id_; + } + if (OB_FAIL(sql_statistics.allocate_table_stat(table_stat))) { + LOG_WARN("fail to allocate table stat", KR(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < col_cnt; ++i) { + int64_t col_id = param_.is_heap_table_ ? i + 1 : i; + ObOptOSGColumnStat *osg_col_stat = nullptr; + if (OB_FAIL(sql_statistics.allocate_col_stat(osg_col_stat))) { + LOG_WARN("fail to allocate table stat", KR(ret)); + } + // scan session_sql_ctx_array + for (int64_t j = 0; OB_SUCC(ret) && j < session_sql_ctx_array_.count(); ++j) { + ObOptOSGColumnStat *tmp_col_stat = + session_sql_ctx_array_.at(j)->get_col_stat_array().at(i); + if (OB_FAIL(osg_col_stat->merge_column_stat(*tmp_col_stat))) { + LOG_WARN("fail to merge column stat", KR(ret)); + } + } + + if (OB_SUCC(ret)) { + osg_col_stat->col_stat_->calc_avg_len(); + table_avg_len += osg_col_stat->col_stat_->get_avg_len(); + osg_col_stat->col_stat_->set_table_id(param_.dest_table_id_); + osg_col_stat->col_stat_->set_partition_id(partition_id); + osg_col_stat->col_stat_->set_stat_level(stat_level); + osg_col_stat->col_stat_->set_column_id(param_.col_descs_->at(col_id).col_id_); + osg_col_stat->col_stat_->set_num_distinct( + ObGlobalNdvEval::get_ndv_from_llc(osg_col_stat->col_stat_->get_llc_bitmap())); + if (OB_FAIL(osg_col_stat->set_min_max_datum_to_obj())) { + LOG_WARN("failed to set min max datum to obj", K(ret)); + } + } + } + if (OB_SUCC(ret)) { + table_stat->set_table_id(param_.dest_table_id_); + table_stat->set_partition_id(partition_id); + table_stat->set_object_type(stat_level); + table_stat->set_row_count(table_row_cnt); + table_stat->set_avg_row_size(table_avg_len); + } + } + } + return ret; +} + +int ObDirectLoadInsertTableContext::collect_obj(int64_t thread_idx, const ObDatumRow &datum_row) +{ + int ret = OB_SUCCESS; + const int64_t extra_rowkey_cnt = ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); + if (param_.online_opt_stat_gather_) { + if (OB_UNLIKELY(thread_idx >= session_sql_ctx_array_.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected thread idx", KR(ret), K(thread_idx), K(session_sql_ctx_array_.count())); + } else { + ObTableLoadSqlStatistics *sql_stat = session_sql_ctx_array_.at(thread_idx); + if (param_.is_heap_table_ ) { + for (int64_t i = 0; OB_SUCC(ret) && i < param_.column_count_; i++) { + const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt + 1]; + const ObColDesc &col_desc = param_.col_descs_->at(i + 1); + const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i + 1).get_cmp_func(); + ObOptOSGColumnStat *col_stat = sql_stat->get_col_stat_array().at(i); + bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); + if (col_stat != nullptr && is_valid) { + if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { + LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); + } + } + } + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < param_.rowkey_column_count_; i++) { + const ObStorageDatum &datum = datum_row.storage_datums_[i]; + const ObColDesc &col_desc = param_.col_descs_->at(i); + const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i).get_cmp_func(); + ObOptOSGColumnStat *col_stat = sql_stat->get_col_stat_array().at(i); + bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); + if (col_stat != nullptr && is_valid) { + if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { + LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); + } + } + } + for (int64_t i = param_.rowkey_column_count_; OB_SUCC(ret) && i < param_.column_count_; i++) { + const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt]; + const ObColDesc &col_desc = param_.col_descs_->at(i); + const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i).get_cmp_func(); + ObOptOSGColumnStat *col_stat = session_sql_ctx_array_.at(thread_idx)->get_col_stat_array().at(i); + bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); + if (col_stat != nullptr && is_valid) { + if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { + LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); + } + } + } + } + } + } + return ret; +} + int ObDirectLoadInsertTableContext::add_sstable_slice(const ObTabletID &tablet_id, const ObMacroDataSeq &start_seq, ObNewRowIterator &iter, @@ -200,7 +391,7 @@ int ObDirectLoadInsertTableContext::notify_tablet_finish(const ObTabletID &table return ret; } -int ObDirectLoadInsertTableContext::commit() +int ObDirectLoadInsertTableContext::commit(ObTableLoadSqlStatistics &sql_statistics) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -214,8 +405,9 @@ int ObDirectLoadInsertTableContext::commit() ObSSTableInsertManager &sstable_insert_mgr = ObSSTableInsertManager::get_instance(); if (OB_FAIL(sstable_insert_mgr.finish_table_context(ddl_ctrl_.context_id_, true))) { LOG_WARN("fail to finish table context", KR(ret), K_(ddl_ctrl)); - } else { - ddl_ctrl_.context_id_ = 0; + } else if (FALSE_IT(ddl_ctrl_.context_id_ = 0)) { + } else if (param_.online_opt_stat_gather_ && OB_FAIL(collect_sql_statistics(sql_statistics))) { + LOG_WARN("fail to collect sql stats", KR(ret)); } } return ret; diff --git a/src/storage/direct_load/ob_direct_load_insert_table_ctx.h b/src/storage/direct_load/ob_direct_load_insert_table_ctx.h index 0ebef10e84..8d65355a6e 100644 --- a/src/storage/direct_load/ob_direct_load_insert_table_ctx.h +++ b/src/storage/direct_load/ob_direct_load_insert_table_ctx.h @@ -12,6 +12,7 @@ #pragma once #include "share/table/ob_table_load_define.h" +#include "share/table/ob_table_load_sql_statistics.h" #include "sql/engine/px/ob_sub_trans_ctrl.h" namespace oceanbase @@ -27,14 +28,25 @@ public: ~ObDirectLoadInsertTableParam(); int assign(const ObDirectLoadInsertTableParam &other); bool is_valid() const; - TO_STRING_KV(K_(table_id), K_(schema_version), K_(snapshot_version), K_(ls_partition_ids), K_(execution_id), K_(ddl_task_id)); + TO_STRING_KV(K_(table_id), K_(dest_table_id), K_(schema_version), K_(snapshot_version), + K_(execution_id), K_(ddl_task_id), K_(data_version), K_(session_cnt), + K_(rowkey_column_count), K_(column_count), K_(online_opt_stat_gather), + K_(is_heap_table), K_(ls_partition_ids)); public: uint64_t table_id_; + uint64_t dest_table_id_; int64_t schema_version_; int64_t snapshot_version_; int64_t execution_id_; int64_t ddl_task_id_; int64_t data_version_; + int64_t session_cnt_; + int64_t rowkey_column_count_; + int64_t column_count_; + bool online_opt_stat_gather_; + bool is_heap_table_; + const common::ObIArray *col_descs_; + const blocksstable::ObStoreCmpFuncs *cmp_funcs_; common::ObArray ls_partition_ids_; }; @@ -45,6 +57,7 @@ public: ~ObDirectLoadInsertTableContext(); void reset(); int init(const ObDirectLoadInsertTableParam ¶m); + int collect_obj(int64_t thread_idx, const blocksstable::ObDatumRow &datum_row); int add_sstable_slice(const common::ObTabletID &tablet_id, const blocksstable::ObMacroDataSeq &start_seq, common::ObNewRowIterator &iter, @@ -54,12 +67,20 @@ public: ObSSTableInsertSliceWriter *&slice_writer, common::ObIAllocator &allocator); int notify_tablet_finish(const common::ObTabletID &tablet_id); - int commit(); - TO_STRING_KV(K_(param), K_(ddl_ctrl)); + int commit(table::ObTableLoadSqlStatistics &sql_statistics); + void inc_row_count(int64_t row_count) { ATOMIC_AAF(&table_row_count_, row_count); } + int64_t get_row_count() const { return table_row_count_; } + TO_STRING_KV(K_(param), K_(tablet_finish_count), K_(table_row_count), K_(ddl_ctrl)); private: + int init_sql_statistics(); + int collect_sql_statistics(table::ObTableLoadSqlStatistics &sql_statistics); +private: + common::ObArenaAllocator allocator_; ObDirectLoadInsertTableParam param_; sql::ObDDLCtrl ddl_ctrl_; int64_t tablet_finish_count_ CACHE_ALIGNED; + int64_t table_row_count_ CACHE_ALIGNED; + common::ObArray session_sql_ctx_array_; bool is_inited_; }; diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp index 6f16724f25..5746c83bd2 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp @@ -50,10 +50,8 @@ ObDirectLoadMergeParam::ObDirectLoadMergeParam() snapshot_version_(0), datum_utils_(nullptr), col_descs_(nullptr), - cmp_funcs_(nullptr), is_heap_table_(false), is_fast_heap_table_(false), - online_opt_stat_gather_(false), insert_table_ctx_(nullptr), dml_row_handler_(nullptr) { @@ -67,8 +65,7 @@ bool ObDirectLoadMergeParam::is_valid() const { return OB_INVALID_ID != table_id_ && 0 < rowkey_column_num_ && 0 < store_column_count_ && snapshot_version_ > 0 && table_data_desc_.is_valid() && nullptr != datum_utils_ && - nullptr != col_descs_ && nullptr != cmp_funcs_ && nullptr != insert_table_ctx_ && - nullptr != dml_row_handler_; + nullptr != col_descs_ && nullptr != insert_table_ctx_ && nullptr != dml_row_handler_; } /** @@ -191,7 +188,6 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam ¶m, } else { allocator_.set_tenant_id(MTL_ID()); param_ = param; - target_partition_id_ = target_ls_partition_id.part_tablet_id_.partition_id_; tablet_id_ = ls_partition_id.part_tablet_id_.tablet_id_; target_tablet_id_ = target_ls_partition_id.part_tablet_id_.tablet_id_; is_inited_ = true; @@ -200,95 +196,6 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam ¶m, return ret; } -int ObDirectLoadTabletMergeCtx::collect_sql_statistics( - const ObIArray &fast_heap_table_array, ObTableLoadSqlStatistics &sql_statistics) -{ - int ret = OB_SUCCESS; - const uint64_t tenant_id = MTL_ID(); - ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = nullptr; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObDirectLoadTabletMergeCtx not init", KR(ret), KP(this)); - } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, param_.target_table_id_, schema_guard, - table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(param_)); - } else if (OB_ISNULL(table_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected error", K(ret)); - } else { - int64_t table_row_cnt = 0; - int64_t table_avg_len = 0; - int64_t col_cnt = param_.table_data_desc_.column_count_; - ObOptTableStat *table_stat = nullptr; - StatLevel stat_level; - if (table_schema->get_part_level() == PARTITION_LEVEL_ZERO) { - stat_level = TABLE_LEVEL; - } else if (table_schema->get_part_level() == PARTITION_LEVEL_ONE) { - stat_level = PARTITION_LEVEL; - } else if (table_schema->get_part_level() == PARTITION_LEVEL_TWO) { - stat_level = SUBPARTITION_LEVEL; - } else { - stat_level = INVALID_LEVEL; - } - if (OB_FAIL(sql_statistics.allocate_table_stat(table_stat))) { - LOG_WARN("fail to allocate table stat", KR(ret)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < col_cnt; ++i) { - int64_t col_id = param_.is_heap_table_ ? i + 1 : i; - int64_t row_count = 0; - ObOptOSGColumnStat *osg_col_stat = nullptr; - if (OB_FAIL(sql_statistics.allocate_col_stat(osg_col_stat))) { - LOG_WARN("fail to allocate table stat", KR(ret)); - } - // scan task_array - for (int64_t j = 0; OB_SUCC(ret) && j < task_array_.count(); ++j) { - ObOptOSGColumnStat *tmp_col_stat = task_array_.at(j)->get_column_stat_array().at(i); - if (task_array_.at(j)->get_row_count() != 0) { - if (OB_FAIL(osg_col_stat->merge_column_stat(*tmp_col_stat))) { - LOG_WARN("fail to merge column stat", KR(ret)); - } else { - row_count += task_array_.at(j)->get_row_count(); - } - } - } - // scan fast heap table - for (int64_t j = 0; OB_SUCC(ret) && j < fast_heap_table_array.count(); ++j) { - ObOptOSGColumnStat *tmp_col_stat = fast_heap_table_array.at(j)->get_column_stat_array().at(i); - if (fast_heap_table_array.at(j)->get_row_count() != 0) { - if (OB_FAIL(osg_col_stat->merge_column_stat(*tmp_col_stat))) { - LOG_WARN("fail to merge column stat", KR(ret)); - } else { - row_count += fast_heap_table_array.at(j)->get_row_count(); - } - } - } - if (OB_SUCC(ret)) { - table_row_cnt = row_count; - osg_col_stat->col_stat_->calc_avg_len(); - table_avg_len += osg_col_stat->col_stat_->get_avg_len(); - osg_col_stat->col_stat_->set_table_id(param_.target_table_id_); - osg_col_stat->col_stat_->set_partition_id(target_partition_id_); - osg_col_stat->col_stat_->set_stat_level(stat_level); - osg_col_stat->col_stat_->set_column_id(param_.col_descs_->at(col_id).col_id_); - osg_col_stat->col_stat_->set_num_distinct(ObGlobalNdvEval::get_ndv_from_llc(osg_col_stat->col_stat_->get_llc_bitmap())); - if (OB_FAIL(osg_col_stat->set_min_max_datum_to_obj())) { - LOG_WARN("failed to set min max datum to obj", K(ret)); - } - } - } - if (OB_SUCC(ret)) { - table_stat->set_table_id(param_.target_table_id_); - table_stat->set_partition_id(target_partition_id_); - table_stat->set_object_type(stat_level); - table_stat->set_row_count(table_row_cnt); - table_stat->set_avg_row_size(table_avg_len); - } - } - } - return ret; -} - int ObDirectLoadTabletMergeCtx::collect_dml_stat(const common::ObIArray &fast_heap_table_array, ObTableLoadDmlStat &dml_stats) { int ret = OB_SUCCESS; diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.h b/src/storage/direct_load/ob_direct_load_merge_ctx.h index 750a3ecf83..b47685e3cf 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.h +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.h @@ -51,8 +51,7 @@ public: bool is_valid() const; TO_STRING_KV(K_(table_id), K_(target_table_id), K_(rowkey_column_num), K_(store_column_count), K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), KP_(col_descs), - KP_(cmp_funcs), K_(is_heap_table), K_(is_fast_heap_table), - K_(online_opt_stat_gather), KP_(insert_table_ctx), KP_(dml_row_handler)); + K_(is_heap_table), K_(is_fast_heap_table), KP_(insert_table_ctx), KP_(dml_row_handler)); public: uint64_t table_id_; uint64_t target_table_id_; @@ -62,10 +61,8 @@ public: storage::ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; const common::ObIArray *col_descs_; - const blocksstable::ObStoreCmpFuncs *cmp_funcs_; bool is_heap_table_; bool is_fast_heap_table_; - bool online_opt_stat_gather_; ObDirectLoadInsertTableContext *insert_table_ctx_; ObDirectLoadDMLRowHandler *dml_row_handler_; }; @@ -109,8 +106,6 @@ public: int build_aggregate_merge_task_for_multiple_heap_table( const common::ObIArray &table_array); int inc_finish_count(bool &is_ready); - int collect_sql_statistics( - const common::ObIArray &fast_heap_table_array, table::ObTableLoadSqlStatistics &sql_statistics); int collect_dml_stat(const common::ObIArray &fast_heap_table_array, table::ObTableLoadDmlStat &dml_stats); const ObDirectLoadMergeParam &get_param() const { return param_; } @@ -120,7 +115,7 @@ public: { return task_array_; } - TO_STRING_KV(K_(param), K_(target_partition_id), K_(tablet_id), K_(target_tablet_id)); + TO_STRING_KV(K_(param), K_(tablet_id), K_(target_tablet_id)); private: int init_sstable_array(const common::ObIArray &table_array); int init_multiple_sstable_array( @@ -148,7 +143,6 @@ private: private: common::ObArenaAllocator allocator_; ObDirectLoadMergeParam param_; - uint64_t target_partition_id_; common::ObTabletID tablet_id_; common::ObTabletID target_tablet_id_; ObDirectLoadOriginTable origin_table_; diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp index 5a32c8bc79..0057629f1b 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp @@ -21,7 +21,6 @@ #include "storage/direct_load/ob_direct_load_multiple_heap_table.h" #include "storage/direct_load/ob_direct_load_origin_table.h" - namespace oceanbase { namespace storage @@ -29,6 +28,7 @@ namespace storage using namespace common; using namespace blocksstable; using namespace share; +using namespace table; /** * ObDirectLoadPartitionMergeTask @@ -48,16 +48,9 @@ ObDirectLoadPartitionMergeTask::ObDirectLoadPartitionMergeTask() ObDirectLoadPartitionMergeTask::~ObDirectLoadPartitionMergeTask() { - for (int64_t i = 0; i < column_stat_array_.count(); ++i) { - ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); - if (col_stat != nullptr) { - col_stat->~ObOptOSGColumnStat(); - col_stat = nullptr; - } - } } -int ObDirectLoadPartitionMergeTask::process() +int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -70,9 +63,7 @@ int ObDirectLoadPartitionMergeTask::process() ObSSTableInsertSliceWriter *writer = nullptr; ObMacroDataSeq block_start_seq; block_start_seq.set_parallel_degree(parallel_idx_); - if (merge_param_->online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) { - LOG_WARN("fail to init sql statistics", KR(ret)); - } else if (OB_FAIL(construct_row_iter(allocator_, row_iter))) { + if (OB_FAIL(construct_row_iter(allocator_, row_iter))) { LOG_WARN("fail to construct row iter", KR(ret)); } else if (OB_FAIL(merge_param_->insert_table_ctx_->construct_sstable_slice_writer( target_tablet_id, block_start_seq, writer, allocator_))) { @@ -94,7 +85,7 @@ int ObDirectLoadPartitionMergeTask::process() } } else if (OB_FAIL(writer->append_row(*const_cast(datum_row)))) { LOG_WARN("fail to append row", KR(ret), KPC(datum_row)); - } else if (merge_param_->online_opt_stat_gather_ && OB_FAIL(collect_obj(*datum_row))) { + } else if (OB_FAIL(merge_param_->insert_table_ctx_->collect_obj(thread_idx, *datum_row))) { LOG_WARN("fail to collect statistics", KR(ret)); } else { ++affected_rows_; @@ -103,6 +94,8 @@ int ObDirectLoadPartitionMergeTask::process() if (OB_SUCC(ret)) { if (OB_FAIL(writer->close())) { LOG_WARN("fail to close writer", KR(ret)); + } else { + merge_param_->insert_table_ctx_->inc_row_count(affected_rows_); } } LOG_INFO("add sstable slice end", KR(ret), K(target_tablet_id), K(tablet_id), @@ -131,74 +124,6 @@ int ObDirectLoadPartitionMergeTask::process() return ret; } -int ObDirectLoadPartitionMergeTask::init_sql_statistics() -{ - int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret)&& i < merge_param_->table_data_desc_.column_count_; ++i) { - ObOptOSGColumnStat *col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_); - if (OB_ISNULL(col_stat)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to allocate buffer", KR(ret)); - } else if (OB_FAIL(column_stat_array_.push_back(col_stat))){ - LOG_WARN("fail to push back", KR(ret)); - } - if (OB_FAIL(ret)) { - if (col_stat != nullptr) { - col_stat->~ObOptOSGColumnStat(); - allocator_.free(col_stat); - col_stat = nullptr; - } - } - } - return ret; -} - -int ObDirectLoadPartitionMergeTask::collect_obj(const ObDatumRow &datum_row) -{ - int ret = OB_SUCCESS; - const int64_t extra_rowkey_cnt = ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); - if (merge_param_->is_heap_table_ ) { - for (int64_t i = 0; OB_SUCC(ret) && i < merge_param_->table_data_desc_.column_count_; i++) { - const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt + 1]; - const ObColDesc &col_desc = merge_param_->col_descs_->at(i + 1); - const ObCmpFunc &cmp_func = merge_param_->cmp_funcs_->at(i + 1).get_cmp_func(); - ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); - bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); - if (col_stat != nullptr && is_valid) { - if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { - LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); - } - } - } - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < merge_param_->rowkey_column_num_; i++) { - const ObStorageDatum &datum = datum_row.storage_datums_[i]; - const ObColDesc &col_desc = merge_param_->col_descs_->at(i); - const ObCmpFunc &cmp_func = merge_param_->cmp_funcs_->at(i).get_cmp_func(); - ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); - bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); - if (col_stat != nullptr && is_valid) { - if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { - LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); - } - } - } - for (int64_t i = merge_param_->rowkey_column_num_; OB_SUCC(ret) && i < merge_param_->table_data_desc_.column_count_; i++) { - const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt]; - const ObColDesc &col_desc = merge_param_->col_descs_->at(i); - const ObCmpFunc &cmp_func = merge_param_->cmp_funcs_->at(i).get_cmp_func(); - ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); - bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); - if (col_stat != nullptr && is_valid) { - if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { - LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); - } - } - } - } - return ret; -} - void ObDirectLoadPartitionMergeTask::stop() { is_stop_ = true; diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.h b/src/storage/direct_load/ob_direct_load_partition_merge_task.h index e338228471..72c19fd92a 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.h +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.h @@ -41,26 +41,18 @@ class ObDirectLoadPartitionMergeTask : public common::ObDLinkBase &get_column_stat_array() const - { - return column_stat_array_; - } + int process(int64_t thread_idx); int64_t get_row_count() const { return affected_rows_; } void stop(); TO_STRING_KV(KPC_(merge_param), KPC_(merge_ctx), K_(parallel_idx)); protected: virtual int construct_row_iter(common::ObIAllocator &allocator, ObIStoreRowIterator *&row_iter) = 0; -private: - int init_sql_statistics(); - int collect_obj(const blocksstable::ObDatumRow &datum_row); protected: const ObDirectLoadMergeParam *merge_param_; ObDirectLoadTabletMergeCtx *merge_ctx_; int64_t parallel_idx_; int64_t affected_rows_; - common::ObArray column_stat_array_; common::ObArenaAllocator allocator_; bool is_stop_; bool is_inited_; diff --git a/src/storage/direct_load/ob_direct_load_table_store.cpp b/src/storage/direct_load/ob_direct_load_table_store.cpp index 30f40eb4f5..a250dc6bee 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.cpp +++ b/src/storage/direct_load/ob_direct_load_table_store.cpp @@ -33,9 +33,9 @@ using namespace table; ObDirectLoadTableStoreParam::ObDirectLoadTableStoreParam() : snapshot_version_(0), + thread_idx_(-1), datum_utils_(nullptr), col_descs_(nullptr), - cmp_funcs_(nullptr), file_mgr_(nullptr), is_multiple_mode_(false), is_fast_heap_table_(false), @@ -53,8 +53,8 @@ ObDirectLoadTableStoreParam::~ObDirectLoadTableStoreParam() bool ObDirectLoadTableStoreParam::is_valid() const { - return snapshot_version_ > 0 && table_data_desc_.is_valid() && nullptr != datum_utils_ && - nullptr != col_descs_ && nullptr != cmp_funcs_ && nullptr != file_mgr_ && + return snapshot_version_ > 0 && thread_idx_ >= 0 && table_data_desc_.is_valid() && + nullptr != datum_utils_ && nullptr != col_descs_ && nullptr != file_mgr_ && (!is_fast_heap_table_ || (nullptr != insert_table_ctx_ && nullptr != fast_heap_table_ctx_)) && nullptr != dml_row_handler_; @@ -112,11 +112,10 @@ int ObDirectLoadTableStoreBucket::init(const ObDirectLoadTableStoreParam ¶m, fast_heap_table_build_param.table_data_desc_ = param.table_data_desc_; fast_heap_table_build_param.datum_utils_ = param.datum_utils_; fast_heap_table_build_param.col_descs_ = param.col_descs_; - fast_heap_table_build_param.cmp_funcs_ = param.cmp_funcs_; fast_heap_table_build_param.insert_table_ctx_ = param.insert_table_ctx_; fast_heap_table_build_param.fast_heap_table_ctx_ = param.fast_heap_table_ctx_; fast_heap_table_build_param.dml_row_handler_ = param.dml_row_handler_; - fast_heap_table_build_param.online_opt_stat_gather_ = param.online_opt_stat_gather_; + fast_heap_table_build_param.thread_idx_ = param.thread_idx_; ObDirectLoadFastHeapTableBuilder *fast_heap_table_builder = nullptr; if (OB_ISNULL(fast_heap_table_builder = table_builder_allocator_->alloc())) { diff --git a/src/storage/direct_load/ob_direct_load_table_store.h b/src/storage/direct_load/ob_direct_load_table_store.h index 7f4c236af7..0605ef2ebf 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.h +++ b/src/storage/direct_load/ob_direct_load_table_store.h @@ -34,20 +34,19 @@ public: ObDirectLoadTableStoreParam(); ~ObDirectLoadTableStoreParam(); bool is_valid() const; - TO_STRING_KV(K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), KP_(col_descs), - KP_(cmp_funcs), KP_(file_mgr), K_(is_multiple_mode), K_(is_fast_heap_table), + TO_STRING_KV(K_(snapshot_version), K_(thread_idx), K_(table_data_desc), KP_(datum_utils), + KP_(col_descs), KP_(file_mgr), K_(is_multiple_mode), K_(is_fast_heap_table), KP_(insert_table_ctx), KP_(fast_heap_table_ctx), KP_(dml_row_handler), KP_(extra_buf), K_(extra_buf_size)); public: int64_t snapshot_version_; + int64_t thread_idx_; ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; const common::ObIArray *col_descs_; - const blocksstable::ObStoreCmpFuncs *cmp_funcs_; ObDirectLoadTmpFileManager *file_mgr_; bool is_multiple_mode_; bool is_fast_heap_table_; - bool online_opt_stat_gather_; ObDirectLoadInsertTableContext *insert_table_ctx_; ObDirectLoadFastHeapTableContext *fast_heap_table_ctx_; ObDirectLoadDMLRowHandler *dml_row_handler_;