diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index cb78e6098..bc6de6eaf 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -23,7 +23,6 @@ #include "observer/table_load/ob_table_load_schema.h" #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_table_ctx.h" -#include "lib/mysqlclient/ob_mysql_result.h" namespace oceanbase { @@ -34,7 +33,6 @@ using namespace omt; using namespace share::schema; using namespace sql; using namespace table; -using namespace common; // begin ObTableDirectLoadBeginExecutor::ObTableDirectLoadBeginExecutor( @@ -209,49 +207,11 @@ int ObTableDirectLoadBeginExecutor::process() return ret; } -int ObTableDirectLoadBeginExecutor::check_need_online_gather(const uint64_t tenant_id, bool &online_opt_stat_gather) -{ - int ret = OB_SUCCESS; - ObSqlString sql; - SMART_VAR(ObMySQLProxy::MySQLResult, res) { - sqlclient::ObMySQLResult *result = nullptr; - if (OB_FAIL(sql.assign_fmt("SELECT value FROM %s WHERE (zone, name, schema_version) in (select zone, name, max(schema_version) FROM %s" - " group by zone, name) and name = '%s'", - OB_ALL_SYS_VARIABLE_HISTORY_TNAME, OB_ALL_SYS_VARIABLE_HISTORY_TNAME, OB_SV__OPTIMIZER_GATHER_STATS_ON_LOAD))) { - LOG_WARN("fail to append sql", KR(ret), K(tenant_id)); - } else if (OB_FAIL(GCTX.sql_proxy_->read(res, tenant_id, sql.ptr()))) { - LOG_WARN("fail to execute sql", KR(ret), K(sql), K(tenant_id)); - } else if (OB_ISNULL(result = res.get_result())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to get sql result", KR(ret), K(sql), K(tenant_id)); - } else { - while (OB_SUCC(ret)) { - if (OB_FAIL(result->next())) { - if (OB_ITER_END != ret) { - LOG_WARN("fail to get next row", KR(ret), K(tenant_id)); - } - } else { - ObString data; - EXTRACT_VARCHAR_FIELD_MYSQL(*result, "value", data); - if(0 == strcmp(data.ptr(), "1")) { - online_opt_stat_gather = true; - } - } - } - if (OB_ITER_END == ret) { - ret = OB_SUCCESS; - } - } - } - return ret; -} - int ObTableDirectLoadBeginExecutor::create_table_ctx() { int ret = OB_SUCCESS; const uint64_t tenant_id = client_task_->tenant_id_; const uint64_t table_id = client_task_->table_id_; - bool online_opt_stat_gather = false; ObTableLoadDDLParam ddl_param; ObTableLoadParam param; // start redef table @@ -280,8 +240,6 @@ int ObTableDirectLoadBeginExecutor::create_table_ctx() ObTenant *tenant = nullptr; if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) { LOG_WARN("fail to get tenant", KR(ret), K(tenant_id)); - } else if (OB_FAIL(check_need_online_gather(tenant_id, online_opt_stat_gather))) { - LOG_WARN("fail to get tenant", KR(ret), K(tenant_id)); } else { param.tenant_id_ = tenant_id; param.table_id_ = table_id; @@ -292,7 +250,7 @@ int ObTableDirectLoadBeginExecutor::create_table_ctx() param.column_count_ = client_task_->column_names_.count(); param.need_sort_ = true; param.px_mode_ = false; - param.online_opt_stat_gather_ = online_opt_stat_gather; + param.online_opt_stat_gather_ = false; param.dup_action_ = arg_.dup_action_; if (OB_FAIL(param.normalize())) { LOG_WARN("fail to normalize param", KR(ret)); diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h index c86feb0c4..743e0f031 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h @@ -71,7 +71,7 @@ protected: private: int create_table_ctx(); int do_begin(); - int check_need_online_gather(const uint64_t tenant_id, bool &online_opt_stat_gather); + private: ObTableLoadClientTask *client_task_; ObTableLoadTableCtx *table_ctx_; diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp index f60ddd11e..e69e77411 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp @@ -264,7 +264,7 @@ int ObDirectLoadControlCommitExecutor::process() ObTableLoadStore store(table_ctx); if (OB_FAIL(store.init())) { LOG_WARN("fail to init store", KR(ret)); - } else if (OB_FAIL(store.commit(res_.result_info_, res_.sql_statistics_))) { + } else if (OB_FAIL(store.commit(res_.result_info_))) { LOG_WARN("fail to store commit", KR(ret)); } else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) { LOG_WARN("fail to remove table ctx", KR(ret), K(key)); diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 370015403..231a6a932 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -28,7 +28,6 @@ #include "observer/table_load/ob_table_load_utils.h" #include "share/ob_share_util.h" #include "share/stat/ob_incremental_stat_estimator.h" -#include "share/stat/ob_dbms_stats_utils.h" namespace oceanbase { @@ -629,7 +628,7 @@ int ObTableLoadCoordinator::add_check_merge_result_task() * commit */ -int ObTableLoadCoordinator::commit_peers(table::ObTableLoadSqlStatistics &sql_statistics) +int ObTableLoadCoordinator::commit_peers() { int ret = OB_SUCCESS; ObTableLoadArray all_addr_array; @@ -647,7 +646,7 @@ int ObTableLoadCoordinator::commit_peers(table::ObTableLoadSqlStatistics &sql_st ObTableLoadStore store(ctx_); if (OB_FAIL(store.init())) { LOG_WARN("fail to init store", KR(ret)); - } else if (OB_FAIL(store.commit(res.result_info_, res.sql_statistics_))) { + } else if (OB_FAIL(store.commit(res.result_info_))) { LOG_WARN("fail to commit store", KR(ret)); } } else { // 远端, 发送rpc @@ -658,9 +657,6 @@ int ObTableLoadCoordinator::commit_peers(table::ObTableLoadSqlStatistics &sql_st ATOMIC_AAF(&coordinator_ctx_->result_info_.deleted_, res.result_info_.deleted_); ATOMIC_AAF(&coordinator_ctx_->result_info_.skipped_, res.result_info_.skipped_); ATOMIC_AAF(&coordinator_ctx_->result_info_.warnings_, res.result_info_.warnings_); - if (OB_FAIL(sql_statistics.merge(res.sql_statistics_))) { - LOG_WARN("fail to add result sql stats", KR(ret), K(addr), K(res)); - } } } } @@ -697,10 +693,12 @@ int ObTableLoadCoordinator::commit(ObTableLoadResultInfo &result_info) ObTableLoadSqlStatistics sql_statistics; if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check coordinator status", KR(ret)); - } else if (OB_FAIL(commit_peers(sql_statistics))) { + } else if (OB_FAIL(commit_peers())) { LOG_WARN("fail to commit peers", KR(ret)); } else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) { - } else if (param_.online_opt_stat_gather_ && OB_FAIL(write_sql_stat(sql_statistics))) { + } else if (param_.online_opt_stat_gather_ && + OB_FAIL( + drive_sql_stat(coordinator_ctx_->exec_ctx_->get_exec_ctx()))) { LOG_WARN("fail to drive sql stat", KR(ret)); } else if (OB_FAIL(commit_redef_table())) { LOG_WARN("fail to commit redef table", KR(ret)); @@ -727,10 +725,12 @@ int ObTableLoadCoordinator::px_commit_data() ObTableLoadSqlStatistics sql_statistics; if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check coordinator status", KR(ret)); - } else if (OB_FAIL(commit_peers(sql_statistics))) { + } else if (OB_FAIL(commit_peers())) { LOG_WARN("fail to commit peers", KR(ret)); } else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) { - } else if (param_.online_opt_stat_gather_ && OB_FAIL(write_sql_stat(sql_statistics))) { + } else if (param_.online_opt_stat_gather_ && + OB_FAIL( + drive_sql_stat(coordinator_ctx_->exec_ctx_->get_exec_ctx()))) { LOG_WARN("fail to drive sql stat", KR(ret)); } } @@ -758,28 +758,32 @@ int ObTableLoadCoordinator::px_commit_ddl() return ret; } -int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statistics) +int ObTableLoadCoordinator::drive_sql_stat(ObExecContext *ctx) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); const uint64_t table_id = ctx_->ddl_param_.dest_table_id_; - ObSEArray global_column_stats; - ObSEArray global_table_stats; ObSchemaGetterGuard schema_guard; + ObSchemaGetterGuard *tmp_schema_guard = nullptr; const ObTableSchema *table_schema = nullptr; - if (OB_UNLIKELY(sql_statistics.is_empty())) { + if (OB_UNLIKELY(nullptr == ctx)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(sql_statistics)); - } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, + LOG_WARN("invalid args", KR(ret), KPC(ctx)); + } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) { LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_FAIL(sql_statistics.get_col_stat_array(global_column_stats))) { - LOG_WARN("failed to get column stat array"); - } else if (OB_FAIL(sql_statistics.get_table_stat_array(global_table_stats))) { - LOG_WARN("failed to get table stat array"); - } else if (OB_FAIL(ObDbmsStatsUtils::split_batch_write(&schema_guard, ctx_->session_info_, GCTX.sql_proxy_, global_table_stats, global_column_stats))) { - LOG_WARN("failed to batch write stats", K(ret), K(sql_statistics.table_stat_array_), - K(sql_statistics.col_stat_array_)); + } else { + tmp_schema_guard = ctx->get_virtual_table_ctx().schema_guard_; + ctx->get_sql_ctx()->schema_guard_ = &schema_guard; + ctx->get_das_ctx().set_sql_ctx(ctx->get_sql_ctx()); + } + if (OB_SUCC(ret)) { + if (OB_FAIL(ObIncrementalStatEstimator::derive_global_stat_by_direct_load( + *ctx, table_id))) { + LOG_WARN("fail to drive global stat by direct load", KR(ret)); + } + ctx->get_sql_ctx()->schema_guard_ = tmp_schema_guard; + ctx->get_das_ctx().set_sql_ctx(ctx->get_sql_ctx()); } return ret; } diff --git a/src/observer/table_load/ob_table_load_coordinator.h b/src/observer/table_load/ob_table_load_coordinator.h index dfc03b1b4..f6b7383b9 100644 --- a/src/observer/table_load/ob_table_load_coordinator.h +++ b/src/observer/table_load/ob_table_load_coordinator.h @@ -60,9 +60,9 @@ private: int confirm_begin_peers(); int pre_merge_peers(); int start_merge_peers(); - int commit_peers(table::ObTableLoadSqlStatistics &sql_statistics); + int commit_peers(); int commit_redef_table(); - int write_sql_stat(table::ObTableLoadSqlStatistics &sql_statistics); + int drive_sql_stat(sql::ObExecContext *ctx); int heart_beat_peer(); private: int add_check_merge_result_task(); diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index 07abe1aec..b63bc952f 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -100,7 +100,8 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray &idx_array, LOG_WARN("ObTableLoadCoordinatorCtx init twice", KR(ret), KP(this)); } else if (OB_UNLIKELY( idx_array.count() != ctx_->param_.column_count_ || nullptr == exec_ctx || - !exec_ctx->is_valid())) { + !exec_ctx->is_valid() || + (ctx_->param_.online_opt_stat_gather_ && nullptr == exec_ctx->get_exec_ctx()))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(ctx_->param_), K(idx_array.count()), KPC(exec_ctx)); } else { diff --git a/src/observer/table_load/ob_table_load_merger.cpp b/src/observer/table_load/ob_table_load_merger.cpp index 53a9db05f..fec1500b9 100644 --- a/src/observer/table_load/ob_table_load_merger.cpp +++ b/src/observer/table_load/ob_table_load_merger.cpp @@ -40,8 +40,8 @@ using namespace table; class ObTableLoadMerger::MergeTaskProcessor : public ObITableLoadTaskProcessor { public: - MergeTaskProcessor(ObTableLoadTask &task, ObTableLoadTableCtx *ctx, ObTableLoadMerger *merger, int64_t thread_idx) - : ObITableLoadTaskProcessor(task), ctx_(ctx), merger_(merger), thread_idx_(thread_idx) + MergeTaskProcessor(ObTableLoadTask &task, ObTableLoadTableCtx *ctx, ObTableLoadMerger *merger) + : ObITableLoadTaskProcessor(task), ctx_(ctx), merger_(merger) { ctx_->inc_ref_count(); } @@ -63,7 +63,7 @@ public: ret = OB_SUCCESS; break; } - } else if (OB_FAIL(merge_task->process(thread_idx_))) { + } else if (OB_FAIL(merge_task->process())) { LOG_WARN("fail to process merge task", KR(ret)); } if (nullptr != merge_task) { @@ -75,7 +75,6 @@ public: private: ObTableLoadTableCtx *const ctx_; ObTableLoadMerger *const merger_; - int64_t thread_idx_; }; class ObTableLoadMerger::MergeTaskCallback : public ObITableLoadTaskCallback @@ -204,8 +203,10 @@ int ObTableLoadMerger::build_merge_ctx() merge_param.table_data_desc_ = store_ctx_->table_data_desc_; merge_param.datum_utils_ = &(store_ctx_->ctx_->schema_.datum_utils_); merge_param.col_descs_ = &(store_ctx_->ctx_->schema_.column_descs_); + merge_param.cmp_funcs_ = &(store_ctx_->ctx_->schema_.cmp_funcs_); merge_param.is_heap_table_ = store_ctx_->ctx_->schema_.is_heap_table_; merge_param.is_fast_heap_table_ = store_ctx_->is_fast_heap_table_; + merge_param.online_opt_stat_gather_ = param_.online_opt_stat_gather_; merge_param.insert_table_ctx_ = store_ctx_->insert_table_ctx_; merge_param.dml_row_handler_ = store_ctx_->error_row_handler_; if (OB_FAIL(merge_ctx_.init(merge_param, store_ctx_->ls_partition_ids_, @@ -364,6 +365,58 @@ int ObTableLoadMerger::collect_dml_stat(ObTableLoadDmlStat &dml_stats) return ret; } +int ObTableLoadMerger::collect_sql_statistics(ObTableLoadSqlStatistics &sql_statistics) +{ + int ret = OB_SUCCESS; + if (store_ctx_->is_fast_heap_table_) { + ObDirectLoadMultiMap tables; + ObArray trans_store_array; + if (OB_FAIL(tables.init())) { + LOG_WARN("fail to init table", KR(ret)); + } else if (OB_FAIL(store_ctx_->get_committed_trans_stores(trans_store_array))) { + LOG_WARN("fail to get trans store", KR(ret)); + } else { + for (int i = 0; OB_SUCC(ret) && i < trans_store_array.count(); ++i) { + ObTableLoadTransStore *trans_store = trans_store_array.at(i); + for (int j = 0; OB_SUCC(ret) && j < trans_store->session_store_array_.count(); ++j) { + ObTableLoadTransStore::SessionStore * session_store = trans_store->session_store_array_.at(j); + for (int k = 0 ; OB_SUCC(ret) && k < session_store->partition_table_array_.count(); ++k) { + ObIDirectLoadPartitionTable *table = session_store->partition_table_array_.at(k); + ObDirectLoadFastHeapTable *sstable = nullptr; + if (OB_ISNULL(sstable = dynamic_cast(table))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected not heap sstable", KR(ret), KPC(table)); + } else { + const ObTabletID &tablet_id = sstable->get_tablet_id(); + if (OB_FAIL(tables.add(tablet_id, sstable))) { + LOG_WARN("fail to add tables", KR(ret), KPC(sstable)); + } + } + } + } + } + for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) { + ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i); + ObArray heap_table_array ; + if (OB_FAIL(tables.get(tablet_ctx->get_tablet_id(), heap_table_array))) { + LOG_WARN("get heap sstable failed", KR(ret)); + } else if (OB_FAIL(tablet_ctx->collect_sql_statistics(heap_table_array, sql_statistics))) { + LOG_WARN("fail to collect sql statics", KR(ret)); + } + } + } + } else { + for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) { + ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i); + ObArray heap_table_array ; + if (OB_FAIL(tablet_ctx->collect_sql_statistics(heap_table_array, sql_statistics))) { + LOG_WARN("fail to collect sql statics", KR(ret)); + } + } + } + return ret; +} + int ObTableLoadMerger::start_merge() { int ret = OB_SUCCESS; @@ -376,7 +429,7 @@ int ObTableLoadMerger::start_merge() LOG_WARN("fail to alloc task", KR(ret)); } // 2. 设置processor - else if (OB_FAIL(task->set_processor(ctx, this, thread_idx))) { + else if (OB_FAIL(task->set_processor(ctx, this))) { LOG_WARN("fail to set merge task processor", KR(ret)); } // 3. 设置callback diff --git a/src/observer/table_load/ob_table_load_merger.h b/src/observer/table_load/ob_table_load_merger.h index f7f1ab2da..8d25d1c3f 100644 --- a/src/observer/table_load/ob_table_load_merger.h +++ b/src/observer/table_load/ob_table_load_merger.h @@ -38,6 +38,7 @@ public: int start(); void stop(); int handle_table_compact_success(); + int collect_sql_statistics(table::ObTableLoadSqlStatistics &sql_statistics); int collect_dml_stat(table::ObTableLoadDmlStat &dml_stats); private: int build_merge_ctx(); diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index 957442d4a..b4661fe45 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -25,6 +25,7 @@ #include "observer/table_load/ob_table_load_utils.h" #include "storage/direct_load/ob_direct_load_insert_table_ctx.h" #include "share/stat/ob_opt_stat_monitor_manager.h" +#include "share/stat/ob_dbms_stats_utils.h" #include "storage/blocksstable/ob_sstable.h" namespace oceanbase @@ -297,7 +298,7 @@ int ObTableLoadStore::start_merge() return ret; } -int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlStatistics &sql_statistics) +int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -307,12 +308,19 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlS LOG_INFO("store commit"); ObMutexGuard guard(store_ctx_->get_op_lock()); ObTableLoadDmlStat dml_stats; + ObTableLoadSqlStatistics sql_statistics; if (OB_FAIL(store_ctx_->check_status(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check store status", KR(ret)); - } else if (OB_FAIL(store_ctx_->insert_table_ctx_->commit(sql_statistics))) { + } else if (OB_FAIL(store_ctx_->insert_table_ctx_->commit())) { LOG_WARN("fail to commit insert table", KR(ret)); } else if (ctx_->schema_.has_autoinc_column_ && OB_FAIL(store_ctx_->commit_autoinc_value())) { LOG_WARN("fail to commit sync auto increment value", KR(ret)); + } else if (param_.online_opt_stat_gather_ && + OB_FAIL(store_ctx_->merger_->collect_sql_statistics(sql_statistics))) { + LOG_WARN("fail to collect sql stats", KR(ret)); + } else if (param_.online_opt_stat_gather_ && + OB_FAIL(commit_sql_statistics(sql_statistics))) { + LOG_WARN("fail to commit sql stats", KR(ret)); } else if (OB_FAIL(store_ctx_->merger_->collect_dml_stat(dml_stats))) { LOG_WARN("fail to build dml stat", KR(ret)); } else if (OB_FAIL(ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(dml_stats.dml_stat_array_))) { @@ -327,6 +335,34 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlS return ret; } +int ObTableLoadStore::commit_sql_statistics(const ObTableLoadSqlStatistics &sql_statistics) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + const uint64_t table_id = param_.table_id_; + ObSchemaGetterGuard schema_guard; + const ObTableSchema *table_schema = nullptr; + ObSEArray part_column_stats; + ObSEArray part_table_stats; + if (sql_statistics.is_empty()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql statistics is empty", K(ret)); + } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, + table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_FAIL(sql_statistics.get_col_stat_array(part_column_stats))) { + LOG_WARN("failed to get column stat array"); + } else if (OB_FAIL(sql_statistics.get_table_stat_array(part_table_stats))) { + LOG_WARN("failed to get table stat array"); + } else if (OB_FAIL(ObDbmsStatsUtils::split_batch_write( + &schema_guard, store_ctx_->ctx_->session_info_, GCTX.sql_proxy_, part_table_stats, + part_column_stats))) { + LOG_WARN("failed to batch write stats", K(ret), K(sql_statistics.table_stat_array_), + K(sql_statistics.col_stat_array_)); + } + return ret; +} + int ObTableLoadStore::get_status(ObTableLoadStatusType &status, int &error_code) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_store.h b/src/observer/table_load/ob_table_load_store.h index 83344bd1d..59d2cd28f 100644 --- a/src/observer/table_load/ob_table_load_store.h +++ b/src/observer/table_load/ob_table_load_store.h @@ -46,9 +46,11 @@ public: int confirm_begin(); int pre_merge(const table::ObTableLoadArray &committed_trans_id_array); int start_merge(); - int commit(table::ObTableLoadResultInfo &result_info, table::ObTableLoadSqlStatistics &sql_statistics); + int commit(table::ObTableLoadResultInfo &result_info); int get_status(table::ObTableLoadStatusType &status, int &error_code); int heart_beat(); +private: + int commit_sql_statistics(const table::ObTableLoadSqlStatistics &sql_statistics); private: class MergeTaskProcessor; class MergeTaskCallback; diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index 91e85790b..fe771087a 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -83,19 +83,11 @@ int ObTableLoadStoreCtx::init( } else { ObDirectLoadInsertTableParam insert_table_param; insert_table_param.table_id_ = ctx_->param_.table_id_; - insert_table_param.dest_table_id_= ctx_->ddl_param_.dest_table_id_; insert_table_param.schema_version_ = ctx_->ddl_param_.schema_version_; insert_table_param.snapshot_version_ = ctx_->ddl_param_.snapshot_version_; insert_table_param.ddl_task_id_ = ctx_->ddl_param_.task_id_; insert_table_param.execution_id_ = 1; //仓氐说暂时设置为1,不然后面检测过不了 insert_table_param.data_version_ = ctx_->ddl_param_.data_version_; - insert_table_param.session_cnt_ = ctx_->param_.session_count_; - insert_table_param.rowkey_column_count_ = (!ctx_->schema_.is_heap_table_ ? ctx_->schema_.rowkey_column_count_ : 0); - insert_table_param.column_count_ = ctx_->param_.column_count_; - insert_table_param.online_opt_stat_gather_ = ctx_->param_.online_opt_stat_gather_; - insert_table_param.is_heap_table_ = ctx_->schema_.is_heap_table_; - insert_table_param.col_descs_ = &(ctx_->schema_.column_descs_); - insert_table_param.cmp_funcs_ = &(ctx_->schema_.cmp_funcs_); for (int64_t i = 0; OB_SUCC(ret) && i < partition_id_array.count(); ++i) { const ObLSID &ls_id = partition_id_array[i].ls_id_; const ObTableLoadPartitionId &part_tablet_id = partition_id_array[i].part_tablet_id_; diff --git a/src/observer/table_load/ob_table_load_trans_store.cpp b/src/observer/table_load/ob_table_load_trans_store.cpp index 42cda5820..4c48976c8 100644 --- a/src/observer/table_load/ob_table_load_trans_store.cpp +++ b/src/observer/table_load/ob_table_load_trans_store.cpp @@ -210,15 +210,16 @@ int ObTableLoadTransStoreWriter::init_session_ctx_array() param.table_data_desc_ = *table_data_desc_; param.datum_utils_ = &(trans_ctx_->ctx_->schema_.datum_utils_); param.col_descs_ = &(trans_ctx_->ctx_->schema_.column_descs_); + param.cmp_funcs_ = &(trans_ctx_->ctx_->schema_.cmp_funcs_); param.file_mgr_ = trans_ctx_->ctx_->store_ctx_->tmp_file_mgr_; param.is_multiple_mode_ = trans_ctx_->ctx_->store_ctx_->is_multiple_mode_; param.is_fast_heap_table_ = trans_ctx_->ctx_->store_ctx_->is_fast_heap_table_; + param.online_opt_stat_gather_ = trans_ctx_->ctx_->param_.online_opt_stat_gather_; param.insert_table_ctx_ = trans_ctx_->ctx_->store_ctx_->insert_table_ctx_; param.fast_heap_table_ctx_ = trans_ctx_->ctx_->store_ctx_->fast_heap_table_ctx_; param.dml_row_handler_ = trans_ctx_->ctx_->store_ctx_->error_row_handler_; for (int64_t i = 0; OB_SUCC(ret) && i < session_count; ++i) { SessionContext *session_ctx = session_ctx_array_ + i; - param.thread_idx_ = i; if (param_.px_mode_) { session_ctx->extra_buf_size_ = table_data_desc_->extra_buf_size_; if (OB_ISNULL(session_ctx->extra_buf_ = diff --git a/src/share/table/ob_table_load_sql_statistics.cpp b/src/share/table/ob_table_load_sql_statistics.cpp index dc97ea86f..8f56ad9cc 100644 --- a/src/share/table/ob_table_load_sql_statistics.cpp +++ b/src/share/table/ob_table_load_sql_statistics.cpp @@ -21,7 +21,6 @@ namespace table void ObTableLoadSqlStatistics::reset() { - int ret = OB_SUCCESS; for (int64_t i = 0; i < col_stat_array_.count(); ++i) { ObOptOSGColumnStat *col_stat = col_stat_array_.at(i); if (col_stat != nullptr) { @@ -83,73 +82,53 @@ int ObTableLoadSqlStatistics::allocate_col_stat(ObOptOSGColumnStat *&col_stat) return ret; } -int ObTableLoadSqlStatistics::merge(const ObTableLoadSqlStatistics& other) +int ObTableLoadSqlStatistics::add(const ObTableLoadSqlStatistics& other) { int ret = OB_SUCCESS; - if (is_empty()) { - for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) { - ObOptTableStat *table_stat = other.table_stat_array_.at(i); - if (table_stat != nullptr) { - ObOptTableStat *copied_table_stat = nullptr; - int64_t size = table_stat->size(); - char *new_buf = nullptr; - if (OB_ISNULL(new_buf = static_cast(allocator_.alloc(size)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size)); - } else if (OB_FAIL(table_stat->deep_copy(new_buf, size, copied_table_stat))) { - OB_LOG(WARN, "fail to copy table stat", KR(ret)); - } else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) { - OB_LOG(WARN, "fail to add table stat", KR(ret)); - } - if (OB_FAIL(ret)) { - if (copied_table_stat != nullptr) { - copied_table_stat->~ObOptTableStat(); - copied_table_stat = nullptr; - } - if(new_buf != nullptr) { - allocator_.free(new_buf); - new_buf = nullptr; - } - } - } - } - for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) { - ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i); - ObOptOSGColumnStat *copied_col_stat = nullptr; - if (OB_ISNULL(col_stat)) { - ret = OB_ERR_UNEXPECTED; - OB_LOG(WARN, "get unexpected null"); - } else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) { + for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) { + ObOptTableStat *table_stat = other.table_stat_array_.at(i); + if (table_stat != nullptr) { + ObOptTableStat *copied_table_stat = nullptr; + int64_t size = table_stat->size(); + char *new_buf = nullptr; + if (OB_ISNULL(new_buf = static_cast(allocator_.alloc(size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - OB_LOG(WARN, "failed to create new col stat"); - } else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) { - OB_LOG(WARN, "fail to copy col stat", KR(ret)); - } else if (OB_FAIL(col_stat_array_.push_back(copied_col_stat))) { - OB_LOG(WARN, "fail to add col stat", KR(ret)); + OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size)); + } else if (OB_FAIL(table_stat->deep_copy(new_buf, size, copied_table_stat))) { + OB_LOG(WARN, "fail to copy table stat", KR(ret)); + } else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) { + OB_LOG(WARN, "fail to add table stat", KR(ret)); } if (OB_FAIL(ret)) { - if (copied_col_stat != nullptr) { - copied_col_stat->~ObOptOSGColumnStat(); - copied_col_stat = nullptr; + if (copied_table_stat != nullptr) { + copied_table_stat->~ObOptTableStat(); + copied_table_stat = nullptr; + } + if(new_buf != nullptr) { + allocator_.free(new_buf); + new_buf = nullptr; } } } - } else { - for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) { - ObOptTableStat *table_stat = other.table_stat_array_.at(i); - if (OB_FAIL(table_stat_array_.at(i)->merge_table_stat(*table_stat))) { - OB_LOG(WARN, "failed to merge table stat"); - } + } + for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) { + ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i); + ObOptOSGColumnStat *copied_col_stat = nullptr; + if (OB_ISNULL(col_stat)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "get unexpected null"); + } else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed to create new col stat"); + } else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) { + OB_LOG(WARN, "fail to copy col stat", KR(ret)); + } else if (OB_FAIL(col_stat_array_.push_back(copied_col_stat))) { + OB_LOG(WARN, "fail to add col stat", KR(ret)); } - for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) { - ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i); - if (OB_ISNULL(col_stat)) { - ret = OB_ERR_UNEXPECTED; - OB_LOG(WARN, "get unexpected null"); - } - // ObOptOSGColumnStat的序列化结果只序列化里面的ObOptColumnStat, 需要调用ObOptColumnStat的merge函数 - else if (OB_FAIL(col_stat_array_.at(i)->col_stat_->merge_column_stat(*col_stat->col_stat_))) { - OB_LOG(WARN, "failed to merge col stat"); + if (OB_FAIL(ret)) { + if (copied_col_stat != nullptr) { + copied_col_stat->~ObOptOSGColumnStat(); + copied_col_stat = nullptr; } } } diff --git a/src/share/table/ob_table_load_sql_statistics.h b/src/share/table/ob_table_load_sql_statistics.h index 30b2ab5c2..6cfd7d93e 100644 --- a/src/share/table/ob_table_load_sql_statistics.h +++ b/src/share/table/ob_table_load_sql_statistics.h @@ -34,16 +34,14 @@ public: } int allocate_table_stat(ObOptTableStat *&table_stat); int allocate_col_stat(ObOptOSGColumnStat *&col_stat); - int merge(const ObTableLoadSqlStatistics& other); + int add(const ObTableLoadSqlStatistics& other); int get_table_stat_array(ObIArray &table_stat_array) const; int get_col_stat_array(ObIArray &col_stat_array) const; int persistence_col_stats(); - ObIArray & get_table_stat_array() {return table_stat_array_; } - ObIArray & get_col_stat_array() {return col_stat_array_; } TO_STRING_KV(K_(col_stat_array), K_(table_stat_array)); public: - common::ObArray table_stat_array_; - common::ObArray col_stat_array_; + common::ObSEArray table_stat_array_; + common::ObSEArray col_stat_array_; common::ObArenaAllocator allocator_; }; diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 0ae4221c2..b5b980a03 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -1989,9 +1989,12 @@ int ObLoadDataDirectImpl::init_execute_param() } // need_sort_ if (OB_SUCC(ret)) { + int64_t append = 0; int64_t enable_direct = 0; int64_t hint_need_sort = 0; - if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) { + if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) { + LOG_WARN("fail to get value of APPEND", K(ret)); + } else if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) { LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret)); } else if (OB_FAIL(hint.get_value(ObLoadDataHint::NEED_SORT, hint_need_sort))) { LOG_WARN("fail to get value of NEED_SORT", KR(ret), K(hint)); @@ -2016,7 +2019,8 @@ int ObLoadDataDirectImpl::init_execute_param() } // online_opt_stat_gather_ if (OB_SUCC(ret)) { - int64_t no_gather_optimizer_statistics = 0 ; + int64_t append = 0; + int64_t gather_optimizer_statistics = 0 ; ObSQLSessionInfo *session = nullptr; ObObj obj; if (OB_ISNULL(session = ctx_->get_my_session())) { @@ -2024,9 +2028,11 @@ int ObLoadDataDirectImpl::init_execute_param() LOG_WARN("session is null", KR(ret)); } else if (OB_FAIL(session->get_sys_variable(SYS_VAR__OPTIMIZER_GATHER_STATS_ON_LOAD, obj))) { LOG_WARN("fail to get sys variable", K(ret)); - } else if (OB_FAIL(hint.get_value(ObLoadDataHint::NO_GATHER_OPTIMIZER_STATISTICS, no_gather_optimizer_statistics))) { + } else if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) { LOG_WARN("fail to get value of APPEND", K(ret)); - } else if ((no_gather_optimizer_statistics == 0) && obj.get_bool()) { + } else if (OB_FAIL(hint.get_value(ObLoadDataHint::GATHER_OPTIMIZER_STATISTICS, gather_optimizer_statistics))) { + LOG_WARN("fail to get value of APPEND", K(ret)); + } else if (((append != 0) || (gather_optimizer_statistics != 0)) && obj.get_bool()) { execute_param_.online_opt_stat_gather_ = true; } else { execute_param_.online_opt_stat_gather_ = false; @@ -2034,11 +2040,19 @@ int ObLoadDataDirectImpl::init_execute_param() } // max_error_rows_ if (OB_SUCC(ret)) { + int64_t append = 0; + int64_t enable_direct = 0; int64_t hint_error_rows = 0; - if (OB_FAIL(hint.get_value(ObLoadDataHint::ERROR_ROWS, hint_error_rows))) { + if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) { + LOG_WARN("fail to get value of APPEND", K(ret)); + } else if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) { + LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret)); + } else if (OB_FAIL(hint.get_value(ObLoadDataHint::ERROR_ROWS, hint_error_rows))) { LOG_WARN("fail to get value of ERROR_ROWS", KR(ret), K(hint)); - } else { + } else if (enable_direct != 0) { execute_param_.max_error_rows_ = hint_error_rows; + } else { + execute_param_.max_error_rows_ = 0; } } // data_access_param_ diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp b/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp index 25a871d32..24ba72aef 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp @@ -25,7 +25,7 @@ using namespace common; */ ObDirectLoadFastHeapTableCreateParam::ObDirectLoadFastHeapTableCreateParam() - : row_count_(0) + : row_count_(0) , column_stat_array_(nullptr) { } @@ -35,7 +35,7 @@ ObDirectLoadFastHeapTableCreateParam::~ObDirectLoadFastHeapTableCreateParam() bool ObDirectLoadFastHeapTableCreateParam::is_valid() const { - return tablet_id_.is_valid() && row_count_ >= 0; + return tablet_id_.is_valid() && row_count_ >= 0 && nullptr != column_stat_array_ ; } /** @@ -43,14 +43,45 @@ bool ObDirectLoadFastHeapTableCreateParam::is_valid() const */ ObDirectLoadFastHeapTable::ObDirectLoadFastHeapTable() - : is_inited_(false) + : allocator_("TLD_FastHTable"), is_inited_(false) { } ObDirectLoadFastHeapTable::~ObDirectLoadFastHeapTable() { + for (int64_t i = 0; i < column_stat_array_.count(); ++i) { + ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); + col_stat->~ObOptOSGColumnStat(); + col_stat = nullptr; + } } +int ObDirectLoadFastHeapTable::copy_col_stat(const ObDirectLoadFastHeapTableCreateParam ¶m) +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret)&& i < param.column_stat_array_->count(); ++i) { + ObOptOSGColumnStat *col_stat = param.column_stat_array_->at(i); + ObOptOSGColumnStat *copied_col_stat = NULL; + if (OB_ISNULL(col_stat)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null"); + } else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate memory"); + } else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) { + LOG_WARN("fail to copy colstat", KR(ret)); + } else if (OB_FAIL(column_stat_array_.push_back(copied_col_stat))) { + LOG_WARN("fail to add table", KR(ret)); + } + if (OB_FAIL(ret) && OB_NOT_NULL(copied_col_stat)) { + copied_col_stat->~ObOptOSGColumnStat(); + copied_col_stat = nullptr; + } + } + return ret; +} + + int ObDirectLoadFastHeapTable::init(const ObDirectLoadFastHeapTableCreateParam ¶m) { int ret = OB_SUCCESS; @@ -63,7 +94,12 @@ int ObDirectLoadFastHeapTable::init(const ObDirectLoadFastHeapTableCreateParam & } else { meta_.tablet_id_ = param.tablet_id_; meta_.row_count_ = param.row_count_; - is_inited_ = true; + allocator_.set_tenant_id(MTL_ID()); + if (OB_FAIL(copy_col_stat(param))){ + LOG_WARN("fail to inner init", KR(ret), K(param)); + } else { + is_inited_ = true; + } } return ret; } diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table.h b/src/storage/direct_load/ob_direct_load_fast_heap_table.h index cc597bb89..1c8c22bd9 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table.h +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table.h @@ -31,6 +31,7 @@ public: public: common::ObTabletID tablet_id_; int64_t row_count_; + common::ObArray *column_stat_array_; }; struct ObDirectLoadFastHeapTableMeta @@ -46,13 +47,21 @@ public: ObDirectLoadFastHeapTable(); virtual ~ObDirectLoadFastHeapTable(); int init(const ObDirectLoadFastHeapTableCreateParam ¶m); + const common::ObIArray &get_column_stat_array() const + { + return column_stat_array_; + } const common::ObTabletID &get_tablet_id() const override { return meta_.tablet_id_; } int64_t get_row_count() const override { return meta_.row_count_; } bool is_valid() const override { return is_inited_; } const ObDirectLoadFastHeapTableMeta &get_meta() const { return meta_; } TO_STRING_KV(K_(meta)); +private: + int copy_col_stat(const ObDirectLoadFastHeapTableCreateParam ¶m); private: ObDirectLoadFastHeapTableMeta meta_; + common::ObArenaAllocator allocator_; + common::ObArray column_stat_array_; bool is_inited_; DISABLE_COPY_ASSIGN(ObDirectLoadFastHeapTable); }; diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp index 38f53a74c..21712a96b 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp @@ -34,12 +34,13 @@ using namespace share; ObDirectLoadFastHeapTableBuildParam::ObDirectLoadFastHeapTableBuildParam() : snapshot_version_(0), - thread_idx_(0), datum_utils_(nullptr), col_descs_(nullptr), + cmp_funcs_(nullptr), insert_table_ctx_(nullptr), fast_heap_table_ctx_(nullptr), - dml_row_handler_(nullptr) + dml_row_handler_(nullptr), + online_opt_stat_gather_(false) { } @@ -49,8 +50,8 @@ ObDirectLoadFastHeapTableBuildParam::~ObDirectLoadFastHeapTableBuildParam() bool ObDirectLoadFastHeapTableBuildParam::is_valid() const { - return tablet_id_.is_valid() && snapshot_version_ > 0 && thread_idx_ >= 0 && - table_data_desc_.is_valid() && nullptr != col_descs_ && nullptr != insert_table_ctx_ && + return tablet_id_.is_valid() && snapshot_version_ > 0 && table_data_desc_.is_valid() && + nullptr != col_descs_ && nullptr != cmp_funcs_ && nullptr != insert_table_ctx_ && nullptr != fast_heap_table_ctx_ && nullptr != dml_row_handler_ && nullptr != datum_utils_; } @@ -76,6 +77,56 @@ ObDirectLoadFastHeapTableBuilder::~ObDirectLoadFastHeapTableBuilder() slice_writer_allocator_.free(slice_writer_); slice_writer_ = nullptr; } + for (int64_t i = 0; i < column_stat_array_.count(); ++i) { + ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); + col_stat->~ObOptOSGColumnStat(); + allocator_.free(col_stat); + col_stat = nullptr; + } +} + +int ObDirectLoadFastHeapTableBuilder::init_sql_statistics() +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < param_.table_data_desc_.column_count_; ++i) { + ObOptOSGColumnStat *new_osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_); + if (OB_ISNULL(new_osg_col_stat)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate col stat"); + } else if (OB_FAIL(column_stat_array_.push_back(new_osg_col_stat))) { + LOG_WARN("fail to push back", KR(ret)); + } + if (OB_FAIL(ret)) { + if (new_osg_col_stat != nullptr) { + new_osg_col_stat->~ObOptOSGColumnStat(); + allocator_.free(new_osg_col_stat); + new_osg_col_stat = nullptr; + } + } + } + return ret; +} + +int ObDirectLoadFastHeapTableBuilder::collect_obj(const ObDatumRow &datum_row) +{ + int ret = OB_SUCCESS; + const int64_t extra_rowkey_cnt = ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); + for (int64_t i = 0; OB_SUCC(ret) && i < param_.table_data_desc_.column_count_; i++) { + const ObStorageDatum &datum = + datum_row.storage_datums_[i + extra_rowkey_cnt + 1]; + const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i + 1).get_cmp_func(); + const ObColDesc &col_desc = param_.col_descs_->at(i + 1); + ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); + bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); + if (col_stat != nullptr && is_valid) { + if (OB_FAIL(col_stat->update_column_stat_info(&datum, + col_desc.col_type_, + cmp_func.cmp_func_))) { + LOG_WARN("failed to update column stat info"); + } + } + } + return ret; } int ObDirectLoadFastHeapTableBuilder::init(const ObDirectLoadFastHeapTableBuildParam ¶m) @@ -91,7 +142,9 @@ int ObDirectLoadFastHeapTableBuilder::init(const ObDirectLoadFastHeapTableBuildP param_ = param; allocator_.set_tenant_id(MTL_ID()); slice_writer_allocator_.set_tenant_id(MTL_ID()); - if (OB_FAIL(param_.fast_heap_table_ctx_->get_tablet_context( + if (param_.online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) { + LOG_WARN("fail to inner init sql statistics", KR(ret)); + } else if (OB_FAIL(param_.fast_heap_table_ctx_->get_tablet_context( param_.tablet_id_, fast_heap_table_tablet_ctx_))) { LOG_WARN("fail to get tablet context", KR(ret)); } else if (OB_FAIL(init_sstable_slice_ctx())) { @@ -181,7 +234,7 @@ int ObDirectLoadFastHeapTableBuilder::append_row(const ObTabletID &tablet_id, } if (OB_FAIL(slice_writer_->append_row(datum_row_))) { LOG_WARN("fail to append row", KR(ret)); - } else if (param_.insert_table_ctx_->collect_obj(param_.thread_idx_ ,datum_row_)) { + } else if (param_.online_opt_stat_gather_ && OB_FAIL(collect_obj(datum_row_))) { LOG_WARN("fail to collect", KR(ret)); } else { ++row_count_; @@ -209,7 +262,6 @@ int ObDirectLoadFastHeapTableBuilder::close() if (OB_FAIL(slice_writer_->close())) { LOG_WARN("fail to close sstable slice writer", KR(ret)); } else { - param_.insert_table_ctx_->inc_row_count(row_count_); is_closed_ = true; } } @@ -230,6 +282,7 @@ int ObDirectLoadFastHeapTableBuilder::get_tables( ObDirectLoadFastHeapTableCreateParam create_param; create_param.tablet_id_ = param_.tablet_id_; create_param.row_count_ = row_count_; + create_param.column_stat_array_ = &column_stat_array_; ObDirectLoadFastHeapTable *fast_heap_table = nullptr; if (OB_ISNULL(fast_heap_table = OB_NEWx(ObDirectLoadFastHeapTable, (&allocator)))) { ret = OB_ALLOCATE_MEMORY_FAILED; diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h index 3ad2cee9d..c87f5bc01 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h @@ -39,19 +39,20 @@ public: ObDirectLoadFastHeapTableBuildParam(); ~ObDirectLoadFastHeapTableBuildParam(); bool is_valid() const; - TO_STRING_KV(K_(tablet_id), K_(snapshot_version), K_(thread_idx), K_(table_data_desc), - KP_(datum_utils), KP_(col_descs), KP_(insert_table_ctx), KP_(fast_heap_table_ctx), - KP_(dml_row_handler)); + TO_STRING_KV(K_(tablet_id), K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), + KP_(col_descs), KP_(cmp_funcs), KP_(insert_table_ctx), KP_(fast_heap_table_ctx), + KP_(dml_row_handler), K_(online_opt_stat_gather)); public: common::ObTabletID tablet_id_; int64_t snapshot_version_; - int64_t thread_idx_; ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; const common::ObIArray *col_descs_; + const blocksstable::ObStoreCmpFuncs *cmp_funcs_; ObDirectLoadInsertTableContext *insert_table_ctx_; ObDirectLoadFastHeapTableContext *fast_heap_table_ctx_; ObDirectLoadDMLRowHandler *dml_row_handler_; + bool online_opt_stat_gather_; }; class ObDirectLoadFastHeapTableBuilder : public ObIDirectLoadPartitionTableBuilder @@ -69,6 +70,8 @@ public: int get_tables(common::ObIArray &table_array, common::ObIAllocator &allocator) override; private: + int init_sql_statistics(); + int collect_obj(const blocksstable::ObDatumRow &datum_row); int init_sstable_slice_ctx(); int switch_sstable_slice(); private: @@ -79,6 +82,7 @@ private: ObSSTableInsertSliceWriter *slice_writer_; ObDirectLoadFastHeapTableTabletWriteCtx write_ctx_; blocksstable::ObDatumRow datum_row_; + common::ObArray column_stat_array_; int64_t row_count_; bool is_closed_; bool is_inited_; diff --git a/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp b/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp index 440deddab..3abbdd57f 100644 --- a/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp @@ -13,10 +13,6 @@ #include "storage/direct_load/ob_direct_load_insert_table_ctx.h" #include "storage/ddl/ob_direct_insert_sstable_ctx.h" -#include "share/stat/ob_opt_table_stat.h" -#include "share/stat/ob_opt_column_stat.h" -#include "share/stat/ob_stat_item.h" -#include "observer/table_load/ob_table_load_schema.h" namespace oceanbase { @@ -24,26 +20,13 @@ namespace storage { using namespace common; using namespace table; -using namespace sql; -using namespace observer; /** * ObDirectLoadInsertTableParam */ ObDirectLoadInsertTableParam::ObDirectLoadInsertTableParam() - : table_id_(OB_INVALID_ID), - dest_table_id_(OB_INVALID_ID), - schema_version_(0), - snapshot_version_(0), - execution_id_(0), - ddl_task_id_(0), - data_version_(0), - session_cnt_(0), - rowkey_column_count_(0), - column_count_(0), - online_opt_stat_gather_(false), - is_heap_table_(false) + : table_id_(OB_INVALID_ID), schema_version_(0), snapshot_version_(0), execution_id_(0), ddl_task_id_(0) { } @@ -53,25 +36,16 @@ ObDirectLoadInsertTableParam::~ObDirectLoadInsertTableParam() bool ObDirectLoadInsertTableParam::is_valid() const { - return OB_INVALID_ID != table_id_ && OB_INVALID_ID != dest_table_id_ && schema_version_ >= 0 && - snapshot_version_ >= 0 && ls_partition_ids_.count() > 0; + return OB_INVALID_ID != table_id_ && schema_version_ >= 0 && snapshot_version_ >= 0 && + ls_partition_ids_.count() > 0; } int ObDirectLoadInsertTableParam::assign(const ObDirectLoadInsertTableParam &other) { int ret = OB_SUCCESS; table_id_ = other.table_id_; - dest_table_id_ = other.dest_table_id_; schema_version_ = other.schema_version_; snapshot_version_ = other.snapshot_version_; - data_version_ = other.data_version_; - session_cnt_ = other.session_cnt_; - rowkey_column_count_ = other.rowkey_column_count_; - column_count_ = other.column_count_; - online_opt_stat_gather_ = other.online_opt_stat_gather_; - is_heap_table_ = other.is_heap_table_; - col_descs_ = other.col_descs_; - cmp_funcs_ = other.cmp_funcs_; if (OB_FAIL(ls_partition_ids_.assign(other.ls_partition_ids_))) { LOG_WARN("fail to assign ls tablet ids", KR(ret)); } @@ -83,7 +57,7 @@ int ObDirectLoadInsertTableParam::assign(const ObDirectLoadInsertTableParam &oth */ ObDirectLoadInsertTableContext::ObDirectLoadInsertTableContext() - : allocator_("TLD_SqlStat"), tablet_finish_count_(0), table_row_count_(0), is_inited_(false) + : tablet_finish_count_(0), is_inited_(false) { } @@ -102,12 +76,6 @@ void ObDirectLoadInsertTableContext::reset() } ddl_ctrl_.context_id_ = 0; } - for (int64_t i = 0; i < session_sql_ctx_array_.count(); ++i) { - ObTableLoadSqlStatistics *sql_statistics = session_sql_ctx_array_.at(i); - sql_statistics->~ObTableLoadSqlStatistics(); - allocator_.free(sql_statistics); - } - session_sql_ctx_array_.reset(); is_inited_ = false; } @@ -144,8 +112,6 @@ int ObDirectLoadInsertTableContext::init(const ObDirectLoadInsertTableParam &par } else if (OB_FAIL(sstable_insert_mgr.create_table_context(table_insert_param, ddl_ctrl_.context_id_))) { LOG_WARN("fail to create table context", KR(ret), K(table_insert_param)); - } else if (param_.online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) { - LOG_WARN("fail to init sql statistics", KR(ret)); } else { is_inited_ = true; } @@ -156,163 +122,6 @@ int ObDirectLoadInsertTableContext::init(const ObDirectLoadInsertTableParam &par return ret; } -int ObDirectLoadInsertTableContext::init_sql_statistics() -{ - int ret = OB_SUCCESS; - ObOptTableStat *table_stat = nullptr; - for (int64_t i = 0; OB_SUCC(ret) && i < param_.session_cnt_; ++i) { - ObTableLoadSqlStatistics *sql_statistics = nullptr; - ObOptTableStat *table_stat = nullptr; - if (OB_ISNULL(sql_statistics = OB_NEWx(ObTableLoadSqlStatistics, (&allocator_)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to new ObTableLoadSqlStatistics", KR(ret)); - } else if (OB_FAIL(sql_statistics->allocate_table_stat(table_stat))) { - LOG_WARN("fail to allocate table stat", KR(ret)); - } else if (OB_FAIL(session_sql_ctx_array_.push_back(sql_statistics))) { - LOG_WARN("fail to push back", KR(ret)); - } else { - for (int64_t j = 0; OB_SUCC(ret) && j < param_.column_count_; ++j) { - ObOptOSGColumnStat *osg_col_stat = nullptr; - if (OB_FAIL(sql_statistics->allocate_col_stat(osg_col_stat))) { - LOG_WARN("fail to allocate col stat", KR(ret)); - } - } - } - if (OB_FAIL(ret)) { - if (nullptr != sql_statistics) { - sql_statistics->~ObTableLoadSqlStatistics(); - allocator_.free(sql_statistics); - sql_statistics = nullptr; - } - } - } - return ret; -} - -int ObDirectLoadInsertTableContext::collect_sql_statistics(ObTableLoadSqlStatistics &sql_statistics) -{ - int ret = OB_SUCCESS; - const uint64_t tenant_id = MTL_ID(); - ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = nullptr; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObDirectLoadTabletMergeCtx not init", KR(ret), KP(this)); - } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, param_.dest_table_id_, - schema_guard, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(param_)); - } else if (OB_ISNULL(table_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected error", K(ret)); - } else { - sql_statistics.reset(); - int64_t table_row_cnt = table_row_count_; - int64_t table_avg_len = 0; - int64_t col_cnt = param_.column_count_; - uint64_t partition_id = -1; - ObOptTableStat *table_stat = nullptr; - StatLevel stat_level = TABLE_LEVEL; - if (table_schema->get_part_level() == PARTITION_LEVEL_ZERO) { - partition_id = param_.dest_table_id_; - } - if (OB_FAIL(sql_statistics.allocate_table_stat(table_stat))) { - LOG_WARN("fail to allocate table stat", KR(ret)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < col_cnt; ++i) { - int64_t col_id = param_.is_heap_table_ ? i + 1 : i; - ObOptOSGColumnStat *osg_col_stat = nullptr; - if (OB_FAIL(sql_statistics.allocate_col_stat(osg_col_stat))) { - LOG_WARN("fail to allocate table stat", KR(ret)); - } - // scan session_sql_ctx_array - for (int64_t j = 0; OB_SUCC(ret) && j < session_sql_ctx_array_.count(); ++j) { - ObOptOSGColumnStat *tmp_col_stat = - session_sql_ctx_array_.at(j)->get_col_stat_array().at(i); - if (OB_FAIL(osg_col_stat->merge_column_stat(*tmp_col_stat))) { - LOG_WARN("fail to merge column stat", KR(ret)); - } - } - - if (OB_SUCC(ret)) { - osg_col_stat->col_stat_->calc_avg_len(); - table_avg_len += osg_col_stat->col_stat_->get_avg_len(); - osg_col_stat->col_stat_->set_table_id(param_.dest_table_id_); - osg_col_stat->col_stat_->set_partition_id(partition_id); - osg_col_stat->col_stat_->set_stat_level(stat_level); - osg_col_stat->col_stat_->set_column_id(param_.col_descs_->at(col_id).col_id_); - osg_col_stat->col_stat_->set_num_distinct( - ObGlobalNdvEval::get_ndv_from_llc(osg_col_stat->col_stat_->get_llc_bitmap())); - if (OB_FAIL(osg_col_stat->set_min_max_datum_to_obj())) { - LOG_WARN("failed to set min max datum to obj", K(ret)); - } - } - } - if (OB_SUCC(ret)) { - table_stat->set_table_id(param_.dest_table_id_); - table_stat->set_partition_id(partition_id); - table_stat->set_object_type(stat_level); - table_stat->set_row_count(table_row_cnt); - table_stat->set_avg_row_size(table_avg_len); - } - } - } - return ret; -} - -int ObDirectLoadInsertTableContext::collect_obj(int64_t thread_idx, const ObDatumRow &datum_row) -{ - int ret = OB_SUCCESS; - const int64_t extra_rowkey_cnt = ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); - if (param_.online_opt_stat_gather_) { - if (OB_UNLIKELY(thread_idx >= session_sql_ctx_array_.count())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected thread idx", KR(ret), K(thread_idx), K(session_sql_ctx_array_.count())); - } else { - ObTableLoadSqlStatistics *sql_stat = session_sql_ctx_array_.at(thread_idx); - if (param_.is_heap_table_ ) { - for (int64_t i = 0; OB_SUCC(ret) && i < param_.column_count_; i++) { - const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt + 1]; - const ObColDesc &col_desc = param_.col_descs_->at(i + 1); - const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i + 1).get_cmp_func(); - ObOptOSGColumnStat *col_stat = sql_stat->get_col_stat_array().at(i); - bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); - if (col_stat != nullptr && is_valid) { - if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { - LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); - } - } - } - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < param_.rowkey_column_count_; i++) { - const ObStorageDatum &datum = datum_row.storage_datums_[i]; - const ObColDesc &col_desc = param_.col_descs_->at(i); - const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i).get_cmp_func(); - ObOptOSGColumnStat *col_stat = sql_stat->get_col_stat_array().at(i); - bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); - if (col_stat != nullptr && is_valid) { - if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { - LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); - } - } - } - for (int64_t i = param_.rowkey_column_count_; OB_SUCC(ret) && i < param_.column_count_; i++) { - const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt]; - const ObColDesc &col_desc = param_.col_descs_->at(i); - const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i).get_cmp_func(); - ObOptOSGColumnStat *col_stat = session_sql_ctx_array_.at(thread_idx)->get_col_stat_array().at(i); - bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); - if (col_stat != nullptr && is_valid) { - if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { - LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); - } - } - } - } - } - } - return ret; -} - int ObDirectLoadInsertTableContext::add_sstable_slice(const ObTabletID &tablet_id, const ObMacroDataSeq &start_seq, ObNewRowIterator &iter, @@ -391,7 +200,7 @@ int ObDirectLoadInsertTableContext::notify_tablet_finish(const ObTabletID &table return ret; } -int ObDirectLoadInsertTableContext::commit(ObTableLoadSqlStatistics &sql_statistics) +int ObDirectLoadInsertTableContext::commit() { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -405,9 +214,8 @@ int ObDirectLoadInsertTableContext::commit(ObTableLoadSqlStatistics &sql_statist ObSSTableInsertManager &sstable_insert_mgr = ObSSTableInsertManager::get_instance(); if (OB_FAIL(sstable_insert_mgr.finish_table_context(ddl_ctrl_.context_id_, true))) { LOG_WARN("fail to finish table context", KR(ret), K_(ddl_ctrl)); - } else if (FALSE_IT(ddl_ctrl_.context_id_ = 0)) { - } else if (param_.online_opt_stat_gather_ && OB_FAIL(collect_sql_statistics(sql_statistics))) { - LOG_WARN("fail to collect sql stats", KR(ret)); + } else { + ddl_ctrl_.context_id_ = 0; } } return ret; diff --git a/src/storage/direct_load/ob_direct_load_insert_table_ctx.h b/src/storage/direct_load/ob_direct_load_insert_table_ctx.h index 8d65355a6..0ebef10e8 100644 --- a/src/storage/direct_load/ob_direct_load_insert_table_ctx.h +++ b/src/storage/direct_load/ob_direct_load_insert_table_ctx.h @@ -12,7 +12,6 @@ #pragma once #include "share/table/ob_table_load_define.h" -#include "share/table/ob_table_load_sql_statistics.h" #include "sql/engine/px/ob_sub_trans_ctrl.h" namespace oceanbase @@ -28,25 +27,14 @@ public: ~ObDirectLoadInsertTableParam(); int assign(const ObDirectLoadInsertTableParam &other); bool is_valid() const; - TO_STRING_KV(K_(table_id), K_(dest_table_id), K_(schema_version), K_(snapshot_version), - K_(execution_id), K_(ddl_task_id), K_(data_version), K_(session_cnt), - K_(rowkey_column_count), K_(column_count), K_(online_opt_stat_gather), - K_(is_heap_table), K_(ls_partition_ids)); + TO_STRING_KV(K_(table_id), K_(schema_version), K_(snapshot_version), K_(ls_partition_ids), K_(execution_id), K_(ddl_task_id)); public: uint64_t table_id_; - uint64_t dest_table_id_; int64_t schema_version_; int64_t snapshot_version_; int64_t execution_id_; int64_t ddl_task_id_; int64_t data_version_; - int64_t session_cnt_; - int64_t rowkey_column_count_; - int64_t column_count_; - bool online_opt_stat_gather_; - bool is_heap_table_; - const common::ObIArray *col_descs_; - const blocksstable::ObStoreCmpFuncs *cmp_funcs_; common::ObArray ls_partition_ids_; }; @@ -57,7 +45,6 @@ public: ~ObDirectLoadInsertTableContext(); void reset(); int init(const ObDirectLoadInsertTableParam ¶m); - int collect_obj(int64_t thread_idx, const blocksstable::ObDatumRow &datum_row); int add_sstable_slice(const common::ObTabletID &tablet_id, const blocksstable::ObMacroDataSeq &start_seq, common::ObNewRowIterator &iter, @@ -67,20 +54,12 @@ public: ObSSTableInsertSliceWriter *&slice_writer, common::ObIAllocator &allocator); int notify_tablet_finish(const common::ObTabletID &tablet_id); - int commit(table::ObTableLoadSqlStatistics &sql_statistics); - void inc_row_count(int64_t row_count) { ATOMIC_AAF(&table_row_count_, row_count); } - int64_t get_row_count() const { return table_row_count_; } - TO_STRING_KV(K_(param), K_(tablet_finish_count), K_(table_row_count), K_(ddl_ctrl)); + int commit(); + TO_STRING_KV(K_(param), K_(ddl_ctrl)); private: - int init_sql_statistics(); - int collect_sql_statistics(table::ObTableLoadSqlStatistics &sql_statistics); -private: - common::ObArenaAllocator allocator_; ObDirectLoadInsertTableParam param_; sql::ObDDLCtrl ddl_ctrl_; int64_t tablet_finish_count_ CACHE_ALIGNED; - int64_t table_row_count_ CACHE_ALIGNED; - common::ObArray session_sql_ctx_array_; bool is_inited_; }; diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp index 5746c83bd..6f16724f2 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp @@ -50,8 +50,10 @@ ObDirectLoadMergeParam::ObDirectLoadMergeParam() snapshot_version_(0), datum_utils_(nullptr), col_descs_(nullptr), + cmp_funcs_(nullptr), is_heap_table_(false), is_fast_heap_table_(false), + online_opt_stat_gather_(false), insert_table_ctx_(nullptr), dml_row_handler_(nullptr) { @@ -65,7 +67,8 @@ bool ObDirectLoadMergeParam::is_valid() const { return OB_INVALID_ID != table_id_ && 0 < rowkey_column_num_ && 0 < store_column_count_ && snapshot_version_ > 0 && table_data_desc_.is_valid() && nullptr != datum_utils_ && - nullptr != col_descs_ && nullptr != insert_table_ctx_ && nullptr != dml_row_handler_; + nullptr != col_descs_ && nullptr != cmp_funcs_ && nullptr != insert_table_ctx_ && + nullptr != dml_row_handler_; } /** @@ -188,6 +191,7 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam ¶m, } else { allocator_.set_tenant_id(MTL_ID()); param_ = param; + target_partition_id_ = target_ls_partition_id.part_tablet_id_.partition_id_; tablet_id_ = ls_partition_id.part_tablet_id_.tablet_id_; target_tablet_id_ = target_ls_partition_id.part_tablet_id_.tablet_id_; is_inited_ = true; @@ -196,6 +200,95 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam ¶m, return ret; } +int ObDirectLoadTabletMergeCtx::collect_sql_statistics( + const ObIArray &fast_heap_table_array, ObTableLoadSqlStatistics &sql_statistics) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + ObSchemaGetterGuard schema_guard; + const ObTableSchema *table_schema = nullptr; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObDirectLoadTabletMergeCtx not init", KR(ret), KP(this)); + } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, param_.target_table_id_, schema_guard, + table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(param_)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected error", K(ret)); + } else { + int64_t table_row_cnt = 0; + int64_t table_avg_len = 0; + int64_t col_cnt = param_.table_data_desc_.column_count_; + ObOptTableStat *table_stat = nullptr; + StatLevel stat_level; + if (table_schema->get_part_level() == PARTITION_LEVEL_ZERO) { + stat_level = TABLE_LEVEL; + } else if (table_schema->get_part_level() == PARTITION_LEVEL_ONE) { + stat_level = PARTITION_LEVEL; + } else if (table_schema->get_part_level() == PARTITION_LEVEL_TWO) { + stat_level = SUBPARTITION_LEVEL; + } else { + stat_level = INVALID_LEVEL; + } + if (OB_FAIL(sql_statistics.allocate_table_stat(table_stat))) { + LOG_WARN("fail to allocate table stat", KR(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < col_cnt; ++i) { + int64_t col_id = param_.is_heap_table_ ? i + 1 : i; + int64_t row_count = 0; + ObOptOSGColumnStat *osg_col_stat = nullptr; + if (OB_FAIL(sql_statistics.allocate_col_stat(osg_col_stat))) { + LOG_WARN("fail to allocate table stat", KR(ret)); + } + // scan task_array + for (int64_t j = 0; OB_SUCC(ret) && j < task_array_.count(); ++j) { + ObOptOSGColumnStat *tmp_col_stat = task_array_.at(j)->get_column_stat_array().at(i); + if (task_array_.at(j)->get_row_count() != 0) { + if (OB_FAIL(osg_col_stat->merge_column_stat(*tmp_col_stat))) { + LOG_WARN("fail to merge column stat", KR(ret)); + } else { + row_count += task_array_.at(j)->get_row_count(); + } + } + } + // scan fast heap table + for (int64_t j = 0; OB_SUCC(ret) && j < fast_heap_table_array.count(); ++j) { + ObOptOSGColumnStat *tmp_col_stat = fast_heap_table_array.at(j)->get_column_stat_array().at(i); + if (fast_heap_table_array.at(j)->get_row_count() != 0) { + if (OB_FAIL(osg_col_stat->merge_column_stat(*tmp_col_stat))) { + LOG_WARN("fail to merge column stat", KR(ret)); + } else { + row_count += fast_heap_table_array.at(j)->get_row_count(); + } + } + } + if (OB_SUCC(ret)) { + table_row_cnt = row_count; + osg_col_stat->col_stat_->calc_avg_len(); + table_avg_len += osg_col_stat->col_stat_->get_avg_len(); + osg_col_stat->col_stat_->set_table_id(param_.target_table_id_); + osg_col_stat->col_stat_->set_partition_id(target_partition_id_); + osg_col_stat->col_stat_->set_stat_level(stat_level); + osg_col_stat->col_stat_->set_column_id(param_.col_descs_->at(col_id).col_id_); + osg_col_stat->col_stat_->set_num_distinct(ObGlobalNdvEval::get_ndv_from_llc(osg_col_stat->col_stat_->get_llc_bitmap())); + if (OB_FAIL(osg_col_stat->set_min_max_datum_to_obj())) { + LOG_WARN("failed to set min max datum to obj", K(ret)); + } + } + } + if (OB_SUCC(ret)) { + table_stat->set_table_id(param_.target_table_id_); + table_stat->set_partition_id(target_partition_id_); + table_stat->set_object_type(stat_level); + table_stat->set_row_count(table_row_cnt); + table_stat->set_avg_row_size(table_avg_len); + } + } + } + return ret; +} + int ObDirectLoadTabletMergeCtx::collect_dml_stat(const common::ObIArray &fast_heap_table_array, ObTableLoadDmlStat &dml_stats) { int ret = OB_SUCCESS; diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.h b/src/storage/direct_load/ob_direct_load_merge_ctx.h index b47685e3c..750a3ecf8 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.h +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.h @@ -51,7 +51,8 @@ public: bool is_valid() const; TO_STRING_KV(K_(table_id), K_(target_table_id), K_(rowkey_column_num), K_(store_column_count), K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), KP_(col_descs), - K_(is_heap_table), K_(is_fast_heap_table), KP_(insert_table_ctx), KP_(dml_row_handler)); + KP_(cmp_funcs), K_(is_heap_table), K_(is_fast_heap_table), + K_(online_opt_stat_gather), KP_(insert_table_ctx), KP_(dml_row_handler)); public: uint64_t table_id_; uint64_t target_table_id_; @@ -61,8 +62,10 @@ public: storage::ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; const common::ObIArray *col_descs_; + const blocksstable::ObStoreCmpFuncs *cmp_funcs_; bool is_heap_table_; bool is_fast_heap_table_; + bool online_opt_stat_gather_; ObDirectLoadInsertTableContext *insert_table_ctx_; ObDirectLoadDMLRowHandler *dml_row_handler_; }; @@ -106,6 +109,8 @@ public: int build_aggregate_merge_task_for_multiple_heap_table( const common::ObIArray &table_array); int inc_finish_count(bool &is_ready); + int collect_sql_statistics( + const common::ObIArray &fast_heap_table_array, table::ObTableLoadSqlStatistics &sql_statistics); int collect_dml_stat(const common::ObIArray &fast_heap_table_array, table::ObTableLoadDmlStat &dml_stats); const ObDirectLoadMergeParam &get_param() const { return param_; } @@ -115,7 +120,7 @@ public: { return task_array_; } - TO_STRING_KV(K_(param), K_(tablet_id), K_(target_tablet_id)); + TO_STRING_KV(K_(param), K_(target_partition_id), K_(tablet_id), K_(target_tablet_id)); private: int init_sstable_array(const common::ObIArray &table_array); int init_multiple_sstable_array( @@ -143,6 +148,7 @@ private: private: common::ObArenaAllocator allocator_; ObDirectLoadMergeParam param_; + uint64_t target_partition_id_; common::ObTabletID tablet_id_; common::ObTabletID target_tablet_id_; ObDirectLoadOriginTable origin_table_; diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp index 0057629f1..5a32c8bc7 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp @@ -21,6 +21,7 @@ #include "storage/direct_load/ob_direct_load_multiple_heap_table.h" #include "storage/direct_load/ob_direct_load_origin_table.h" + namespace oceanbase { namespace storage @@ -28,7 +29,6 @@ namespace storage using namespace common; using namespace blocksstable; using namespace share; -using namespace table; /** * ObDirectLoadPartitionMergeTask @@ -48,9 +48,16 @@ ObDirectLoadPartitionMergeTask::ObDirectLoadPartitionMergeTask() ObDirectLoadPartitionMergeTask::~ObDirectLoadPartitionMergeTask() { + for (int64_t i = 0; i < column_stat_array_.count(); ++i) { + ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); + if (col_stat != nullptr) { + col_stat->~ObOptOSGColumnStat(); + col_stat = nullptr; + } + } } -int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx) +int ObDirectLoadPartitionMergeTask::process() { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -63,7 +70,9 @@ int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx) ObSSTableInsertSliceWriter *writer = nullptr; ObMacroDataSeq block_start_seq; block_start_seq.set_parallel_degree(parallel_idx_); - if (OB_FAIL(construct_row_iter(allocator_, row_iter))) { + if (merge_param_->online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) { + LOG_WARN("fail to init sql statistics", KR(ret)); + } else if (OB_FAIL(construct_row_iter(allocator_, row_iter))) { LOG_WARN("fail to construct row iter", KR(ret)); } else if (OB_FAIL(merge_param_->insert_table_ctx_->construct_sstable_slice_writer( target_tablet_id, block_start_seq, writer, allocator_))) { @@ -85,7 +94,7 @@ int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx) } } else if (OB_FAIL(writer->append_row(*const_cast(datum_row)))) { LOG_WARN("fail to append row", KR(ret), KPC(datum_row)); - } else if (OB_FAIL(merge_param_->insert_table_ctx_->collect_obj(thread_idx, *datum_row))) { + } else if (merge_param_->online_opt_stat_gather_ && OB_FAIL(collect_obj(*datum_row))) { LOG_WARN("fail to collect statistics", KR(ret)); } else { ++affected_rows_; @@ -94,8 +103,6 @@ int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx) if (OB_SUCC(ret)) { if (OB_FAIL(writer->close())) { LOG_WARN("fail to close writer", KR(ret)); - } else { - merge_param_->insert_table_ctx_->inc_row_count(affected_rows_); } } LOG_INFO("add sstable slice end", KR(ret), K(target_tablet_id), K(tablet_id), @@ -124,6 +131,74 @@ int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx) return ret; } +int ObDirectLoadPartitionMergeTask::init_sql_statistics() +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret)&& i < merge_param_->table_data_desc_.column_count_; ++i) { + ObOptOSGColumnStat *col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_); + if (OB_ISNULL(col_stat)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate buffer", KR(ret)); + } else if (OB_FAIL(column_stat_array_.push_back(col_stat))){ + LOG_WARN("fail to push back", KR(ret)); + } + if (OB_FAIL(ret)) { + if (col_stat != nullptr) { + col_stat->~ObOptOSGColumnStat(); + allocator_.free(col_stat); + col_stat = nullptr; + } + } + } + return ret; +} + +int ObDirectLoadPartitionMergeTask::collect_obj(const ObDatumRow &datum_row) +{ + int ret = OB_SUCCESS; + const int64_t extra_rowkey_cnt = ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); + if (merge_param_->is_heap_table_ ) { + for (int64_t i = 0; OB_SUCC(ret) && i < merge_param_->table_data_desc_.column_count_; i++) { + const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt + 1]; + const ObColDesc &col_desc = merge_param_->col_descs_->at(i + 1); + const ObCmpFunc &cmp_func = merge_param_->cmp_funcs_->at(i + 1).get_cmp_func(); + ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); + bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); + if (col_stat != nullptr && is_valid) { + if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { + LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); + } + } + } + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < merge_param_->rowkey_column_num_; i++) { + const ObStorageDatum &datum = datum_row.storage_datums_[i]; + const ObColDesc &col_desc = merge_param_->col_descs_->at(i); + const ObCmpFunc &cmp_func = merge_param_->cmp_funcs_->at(i).get_cmp_func(); + ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); + bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); + if (col_stat != nullptr && is_valid) { + if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { + LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); + } + } + } + for (int64_t i = merge_param_->rowkey_column_num_; OB_SUCC(ret) && i < merge_param_->table_data_desc_.column_count_; i++) { + const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt]; + const ObColDesc &col_desc = merge_param_->col_descs_->at(i); + const ObCmpFunc &cmp_func = merge_param_->cmp_funcs_->at(i).get_cmp_func(); + ObOptOSGColumnStat *col_stat = column_stat_array_.at(i); + bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type()); + if (col_stat != nullptr && is_valid) { + if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) { + LOG_WARN("Failed to merge obj", K(ret), KP(col_stat)); + } + } + } + } + return ret; +} + void ObDirectLoadPartitionMergeTask::stop() { is_stop_ = true; diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.h b/src/storage/direct_load/ob_direct_load_partition_merge_task.h index 72c19fd92..e33822847 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.h +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.h @@ -41,18 +41,26 @@ class ObDirectLoadPartitionMergeTask : public common::ObDLinkBase &get_column_stat_array() const + { + return column_stat_array_; + } int64_t get_row_count() const { return affected_rows_; } void stop(); TO_STRING_KV(KPC_(merge_param), KPC_(merge_ctx), K_(parallel_idx)); protected: virtual int construct_row_iter(common::ObIAllocator &allocator, ObIStoreRowIterator *&row_iter) = 0; +private: + int init_sql_statistics(); + int collect_obj(const blocksstable::ObDatumRow &datum_row); protected: const ObDirectLoadMergeParam *merge_param_; ObDirectLoadTabletMergeCtx *merge_ctx_; int64_t parallel_idx_; int64_t affected_rows_; + common::ObArray column_stat_array_; common::ObArenaAllocator allocator_; bool is_stop_; bool is_inited_; diff --git a/src/storage/direct_load/ob_direct_load_table_store.cpp b/src/storage/direct_load/ob_direct_load_table_store.cpp index a250dc6be..30f40eb4f 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.cpp +++ b/src/storage/direct_load/ob_direct_load_table_store.cpp @@ -33,9 +33,9 @@ using namespace table; ObDirectLoadTableStoreParam::ObDirectLoadTableStoreParam() : snapshot_version_(0), - thread_idx_(-1), datum_utils_(nullptr), col_descs_(nullptr), + cmp_funcs_(nullptr), file_mgr_(nullptr), is_multiple_mode_(false), is_fast_heap_table_(false), @@ -53,8 +53,8 @@ ObDirectLoadTableStoreParam::~ObDirectLoadTableStoreParam() bool ObDirectLoadTableStoreParam::is_valid() const { - return snapshot_version_ > 0 && thread_idx_ >= 0 && table_data_desc_.is_valid() && - nullptr != datum_utils_ && nullptr != col_descs_ && nullptr != file_mgr_ && + return snapshot_version_ > 0 && table_data_desc_.is_valid() && nullptr != datum_utils_ && + nullptr != col_descs_ && nullptr != cmp_funcs_ && nullptr != file_mgr_ && (!is_fast_heap_table_ || (nullptr != insert_table_ctx_ && nullptr != fast_heap_table_ctx_)) && nullptr != dml_row_handler_; @@ -112,10 +112,11 @@ int ObDirectLoadTableStoreBucket::init(const ObDirectLoadTableStoreParam ¶m, fast_heap_table_build_param.table_data_desc_ = param.table_data_desc_; fast_heap_table_build_param.datum_utils_ = param.datum_utils_; fast_heap_table_build_param.col_descs_ = param.col_descs_; + fast_heap_table_build_param.cmp_funcs_ = param.cmp_funcs_; fast_heap_table_build_param.insert_table_ctx_ = param.insert_table_ctx_; fast_heap_table_build_param.fast_heap_table_ctx_ = param.fast_heap_table_ctx_; fast_heap_table_build_param.dml_row_handler_ = param.dml_row_handler_; - fast_heap_table_build_param.thread_idx_ = param.thread_idx_; + fast_heap_table_build_param.online_opt_stat_gather_ = param.online_opt_stat_gather_; ObDirectLoadFastHeapTableBuilder *fast_heap_table_builder = nullptr; if (OB_ISNULL(fast_heap_table_builder = table_builder_allocator_->alloc())) { diff --git a/src/storage/direct_load/ob_direct_load_table_store.h b/src/storage/direct_load/ob_direct_load_table_store.h index 0605ef2eb..7f4c236af 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.h +++ b/src/storage/direct_load/ob_direct_load_table_store.h @@ -34,19 +34,20 @@ public: ObDirectLoadTableStoreParam(); ~ObDirectLoadTableStoreParam(); bool is_valid() const; - TO_STRING_KV(K_(snapshot_version), K_(thread_idx), K_(table_data_desc), KP_(datum_utils), - KP_(col_descs), KP_(file_mgr), K_(is_multiple_mode), K_(is_fast_heap_table), + TO_STRING_KV(K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), KP_(col_descs), + KP_(cmp_funcs), KP_(file_mgr), K_(is_multiple_mode), K_(is_fast_heap_table), KP_(insert_table_ctx), KP_(fast_heap_table_ctx), KP_(dml_row_handler), KP_(extra_buf), K_(extra_buf_size)); public: int64_t snapshot_version_; - int64_t thread_idx_; ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; const common::ObIArray *col_descs_; + const blocksstable::ObStoreCmpFuncs *cmp_funcs_; ObDirectLoadTmpFileManager *file_mgr_; bool is_multiple_mode_; bool is_fast_heap_table_; + bool online_opt_stat_gather_; ObDirectLoadInsertTableContext *insert_table_ctx_; ObDirectLoadFastHeapTableContext *fast_heap_table_ctx_; ObDirectLoadDMLRowHandler *dml_row_handler_;