revert sql statistics

Co-authored-by: yongshige <598633031@qq.com>
This commit is contained in:
coolfishchen 2023-11-08 08:12:40 +00:00 committed by ob-robot
parent 38085f4781
commit e78091d50f
27 changed files with 525 additions and 413 deletions

View File

@ -23,7 +23,6 @@
#include "observer/table_load/ob_table_load_schema.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "lib/mysqlclient/ob_mysql_result.h"
namespace oceanbase
{
@ -34,7 +33,6 @@ using namespace omt;
using namespace share::schema;
using namespace sql;
using namespace table;
using namespace common;
// begin
ObTableDirectLoadBeginExecutor::ObTableDirectLoadBeginExecutor(
@ -209,49 +207,11 @@ int ObTableDirectLoadBeginExecutor::process()
return ret;
}
/**
 * Queries the sys-variable history table for the variable named by
 * OB_SV__OPTIMIZER_GATHER_STATS_ON_LOAD (rows carrying the highest
 * schema_version per (zone, name)) and reports whether it is switched on.
 *
 * @param tenant_id               tenant on whose behalf the inner SQL is executed
 * @param online_opt_stat_gather  set to true when a returned row has the value "1";
 *                                this function never writes false into it, so the
 *                                caller must initialise it beforehand
 * @return OB_SUCCESS when the query ran and all rows were consumed (an empty
 *         result set is not an error); otherwise the error of the failing step
 */
int ObTableDirectLoadBeginExecutor::check_need_online_gather(const uint64_t tenant_id, bool &online_opt_stat_gather)
{
  int ret = OB_SUCCESS;
  ObSqlString sql;
  SMART_VAR(ObMySQLProxy::MySQLResult, res) {
    sqlclient::ObMySQLResult *result = nullptr;
    if (OB_FAIL(sql.assign_fmt("SELECT value FROM %s WHERE (zone, name, schema_version) in (select zone, name, max(schema_version) FROM %s"
                               " group by zone, name) and name = '%s'",
                               OB_ALL_SYS_VARIABLE_HISTORY_TNAME, OB_ALL_SYS_VARIABLE_HISTORY_TNAME, OB_SV__OPTIMIZER_GATHER_STATS_ON_LOAD))) {
      LOG_WARN("fail to append sql", KR(ret), K(tenant_id));
    } else if (OB_FAIL(GCTX.sql_proxy_->read(res, tenant_id, sql.ptr()))) {
      LOG_WARN("fail to execute sql", KR(ret), K(sql), K(tenant_id));
    } else if (OB_ISNULL(result = res.get_result())) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("fail to get sql result", KR(ret), K(sql), K(tenant_id));
    } else {
      while (OB_SUCC(ret)) {
        if (OB_FAIL(result->next())) {
          if (OB_ITER_END != ret) {
            LOG_WARN("fail to get next row", KR(ret), K(tenant_id));
          }
        } else {
          ObString data;
          EXTRACT_VARCHAR_FIELD_MYSQL(*result, "value", data);
          // NOTE(review): the extraction macro is expected to report failure
          // through `ret`; only inspect `data` when it succeeded.
          // ObString is a (pointer, length) pair and is not guaranteed to be
          // NUL-terminated, so compare by length instead of using strcmp().
          if (OB_SUCC(ret)
              && 1 == data.length()
              && nullptr != data.ptr()
              && '1' == data.ptr()[0]) {
            online_opt_stat_gather = true;
          }
        }
      }
      // Running off the end of the result set is the normal loop exit.
      if (OB_ITER_END == ret) {
        ret = OB_SUCCESS;
      }
    }
  }
  return ret;
}
int ObTableDirectLoadBeginExecutor::create_table_ctx()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = client_task_->tenant_id_;
const uint64_t table_id = client_task_->table_id_;
bool online_opt_stat_gather = false;
ObTableLoadDDLParam ddl_param;
ObTableLoadParam param;
// start redef table
@ -280,8 +240,6 @@ int ObTableDirectLoadBeginExecutor::create_table_ctx()
ObTenant *tenant = nullptr;
if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) {
LOG_WARN("fail to get tenant", KR(ret), K(tenant_id));
} else if (OB_FAIL(check_need_online_gather(tenant_id, online_opt_stat_gather))) {
LOG_WARN("fail to get tenant", KR(ret), K(tenant_id));
} else {
param.tenant_id_ = tenant_id;
param.table_id_ = table_id;
@ -292,7 +250,7 @@ int ObTableDirectLoadBeginExecutor::create_table_ctx()
param.column_count_ = client_task_->column_names_.count();
param.need_sort_ = true;
param.px_mode_ = false;
param.online_opt_stat_gather_ = online_opt_stat_gather;
param.online_opt_stat_gather_ = false;
param.dup_action_ = arg_.dup_action_;
if (OB_FAIL(param.normalize())) {
LOG_WARN("fail to normalize param", KR(ret));

View File

@ -71,7 +71,7 @@ protected:
private:
int create_table_ctx();
int do_begin();
int check_need_online_gather(const uint64_t tenant_id, bool &online_opt_stat_gather);
private:
ObTableLoadClientTask *client_task_;
ObTableLoadTableCtx *table_ctx_;

View File

@ -264,7 +264,7 @@ int ObDirectLoadControlCommitExecutor::process()
ObTableLoadStore store(table_ctx);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.commit(res_.result_info_, res_.sql_statistics_))) {
} else if (OB_FAIL(store.commit(res_.result_info_))) {
LOG_WARN("fail to store commit", KR(ret));
} else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
LOG_WARN("fail to remove table ctx", KR(ret), K(key));

View File

@ -28,7 +28,6 @@
#include "observer/table_load/ob_table_load_utils.h"
#include "share/ob_share_util.h"
#include "share/stat/ob_incremental_stat_estimator.h"
#include "share/stat/ob_dbms_stats_utils.h"
namespace oceanbase
{
@ -629,7 +628,7 @@ int ObTableLoadCoordinator::add_check_merge_result_task()
* commit
*/
int ObTableLoadCoordinator::commit_peers(table::ObTableLoadSqlStatistics &sql_statistics)
int ObTableLoadCoordinator::commit_peers()
{
int ret = OB_SUCCESS;
ObTableLoadArray<ObAddr> all_addr_array;
@ -647,7 +646,7 @@ int ObTableLoadCoordinator::commit_peers(table::ObTableLoadSqlStatistics &sql_st
ObTableLoadStore store(ctx_);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.commit(res.result_info_, res.sql_statistics_))) {
} else if (OB_FAIL(store.commit(res.result_info_))) {
LOG_WARN("fail to commit store", KR(ret));
}
} else { // 远端, 发送rpc
@ -658,9 +657,6 @@ int ObTableLoadCoordinator::commit_peers(table::ObTableLoadSqlStatistics &sql_st
ATOMIC_AAF(&coordinator_ctx_->result_info_.deleted_, res.result_info_.deleted_);
ATOMIC_AAF(&coordinator_ctx_->result_info_.skipped_, res.result_info_.skipped_);
ATOMIC_AAF(&coordinator_ctx_->result_info_.warnings_, res.result_info_.warnings_);
if (OB_FAIL(sql_statistics.merge(res.sql_statistics_))) {
LOG_WARN("fail to add result sql stats", KR(ret), K(addr), K(res));
}
}
}
}
@ -697,10 +693,12 @@ int ObTableLoadCoordinator::commit(ObTableLoadResultInfo &result_info)
ObTableLoadSqlStatistics sql_statistics;
if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) {
LOG_WARN("fail to check coordinator status", KR(ret));
} else if (OB_FAIL(commit_peers(sql_statistics))) {
} else if (OB_FAIL(commit_peers())) {
LOG_WARN("fail to commit peers", KR(ret));
} else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) {
} else if (param_.online_opt_stat_gather_ && OB_FAIL(write_sql_stat(sql_statistics))) {
} else if (param_.online_opt_stat_gather_ &&
OB_FAIL(
drive_sql_stat(coordinator_ctx_->exec_ctx_->get_exec_ctx()))) {
LOG_WARN("fail to drive sql stat", KR(ret));
} else if (OB_FAIL(commit_redef_table())) {
LOG_WARN("fail to commit redef table", KR(ret));
@ -727,10 +725,12 @@ int ObTableLoadCoordinator::px_commit_data()
ObTableLoadSqlStatistics sql_statistics;
if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::MERGED))) {
LOG_WARN("fail to check coordinator status", KR(ret));
} else if (OB_FAIL(commit_peers(sql_statistics))) {
} else if (OB_FAIL(commit_peers())) {
LOG_WARN("fail to commit peers", KR(ret));
} else if (FALSE_IT(coordinator_ctx_->set_enable_heart_beat(false))) {
} else if (param_.online_opt_stat_gather_ && OB_FAIL(write_sql_stat(sql_statistics))) {
} else if (param_.online_opt_stat_gather_ &&
OB_FAIL(
drive_sql_stat(coordinator_ctx_->exec_ctx_->get_exec_ctx()))) {
LOG_WARN("fail to drive sql stat", KR(ret));
}
}
@ -758,28 +758,32 @@ int ObTableLoadCoordinator::px_commit_ddl()
return ret;
}
int ObTableLoadCoordinator::write_sql_stat(ObTableLoadSqlStatistics &sql_statistics)
int ObTableLoadCoordinator::drive_sql_stat(ObExecContext *ctx)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
const uint64_t table_id = ctx_->ddl_param_.dest_table_id_;
ObSEArray<ObOptColumnStat *, 64> global_column_stats;
ObSEArray<ObOptTableStat *, 64> global_table_stats;
ObSchemaGetterGuard schema_guard;
ObSchemaGetterGuard *tmp_schema_guard = nullptr;
const ObTableSchema *table_schema = nullptr;
if (OB_UNLIKELY(sql_statistics.is_empty())) {
if (OB_UNLIKELY(nullptr == ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(sql_statistics));
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard,
LOG_WARN("invalid args", KR(ret), KPC(ctx));
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard,
table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(sql_statistics.get_col_stat_array(global_column_stats))) {
LOG_WARN("failed to get column stat array");
} else if (OB_FAIL(sql_statistics.get_table_stat_array(global_table_stats))) {
LOG_WARN("failed to get table stat array");
} else if (OB_FAIL(ObDbmsStatsUtils::split_batch_write(&schema_guard, ctx_->session_info_, GCTX.sql_proxy_, global_table_stats, global_column_stats))) {
LOG_WARN("failed to batch write stats", K(ret), K(sql_statistics.table_stat_array_),
K(sql_statistics.col_stat_array_));
} else {
tmp_schema_guard = ctx->get_virtual_table_ctx().schema_guard_;
ctx->get_sql_ctx()->schema_guard_ = &schema_guard;
ctx->get_das_ctx().set_sql_ctx(ctx->get_sql_ctx());
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObIncrementalStatEstimator::derive_global_stat_by_direct_load(
*ctx, table_id))) {
LOG_WARN("fail to drive global stat by direct load", KR(ret));
}
ctx->get_sql_ctx()->schema_guard_ = tmp_schema_guard;
ctx->get_das_ctx().set_sql_ctx(ctx->get_sql_ctx());
}
return ret;
}

View File

@ -60,9 +60,9 @@ private:
int confirm_begin_peers();
int pre_merge_peers();
int start_merge_peers();
int commit_peers(table::ObTableLoadSqlStatistics &sql_statistics);
int commit_peers();
int commit_redef_table();
int write_sql_stat(table::ObTableLoadSqlStatistics &sql_statistics);
int drive_sql_stat(sql::ObExecContext *ctx);
int heart_beat_peer();
private:
int add_check_merge_result_task();

View File

@ -100,7 +100,8 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray<int64_t> &idx_array,
LOG_WARN("ObTableLoadCoordinatorCtx init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(
idx_array.count() != ctx_->param_.column_count_ || nullptr == exec_ctx ||
!exec_ctx->is_valid())) {
!exec_ctx->is_valid() ||
(ctx_->param_.online_opt_stat_gather_ && nullptr == exec_ctx->get_exec_ctx()))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(ctx_->param_), K(idx_array.count()), KPC(exec_ctx));
} else {

View File

@ -40,8 +40,8 @@ using namespace table;
class ObTableLoadMerger::MergeTaskProcessor : public ObITableLoadTaskProcessor
{
public:
MergeTaskProcessor(ObTableLoadTask &task, ObTableLoadTableCtx *ctx, ObTableLoadMerger *merger, int64_t thread_idx)
: ObITableLoadTaskProcessor(task), ctx_(ctx), merger_(merger), thread_idx_(thread_idx)
MergeTaskProcessor(ObTableLoadTask &task, ObTableLoadTableCtx *ctx, ObTableLoadMerger *merger)
: ObITableLoadTaskProcessor(task), ctx_(ctx), merger_(merger)
{
ctx_->inc_ref_count();
}
@ -63,7 +63,7 @@ public:
ret = OB_SUCCESS;
break;
}
} else if (OB_FAIL(merge_task->process(thread_idx_))) {
} else if (OB_FAIL(merge_task->process())) {
LOG_WARN("fail to process merge task", KR(ret));
}
if (nullptr != merge_task) {
@ -75,7 +75,6 @@ public:
private:
ObTableLoadTableCtx *const ctx_;
ObTableLoadMerger *const merger_;
int64_t thread_idx_;
};
class ObTableLoadMerger::MergeTaskCallback : public ObITableLoadTaskCallback
@ -204,8 +203,10 @@ int ObTableLoadMerger::build_merge_ctx()
merge_param.table_data_desc_ = store_ctx_->table_data_desc_;
merge_param.datum_utils_ = &(store_ctx_->ctx_->schema_.datum_utils_);
merge_param.col_descs_ = &(store_ctx_->ctx_->schema_.column_descs_);
merge_param.cmp_funcs_ = &(store_ctx_->ctx_->schema_.cmp_funcs_);
merge_param.is_heap_table_ = store_ctx_->ctx_->schema_.is_heap_table_;
merge_param.is_fast_heap_table_ = store_ctx_->is_fast_heap_table_;
merge_param.online_opt_stat_gather_ = param_.online_opt_stat_gather_;
merge_param.insert_table_ctx_ = store_ctx_->insert_table_ctx_;
merge_param.dml_row_handler_ = store_ctx_->error_row_handler_;
if (OB_FAIL(merge_ctx_.init(merge_param, store_ctx_->ls_partition_ids_,
@ -364,6 +365,58 @@ int ObTableLoadMerger::collect_dml_stat(ObTableLoadDmlStat &dml_stats)
return ret;
}
/**
 * Collects per-tablet SQL statistics from every tablet merge context into
 * `sql_statistics`.
 *
 * For a fast heap table the committed trans stores are walked first and every
 * partition table (which must be an ObDirectLoadFastHeapTable) is grouped by
 * tablet id; each tablet merge context is then handed the tables recorded for
 * its tablet. Otherwise each tablet merge context is called with an empty
 * heap-table array.
 *
 * @param sql_statistics  output container filled by the tablet merge contexts
 * @return OB_SUCCESS, or the first error encountered; OB_ERR_UNEXPECTED when a
 *         null entry or a non fast-heap partition table is found
 */
int ObTableLoadMerger::collect_sql_statistics(ObTableLoadSqlStatistics &sql_statistics)
{
  int ret = OB_SUCCESS;
  if (store_ctx_->is_fast_heap_table_) {
    ObDirectLoadMultiMap<ObTabletID, ObDirectLoadFastHeapTable *> tables;
    ObArray<ObTableLoadTransStore *> trans_store_array;
    if (OB_FAIL(tables.init())) {
      LOG_WARN("fail to init table", KR(ret));
    } else if (OB_FAIL(store_ctx_->get_committed_trans_stores(trans_store_array))) {
      LOG_WARN("fail to get trans store", KR(ret));
    } else {
      // Step 1: group every fast heap table of every committed session by tablet id.
      for (int64_t i = 0; OB_SUCC(ret) && i < trans_store_array.count(); ++i) {
        ObTableLoadTransStore *trans_store = trans_store_array.at(i);
        if (OB_ISNULL(trans_store)) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("unexpected null trans store", KR(ret), K(i));
        }
        for (int64_t j = 0; OB_SUCC(ret) && j < trans_store->session_store_array_.count(); ++j) {
          ObTableLoadTransStore::SessionStore *session_store = trans_store->session_store_array_.at(j);
          if (OB_ISNULL(session_store)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("unexpected null session store", KR(ret), K(i), K(j));
          }
          for (int64_t k = 0; OB_SUCC(ret) && k < session_store->partition_table_array_.count(); ++k) {
            ObIDirectLoadPartitionTable *table = session_store->partition_table_array_.at(k);
            ObDirectLoadFastHeapTable *sstable = nullptr;
            // dynamic_cast yields nullptr both for a null entry and for a wrong type.
            if (OB_ISNULL(sstable = dynamic_cast<ObDirectLoadFastHeapTable *>(table))) {
              ret = OB_ERR_UNEXPECTED;
              LOG_WARN("unexpected not heap sstable", KR(ret), KPC(table));
            } else {
              const ObTabletID &tablet_id = sstable->get_tablet_id();
              if (OB_FAIL(tables.add(tablet_id, sstable))) {
                LOG_WARN("fail to add tables", KR(ret), KPC(sstable));
              }
            }
          }
        }
      }
      // Step 2: let each tablet merge context collect from the tables of its tablet.
      for (int64_t i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) {
        ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i);
        ObArray<ObDirectLoadFastHeapTable *> heap_table_array;
        if (OB_ISNULL(tablet_ctx)) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("unexpected null tablet merge ctx", KR(ret), K(i));
        } else if (OB_FAIL(tables.get(tablet_ctx->get_tablet_id(), heap_table_array))) {
          LOG_WARN("get heap sstable failed", KR(ret));
        } else if (OB_FAIL(tablet_ctx->collect_sql_statistics(heap_table_array, sql_statistics))) {
          LOG_WARN("fail to collect sql statics", KR(ret));
        }
      }
    }
  } else {
    // Not a fast heap table: there are no heap tables to pass along.
    for (int64_t i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) {
      ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i);
      ObArray<ObDirectLoadFastHeapTable *> heap_table_array;
      if (OB_ISNULL(tablet_ctx)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexpected null tablet merge ctx", KR(ret), K(i));
      } else if (OB_FAIL(tablet_ctx->collect_sql_statistics(heap_table_array, sql_statistics))) {
        LOG_WARN("fail to collect sql statics", KR(ret));
      }
    }
  }
  return ret;
}
int ObTableLoadMerger::start_merge()
{
int ret = OB_SUCCESS;
@ -376,7 +429,7 @@ int ObTableLoadMerger::start_merge()
LOG_WARN("fail to alloc task", KR(ret));
}
// 2. 设置processor
else if (OB_FAIL(task->set_processor<MergeTaskProcessor>(ctx, this, thread_idx))) {
else if (OB_FAIL(task->set_processor<MergeTaskProcessor>(ctx, this))) {
LOG_WARN("fail to set merge task processor", KR(ret));
}
// 3. 设置callback

View File

@ -38,6 +38,7 @@ public:
int start();
void stop();
int handle_table_compact_success();
int collect_sql_statistics(table::ObTableLoadSqlStatistics &sql_statistics);
int collect_dml_stat(table::ObTableLoadDmlStat &dml_stats);
private:
int build_merge_ctx();

View File

@ -25,6 +25,7 @@
#include "observer/table_load/ob_table_load_utils.h"
#include "storage/direct_load/ob_direct_load_insert_table_ctx.h"
#include "share/stat/ob_opt_stat_monitor_manager.h"
#include "share/stat/ob_dbms_stats_utils.h"
#include "storage/blocksstable/ob_sstable.h"
namespace oceanbase
@ -297,7 +298,7 @@ int ObTableLoadStore::start_merge()
return ret;
}
int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlStatistics &sql_statistics)
int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -307,12 +308,19 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlS
LOG_INFO("store commit");
ObMutexGuard guard(store_ctx_->get_op_lock());
ObTableLoadDmlStat dml_stats;
ObTableLoadSqlStatistics sql_statistics;
if (OB_FAIL(store_ctx_->check_status(ObTableLoadStatusType::MERGED))) {
LOG_WARN("fail to check store status", KR(ret));
} else if (OB_FAIL(store_ctx_->insert_table_ctx_->commit(sql_statistics))) {
} else if (OB_FAIL(store_ctx_->insert_table_ctx_->commit())) {
LOG_WARN("fail to commit insert table", KR(ret));
} else if (ctx_->schema_.has_autoinc_column_ && OB_FAIL(store_ctx_->commit_autoinc_value())) {
LOG_WARN("fail to commit sync auto increment value", KR(ret));
} else if (param_.online_opt_stat_gather_ &&
OB_FAIL(store_ctx_->merger_->collect_sql_statistics(sql_statistics))) {
LOG_WARN("fail to collect sql stats", KR(ret));
} else if (param_.online_opt_stat_gather_ &&
OB_FAIL(commit_sql_statistics(sql_statistics))) {
LOG_WARN("fail to commit sql stats", KR(ret));
} else if (OB_FAIL(store_ctx_->merger_->collect_dml_stat(dml_stats))) {
LOG_WARN("fail to build dml stat", KR(ret));
} else if (OB_FAIL(ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(dml_stats.dml_stat_array_))) {
@ -327,6 +335,34 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlS
return ret;
}
int ObTableLoadStore::commit_sql_statistics(const ObTableLoadSqlStatistics &sql_statistics)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
const uint64_t table_id = param_.table_id_;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
ObSEArray<ObOptColumnStat *, 64> part_column_stats;
ObSEArray<ObOptTableStat *, 64> part_table_stats;
if (sql_statistics.is_empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql statistics is empty", K(ret));
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard,
table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(sql_statistics.get_col_stat_array(part_column_stats))) {
LOG_WARN("failed to get column stat array");
} else if (OB_FAIL(sql_statistics.get_table_stat_array(part_table_stats))) {
LOG_WARN("failed to get table stat array");
} else if (OB_FAIL(ObDbmsStatsUtils::split_batch_write(
&schema_guard, store_ctx_->ctx_->session_info_, GCTX.sql_proxy_, part_table_stats,
part_column_stats))) {
LOG_WARN("failed to batch write stats", K(ret), K(sql_statistics.table_stat_array_),
K(sql_statistics.col_stat_array_));
}
return ret;
}
int ObTableLoadStore::get_status(ObTableLoadStatusType &status, int &error_code)
{
int ret = OB_SUCCESS;

View File

@ -46,9 +46,11 @@ public:
int confirm_begin();
int pre_merge(const table::ObTableLoadArray<table::ObTableLoadTransId> &committed_trans_id_array);
int start_merge();
int commit(table::ObTableLoadResultInfo &result_info, table::ObTableLoadSqlStatistics &sql_statistics);
int commit(table::ObTableLoadResultInfo &result_info);
int get_status(table::ObTableLoadStatusType &status, int &error_code);
int heart_beat();
private:
int commit_sql_statistics(const table::ObTableLoadSqlStatistics &sql_statistics);
private:
class MergeTaskProcessor;
class MergeTaskCallback;

View File

@ -83,19 +83,11 @@ int ObTableLoadStoreCtx::init(
} else {
ObDirectLoadInsertTableParam insert_table_param;
insert_table_param.table_id_ = ctx_->param_.table_id_;
insert_table_param.dest_table_id_= ctx_->ddl_param_.dest_table_id_;
insert_table_param.schema_version_ = ctx_->ddl_param_.schema_version_;
insert_table_param.snapshot_version_ = ctx_->ddl_param_.snapshot_version_;
insert_table_param.ddl_task_id_ = ctx_->ddl_param_.task_id_;
insert_table_param.execution_id_ = 1; //仓氐说暂时设置为1,不然后面检测过不了
insert_table_param.data_version_ = ctx_->ddl_param_.data_version_;
insert_table_param.session_cnt_ = ctx_->param_.session_count_;
insert_table_param.rowkey_column_count_ = (!ctx_->schema_.is_heap_table_ ? ctx_->schema_.rowkey_column_count_ : 0);
insert_table_param.column_count_ = ctx_->param_.column_count_;
insert_table_param.online_opt_stat_gather_ = ctx_->param_.online_opt_stat_gather_;
insert_table_param.is_heap_table_ = ctx_->schema_.is_heap_table_;
insert_table_param.col_descs_ = &(ctx_->schema_.column_descs_);
insert_table_param.cmp_funcs_ = &(ctx_->schema_.cmp_funcs_);
for (int64_t i = 0; OB_SUCC(ret) && i < partition_id_array.count(); ++i) {
const ObLSID &ls_id = partition_id_array[i].ls_id_;
const ObTableLoadPartitionId &part_tablet_id = partition_id_array[i].part_tablet_id_;

View File

@ -210,15 +210,16 @@ int ObTableLoadTransStoreWriter::init_session_ctx_array()
param.table_data_desc_ = *table_data_desc_;
param.datum_utils_ = &(trans_ctx_->ctx_->schema_.datum_utils_);
param.col_descs_ = &(trans_ctx_->ctx_->schema_.column_descs_);
param.cmp_funcs_ = &(trans_ctx_->ctx_->schema_.cmp_funcs_);
param.file_mgr_ = trans_ctx_->ctx_->store_ctx_->tmp_file_mgr_;
param.is_multiple_mode_ = trans_ctx_->ctx_->store_ctx_->is_multiple_mode_;
param.is_fast_heap_table_ = trans_ctx_->ctx_->store_ctx_->is_fast_heap_table_;
param.online_opt_stat_gather_ = trans_ctx_->ctx_->param_.online_opt_stat_gather_;
param.insert_table_ctx_ = trans_ctx_->ctx_->store_ctx_->insert_table_ctx_;
param.fast_heap_table_ctx_ = trans_ctx_->ctx_->store_ctx_->fast_heap_table_ctx_;
param.dml_row_handler_ = trans_ctx_->ctx_->store_ctx_->error_row_handler_;
for (int64_t i = 0; OB_SUCC(ret) && i < session_count; ++i) {
SessionContext *session_ctx = session_ctx_array_ + i;
param.thread_idx_ = i;
if (param_.px_mode_) {
session_ctx->extra_buf_size_ = table_data_desc_->extra_buf_size_;
if (OB_ISNULL(session_ctx->extra_buf_ =

View File

@ -21,7 +21,6 @@ namespace table
void ObTableLoadSqlStatistics::reset()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; i < col_stat_array_.count(); ++i) {
ObOptOSGColumnStat *col_stat = col_stat_array_.at(i);
if (col_stat != nullptr) {
@ -83,73 +82,53 @@ int ObTableLoadSqlStatistics::allocate_col_stat(ObOptOSGColumnStat *&col_stat)
return ret;
}
int ObTableLoadSqlStatistics::merge(const ObTableLoadSqlStatistics& other)
int ObTableLoadSqlStatistics::add(const ObTableLoadSqlStatistics& other)
{
int ret = OB_SUCCESS;
if (is_empty()) {
for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) {
ObOptTableStat *table_stat = other.table_stat_array_.at(i);
if (table_stat != nullptr) {
ObOptTableStat *copied_table_stat = nullptr;
int64_t size = table_stat->size();
char *new_buf = nullptr;
if (OB_ISNULL(new_buf = static_cast<char *>(allocator_.alloc(size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size));
} else if (OB_FAIL(table_stat->deep_copy(new_buf, size, copied_table_stat))) {
OB_LOG(WARN, "fail to copy table stat", KR(ret));
} else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) {
OB_LOG(WARN, "fail to add table stat", KR(ret));
}
if (OB_FAIL(ret)) {
if (copied_table_stat != nullptr) {
copied_table_stat->~ObOptTableStat();
copied_table_stat = nullptr;
}
if(new_buf != nullptr) {
allocator_.free(new_buf);
new_buf = nullptr;
}
}
}
}
for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) {
ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i);
ObOptOSGColumnStat *copied_col_stat = nullptr;
if (OB_ISNULL(col_stat)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "get unexpected null");
} else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) {
for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) {
ObOptTableStat *table_stat = other.table_stat_array_.at(i);
if (table_stat != nullptr) {
ObOptTableStat *copied_table_stat = nullptr;
int64_t size = table_stat->size();
char *new_buf = nullptr;
if (OB_ISNULL(new_buf = static_cast<char *>(allocator_.alloc(size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "failed to create new col stat");
} else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) {
OB_LOG(WARN, "fail to copy col stat", KR(ret));
} else if (OB_FAIL(col_stat_array_.push_back(copied_col_stat))) {
OB_LOG(WARN, "fail to add col stat", KR(ret));
OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size));
} else if (OB_FAIL(table_stat->deep_copy(new_buf, size, copied_table_stat))) {
OB_LOG(WARN, "fail to copy table stat", KR(ret));
} else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) {
OB_LOG(WARN, "fail to add table stat", KR(ret));
}
if (OB_FAIL(ret)) {
if (copied_col_stat != nullptr) {
copied_col_stat->~ObOptOSGColumnStat();
copied_col_stat = nullptr;
if (copied_table_stat != nullptr) {
copied_table_stat->~ObOptTableStat();
copied_table_stat = nullptr;
}
if(new_buf != nullptr) {
allocator_.free(new_buf);
new_buf = nullptr;
}
}
}
} else {
for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) {
ObOptTableStat *table_stat = other.table_stat_array_.at(i);
if (OB_FAIL(table_stat_array_.at(i)->merge_table_stat(*table_stat))) {
OB_LOG(WARN, "failed to merge table stat");
}
}
for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) {
ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i);
ObOptOSGColumnStat *copied_col_stat = nullptr;
if (OB_ISNULL(col_stat)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "get unexpected null");
} else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "failed to create new col stat");
} else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) {
OB_LOG(WARN, "fail to copy col stat", KR(ret));
} else if (OB_FAIL(col_stat_array_.push_back(copied_col_stat))) {
OB_LOG(WARN, "fail to add col stat", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) {
ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i);
if (OB_ISNULL(col_stat)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "get unexpected null");
}
// ObOptOSGColumnStat的序列化结果只序列化里面的ObOptColumnStat, 需要调用ObOptColumnStat的merge函数
else if (OB_FAIL(col_stat_array_.at(i)->col_stat_->merge_column_stat(*col_stat->col_stat_))) {
OB_LOG(WARN, "failed to merge col stat");
if (OB_FAIL(ret)) {
if (copied_col_stat != nullptr) {
copied_col_stat->~ObOptOSGColumnStat();
copied_col_stat = nullptr;
}
}
}

View File

@ -34,16 +34,14 @@ public:
}
int allocate_table_stat(ObOptTableStat *&table_stat);
int allocate_col_stat(ObOptOSGColumnStat *&col_stat);
int merge(const ObTableLoadSqlStatistics& other);
int add(const ObTableLoadSqlStatistics& other);
int get_table_stat_array(ObIArray<ObOptTableStat*> &table_stat_array) const;
int get_col_stat_array(ObIArray<ObOptColumnStat*> &col_stat_array) const;
int persistence_col_stats();
ObIArray<ObOptTableStat*> & get_table_stat_array() {return table_stat_array_; }
ObIArray<ObOptOSGColumnStat*> & get_col_stat_array() {return col_stat_array_; }
TO_STRING_KV(K_(col_stat_array), K_(table_stat_array));
public:
common::ObArray<ObOptTableStat *> table_stat_array_;
common::ObArray<ObOptOSGColumnStat *> col_stat_array_;
common::ObSEArray<ObOptTableStat *, 64, common::ModulePageAllocator, true> table_stat_array_;
common::ObSEArray<ObOptOSGColumnStat *, 64, common::ModulePageAllocator, true> col_stat_array_;
common::ObArenaAllocator allocator_;
};

View File

@ -1989,9 +1989,12 @@ int ObLoadDataDirectImpl::init_execute_param()
}
// need_sort_
if (OB_SUCC(ret)) {
int64_t append = 0;
int64_t enable_direct = 0;
int64_t hint_need_sort = 0;
if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) {
if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) {
LOG_WARN("fail to get value of APPEND", K(ret));
} else if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) {
LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret));
} else if (OB_FAIL(hint.get_value(ObLoadDataHint::NEED_SORT, hint_need_sort))) {
LOG_WARN("fail to get value of NEED_SORT", KR(ret), K(hint));
@ -2016,7 +2019,8 @@ int ObLoadDataDirectImpl::init_execute_param()
}
// online_opt_stat_gather_
if (OB_SUCC(ret)) {
int64_t no_gather_optimizer_statistics = 0 ;
int64_t append = 0;
int64_t gather_optimizer_statistics = 0 ;
ObSQLSessionInfo *session = nullptr;
ObObj obj;
if (OB_ISNULL(session = ctx_->get_my_session())) {
@ -2024,9 +2028,11 @@ int ObLoadDataDirectImpl::init_execute_param()
LOG_WARN("session is null", KR(ret));
} else if (OB_FAIL(session->get_sys_variable(SYS_VAR__OPTIMIZER_GATHER_STATS_ON_LOAD, obj))) {
LOG_WARN("fail to get sys variable", K(ret));
} else if (OB_FAIL(hint.get_value(ObLoadDataHint::NO_GATHER_OPTIMIZER_STATISTICS, no_gather_optimizer_statistics))) {
} else if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) {
LOG_WARN("fail to get value of APPEND", K(ret));
} else if ((no_gather_optimizer_statistics == 0) && obj.get_bool()) {
} else if (OB_FAIL(hint.get_value(ObLoadDataHint::GATHER_OPTIMIZER_STATISTICS, gather_optimizer_statistics))) {
LOG_WARN("fail to get value of APPEND", K(ret));
} else if (((append != 0) || (gather_optimizer_statistics != 0)) && obj.get_bool()) {
execute_param_.online_opt_stat_gather_ = true;
} else {
execute_param_.online_opt_stat_gather_ = false;
@ -2034,11 +2040,19 @@ int ObLoadDataDirectImpl::init_execute_param()
}
// max_error_rows_
if (OB_SUCC(ret)) {
int64_t append = 0;
int64_t enable_direct = 0;
int64_t hint_error_rows = 0;
if (OB_FAIL(hint.get_value(ObLoadDataHint::ERROR_ROWS, hint_error_rows))) {
if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) {
LOG_WARN("fail to get value of APPEND", K(ret));
} else if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) {
LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret));
} else if (OB_FAIL(hint.get_value(ObLoadDataHint::ERROR_ROWS, hint_error_rows))) {
LOG_WARN("fail to get value of ERROR_ROWS", KR(ret), K(hint));
} else {
} else if (enable_direct != 0) {
execute_param_.max_error_rows_ = hint_error_rows;
} else {
execute_param_.max_error_rows_ = 0;
}
}
// data_access_param_

View File

@ -25,7 +25,7 @@ using namespace common;
*/
ObDirectLoadFastHeapTableCreateParam::ObDirectLoadFastHeapTableCreateParam()
: row_count_(0)
: row_count_(0) , column_stat_array_(nullptr)
{
}
@ -35,7 +35,7 @@ ObDirectLoadFastHeapTableCreateParam::~ObDirectLoadFastHeapTableCreateParam()
bool ObDirectLoadFastHeapTableCreateParam::is_valid() const
{
return tablet_id_.is_valid() && row_count_ >= 0;
return tablet_id_.is_valid() && row_count_ >= 0 && nullptr != column_stat_array_ ;
}
/**
@ -43,14 +43,45 @@ bool ObDirectLoadFastHeapTableCreateParam::is_valid() const
*/
ObDirectLoadFastHeapTable::ObDirectLoadFastHeapTable()
: is_inited_(false)
: allocator_("TLD_FastHTable"), is_inited_(false)
{
}
/**
 * Runs the destructor of every column stat held in column_stat_array_.
 * Only the destructor is invoked here; no per-object free is issued.
 */
ObDirectLoadFastHeapTable::~ObDirectLoadFastHeapTable()
{
  for (int64_t i = 0; i < column_stat_array_.count(); ++i) {
    ObOptOSGColumnStat *col_stat = column_stat_array_.at(i);
    // Guard against a null slot before invoking the destructor.
    if (nullptr != col_stat) {
      col_stat->~ObOptOSGColumnStat();
      // Clear the slot itself so no dangling pointer is left in the array.
      column_stat_array_.at(i) = nullptr;
    }
  }
}
int ObDirectLoadFastHeapTable::copy_col_stat(const ObDirectLoadFastHeapTableCreateParam &param)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret)&& i < param.column_stat_array_->count(); ++i) {
ObOptOSGColumnStat *col_stat = param.column_stat_array_->at(i);
ObOptOSGColumnStat *copied_col_stat = NULL;
if (OB_ISNULL(col_stat)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null");
} else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory");
} else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) {
LOG_WARN("fail to copy colstat", KR(ret));
} else if (OB_FAIL(column_stat_array_.push_back(copied_col_stat))) {
LOG_WARN("fail to add table", KR(ret));
}
if (OB_FAIL(ret) && OB_NOT_NULL(copied_col_stat)) {
copied_col_stat->~ObOptOSGColumnStat();
copied_col_stat = nullptr;
}
}
return ret;
}
int ObDirectLoadFastHeapTable::init(const ObDirectLoadFastHeapTableCreateParam &param)
{
int ret = OB_SUCCESS;
@ -63,7 +94,12 @@ int ObDirectLoadFastHeapTable::init(const ObDirectLoadFastHeapTableCreateParam &
} else {
meta_.tablet_id_ = param.tablet_id_;
meta_.row_count_ = param.row_count_;
is_inited_ = true;
allocator_.set_tenant_id(MTL_ID());
if (OB_FAIL(copy_col_stat(param))){
LOG_WARN("fail to inner init", KR(ret), K(param));
} else {
is_inited_ = true;
}
}
return ret;
}

View File

@ -31,6 +31,7 @@ public:
public:
common::ObTabletID tablet_id_;
int64_t row_count_;
common::ObArray<common::ObOptOSGColumnStat*> *column_stat_array_;
};
struct ObDirectLoadFastHeapTableMeta
@ -46,13 +47,21 @@ public:
ObDirectLoadFastHeapTable();
virtual ~ObDirectLoadFastHeapTable();
int init(const ObDirectLoadFastHeapTableCreateParam &param);
const common::ObIArray<ObOptOSGColumnStat*> &get_column_stat_array() const
{
return column_stat_array_;
}
const common::ObTabletID &get_tablet_id() const override { return meta_.tablet_id_; }
int64_t get_row_count() const override { return meta_.row_count_; }
bool is_valid() const override { return is_inited_; }
const ObDirectLoadFastHeapTableMeta &get_meta() const { return meta_; }
TO_STRING_KV(K_(meta));
private:
int copy_col_stat(const ObDirectLoadFastHeapTableCreateParam &param);
private:
ObDirectLoadFastHeapTableMeta meta_;
common::ObArenaAllocator allocator_;
common::ObArray<common::ObOptOSGColumnStat*> column_stat_array_;
bool is_inited_;
DISABLE_COPY_ASSIGN(ObDirectLoadFastHeapTable);
};

View File

@ -34,12 +34,13 @@ using namespace share;
ObDirectLoadFastHeapTableBuildParam::ObDirectLoadFastHeapTableBuildParam()
: snapshot_version_(0),
thread_idx_(0),
datum_utils_(nullptr),
col_descs_(nullptr),
cmp_funcs_(nullptr),
insert_table_ctx_(nullptr),
fast_heap_table_ctx_(nullptr),
dml_row_handler_(nullptr)
dml_row_handler_(nullptr),
online_opt_stat_gather_(false)
{
}
@ -49,8 +50,8 @@ ObDirectLoadFastHeapTableBuildParam::~ObDirectLoadFastHeapTableBuildParam()
bool ObDirectLoadFastHeapTableBuildParam::is_valid() const
{
return tablet_id_.is_valid() && snapshot_version_ > 0 && thread_idx_ >= 0 &&
table_data_desc_.is_valid() && nullptr != col_descs_ && nullptr != insert_table_ctx_ &&
return tablet_id_.is_valid() && snapshot_version_ > 0 && table_data_desc_.is_valid() &&
nullptr != col_descs_ && nullptr != cmp_funcs_ && nullptr != insert_table_ctx_ &&
nullptr != fast_heap_table_ctx_ && nullptr != dml_row_handler_ && nullptr != datum_utils_;
}
@ -76,6 +77,56 @@ ObDirectLoadFastHeapTableBuilder::~ObDirectLoadFastHeapTableBuilder()
slice_writer_allocator_.free(slice_writer_);
slice_writer_ = nullptr;
}
for (int64_t i = 0; i < column_stat_array_.count(); ++i) {
ObOptOSGColumnStat *col_stat = column_stat_array_.at(i);
col_stat->~ObOptOSGColumnStat();
allocator_.free(col_stat);
col_stat = nullptr;
}
}
int ObDirectLoadFastHeapTableBuilder::init_sql_statistics()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < param_.table_data_desc_.column_count_; ++i) {
ObOptOSGColumnStat *new_osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_);
if (OB_ISNULL(new_osg_col_stat)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate col stat");
} else if (OB_FAIL(column_stat_array_.push_back(new_osg_col_stat))) {
LOG_WARN("fail to push back", KR(ret));
}
if (OB_FAIL(ret)) {
if (new_osg_col_stat != nullptr) {
new_osg_col_stat->~ObOptOSGColumnStat();
allocator_.free(new_osg_col_stat);
new_osg_col_stat = nullptr;
}
}
}
return ret;
}
int ObDirectLoadFastHeapTableBuilder::collect_obj(const ObDatumRow &datum_row)
{
int ret = OB_SUCCESS;
const int64_t extra_rowkey_cnt = ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
for (int64_t i = 0; OB_SUCC(ret) && i < param_.table_data_desc_.column_count_; i++) {
const ObStorageDatum &datum =
datum_row.storage_datums_[i + extra_rowkey_cnt + 1];
const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i + 1).get_cmp_func();
const ObColDesc &col_desc = param_.col_descs_->at(i + 1);
ObOptOSGColumnStat *col_stat = column_stat_array_.at(i);
bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type());
if (col_stat != nullptr && is_valid) {
if (OB_FAIL(col_stat->update_column_stat_info(&datum,
col_desc.col_type_,
cmp_func.cmp_func_))) {
LOG_WARN("failed to update column stat info");
}
}
}
return ret;
}
int ObDirectLoadFastHeapTableBuilder::init(const ObDirectLoadFastHeapTableBuildParam &param)
@ -91,7 +142,9 @@ int ObDirectLoadFastHeapTableBuilder::init(const ObDirectLoadFastHeapTableBuildP
param_ = param;
allocator_.set_tenant_id(MTL_ID());
slice_writer_allocator_.set_tenant_id(MTL_ID());
if (OB_FAIL(param_.fast_heap_table_ctx_->get_tablet_context(
if (param_.online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) {
LOG_WARN("fail to inner init sql statistics", KR(ret));
} else if (OB_FAIL(param_.fast_heap_table_ctx_->get_tablet_context(
param_.tablet_id_, fast_heap_table_tablet_ctx_))) {
LOG_WARN("fail to get tablet context", KR(ret));
} else if (OB_FAIL(init_sstable_slice_ctx())) {
@ -181,7 +234,7 @@ int ObDirectLoadFastHeapTableBuilder::append_row(const ObTabletID &tablet_id,
}
if (OB_FAIL(slice_writer_->append_row(datum_row_))) {
LOG_WARN("fail to append row", KR(ret));
} else if (param_.insert_table_ctx_->collect_obj(param_.thread_idx_ ,datum_row_)) {
} else if (param_.online_opt_stat_gather_ && OB_FAIL(collect_obj(datum_row_))) {
LOG_WARN("fail to collect", KR(ret));
} else {
++row_count_;
@ -209,7 +262,6 @@ int ObDirectLoadFastHeapTableBuilder::close()
if (OB_FAIL(slice_writer_->close())) {
LOG_WARN("fail to close sstable slice writer", KR(ret));
} else {
param_.insert_table_ctx_->inc_row_count(row_count_);
is_closed_ = true;
}
}
@ -230,6 +282,7 @@ int ObDirectLoadFastHeapTableBuilder::get_tables(
ObDirectLoadFastHeapTableCreateParam create_param;
create_param.tablet_id_ = param_.tablet_id_;
create_param.row_count_ = row_count_;
create_param.column_stat_array_ = &column_stat_array_;
ObDirectLoadFastHeapTable *fast_heap_table = nullptr;
if (OB_ISNULL(fast_heap_table = OB_NEWx(ObDirectLoadFastHeapTable, (&allocator)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;

View File

@ -39,19 +39,20 @@ public:
ObDirectLoadFastHeapTableBuildParam();
~ObDirectLoadFastHeapTableBuildParam();
bool is_valid() const;
TO_STRING_KV(K_(tablet_id), K_(snapshot_version), K_(thread_idx), K_(table_data_desc),
KP_(datum_utils), KP_(col_descs), KP_(insert_table_ctx), KP_(fast_heap_table_ctx),
KP_(dml_row_handler));
TO_STRING_KV(K_(tablet_id), K_(snapshot_version), K_(table_data_desc), KP_(datum_utils),
KP_(col_descs), KP_(cmp_funcs), KP_(insert_table_ctx), KP_(fast_heap_table_ctx),
KP_(dml_row_handler), K_(online_opt_stat_gather));
public:
common::ObTabletID tablet_id_;
int64_t snapshot_version_;
int64_t thread_idx_;
ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
const common::ObIArray<share::schema::ObColDesc> *col_descs_;
const blocksstable::ObStoreCmpFuncs *cmp_funcs_;
ObDirectLoadInsertTableContext *insert_table_ctx_;
ObDirectLoadFastHeapTableContext *fast_heap_table_ctx_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
bool online_opt_stat_gather_;
};
class ObDirectLoadFastHeapTableBuilder : public ObIDirectLoadPartitionTableBuilder
@ -69,6 +70,8 @@ public:
int get_tables(common::ObIArray<ObIDirectLoadPartitionTable *> &table_array,
common::ObIAllocator &allocator) override;
private:
int init_sql_statistics();
int collect_obj(const blocksstable::ObDatumRow &datum_row);
int init_sstable_slice_ctx();
int switch_sstable_slice();
private:
@ -79,6 +82,7 @@ private:
ObSSTableInsertSliceWriter *slice_writer_;
ObDirectLoadFastHeapTableTabletWriteCtx write_ctx_;
blocksstable::ObDatumRow datum_row_;
common::ObArray<ObOptOSGColumnStat *> column_stat_array_;
int64_t row_count_;
bool is_closed_;
bool is_inited_;

View File

@ -13,10 +13,6 @@
#include "storage/direct_load/ob_direct_load_insert_table_ctx.h"
#include "storage/ddl/ob_direct_insert_sstable_ctx.h"
#include "share/stat/ob_opt_table_stat.h"
#include "share/stat/ob_opt_column_stat.h"
#include "share/stat/ob_stat_item.h"
#include "observer/table_load/ob_table_load_schema.h"
namespace oceanbase
{
@ -24,26 +20,13 @@ namespace storage
{
using namespace common;
using namespace table;
using namespace sql;
using namespace observer;
/**
* ObDirectLoadInsertTableParam
*/
ObDirectLoadInsertTableParam::ObDirectLoadInsertTableParam()
: table_id_(OB_INVALID_ID),
dest_table_id_(OB_INVALID_ID),
schema_version_(0),
snapshot_version_(0),
execution_id_(0),
ddl_task_id_(0),
data_version_(0),
session_cnt_(0),
rowkey_column_count_(0),
column_count_(0),
online_opt_stat_gather_(false),
is_heap_table_(false)
: table_id_(OB_INVALID_ID), schema_version_(0), snapshot_version_(0), execution_id_(0), ddl_task_id_(0)
{
}
@ -53,25 +36,16 @@ ObDirectLoadInsertTableParam::~ObDirectLoadInsertTableParam()
bool ObDirectLoadInsertTableParam::is_valid() const
{
return OB_INVALID_ID != table_id_ && OB_INVALID_ID != dest_table_id_ && schema_version_ >= 0 &&
snapshot_version_ >= 0 && ls_partition_ids_.count() > 0;
return OB_INVALID_ID != table_id_ && schema_version_ >= 0 && snapshot_version_ >= 0 &&
ls_partition_ids_.count() > 0;
}
int ObDirectLoadInsertTableParam::assign(const ObDirectLoadInsertTableParam &other)
{
int ret = OB_SUCCESS;
table_id_ = other.table_id_;
dest_table_id_ = other.dest_table_id_;
schema_version_ = other.schema_version_;
snapshot_version_ = other.snapshot_version_;
data_version_ = other.data_version_;
session_cnt_ = other.session_cnt_;
rowkey_column_count_ = other.rowkey_column_count_;
column_count_ = other.column_count_;
online_opt_stat_gather_ = other.online_opt_stat_gather_;
is_heap_table_ = other.is_heap_table_;
col_descs_ = other.col_descs_;
cmp_funcs_ = other.cmp_funcs_;
if (OB_FAIL(ls_partition_ids_.assign(other.ls_partition_ids_))) {
LOG_WARN("fail to assign ls tablet ids", KR(ret));
}
@ -83,7 +57,7 @@ int ObDirectLoadInsertTableParam::assign(const ObDirectLoadInsertTableParam &oth
*/
ObDirectLoadInsertTableContext::ObDirectLoadInsertTableContext()
: allocator_("TLD_SqlStat"), tablet_finish_count_(0), table_row_count_(0), is_inited_(false)
: tablet_finish_count_(0), is_inited_(false)
{
}
@ -102,12 +76,6 @@ void ObDirectLoadInsertTableContext::reset()
}
ddl_ctrl_.context_id_ = 0;
}
for (int64_t i = 0; i < session_sql_ctx_array_.count(); ++i) {
ObTableLoadSqlStatistics *sql_statistics = session_sql_ctx_array_.at(i);
sql_statistics->~ObTableLoadSqlStatistics();
allocator_.free(sql_statistics);
}
session_sql_ctx_array_.reset();
is_inited_ = false;
}
@ -144,8 +112,6 @@ int ObDirectLoadInsertTableContext::init(const ObDirectLoadInsertTableParam &par
} else if (OB_FAIL(sstable_insert_mgr.create_table_context(table_insert_param,
ddl_ctrl_.context_id_))) {
LOG_WARN("fail to create table context", KR(ret), K(table_insert_param));
} else if (param_.online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) {
LOG_WARN("fail to init sql statistics", KR(ret));
} else {
is_inited_ = true;
}
@ -156,163 +122,6 @@ int ObDirectLoadInsertTableContext::init(const ObDirectLoadInsertTableParam &par
return ret;
}
/**
 * Build one ObTableLoadSqlStatistics per session (param_.session_cnt_), each
 * holding one table stat and param_.column_count_ column stats, and register
 * it in session_sql_ctx_array_.
 *
 * The object is appended to session_sql_ctx_array_ only after all of its stats
 * were allocated. This way a failure never leaves a pointer in the array that
 * has already been destroyed and freed here (reset() destroys every array
 * entry again).
 *
 * @return OB_SUCCESS, OB_ALLOCATE_MEMORY_FAILED, or the error reported by
 *         allocate_table_stat() / allocate_col_stat() / push_back().
 */
int ObDirectLoadInsertTableContext::init_sql_statistics()
{
  int ret = OB_SUCCESS;
  for (int64_t i = 0; OB_SUCC(ret) && i < param_.session_cnt_; ++i) {
    ObTableLoadSqlStatistics *sql_statistics = nullptr;
    ObOptTableStat *table_stat = nullptr;
    if (OB_ISNULL(sql_statistics = OB_NEWx(ObTableLoadSqlStatistics, (&allocator_)))) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("fail to new ObTableLoadSqlStatistics", KR(ret));
    } else if (OB_FAIL(sql_statistics->allocate_table_stat(table_stat))) {
      LOG_WARN("fail to allocate table stat", KR(ret));
    } else {
      for (int64_t j = 0; OB_SUCC(ret) && j < param_.column_count_; ++j) {
        ObOptOSGColumnStat *osg_col_stat = nullptr;
        if (OB_FAIL(sql_statistics->allocate_col_stat(osg_col_stat))) {
          LOG_WARN("fail to allocate col stat", KR(ret));
        }
      }
    }
    // Publish only a fully initialized object.
    if (OB_SUCC(ret) && OB_FAIL(session_sql_ctx_array_.push_back(sql_statistics))) {
      LOG_WARN("fail to push back", KR(ret));
    }
    if (OB_FAIL(ret)) {
      // Not in the array at this point, so it is safe to dispose of it here.
      if (nullptr != sql_statistics) {
        sql_statistics->~ObTableLoadSqlStatistics();
        allocator_.free(sql_statistics);
        sql_statistics = nullptr;
      }
    }
  }
  return ret;
}
/**
 * Merge the per-session column statistics into a single table-level result.
 *
 * sql_statistics is reset, then receives one table stat and
 * param_.column_count_ column stats. Column stat i is the merge of entry i of
 * every session in session_sql_ctx_array_. The table row count is taken from
 * table_row_count_ and the average row size is the sum of the per-column
 * average lengths.
 *
 * @param sql_statistics  output; overwritten.
 * @return OB_SUCCESS, OB_NOT_INIT, OB_ERR_UNEXPECTED when the table schema is
 *         missing, or the first error from schema lookup / allocation / merge.
 */
int ObDirectLoadInsertTableContext::collect_sql_statistics(ObTableLoadSqlStatistics &sql_statistics)
{
  int ret = OB_SUCCESS;
  const uint64_t tenant_id = MTL_ID();
  ObSchemaGetterGuard schema_guard;
  const ObTableSchema *table_schema = nullptr;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("ObDirectLoadInsertTableContext not init", KR(ret), KP(this));
  } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, param_.dest_table_id_,
                                                         schema_guard, table_schema))) {
    LOG_WARN("fail to get table schema", KR(ret), K(param_));
  } else if (OB_ISNULL(table_schema)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("get unexpected error", K(ret));
  } else {
    sql_statistics.reset();
    int64_t table_row_cnt = table_row_count_;
    int64_t table_avg_len = 0;
    int64_t col_cnt = param_.column_count_;
    // Stays at (uint64_t)-1 unless the table is non-partitioned.
    uint64_t partition_id = -1;
    ObOptTableStat *table_stat = nullptr;
    StatLevel stat_level = TABLE_LEVEL;
    if (table_schema->get_part_level() == PARTITION_LEVEL_ZERO) {
      partition_id = param_.dest_table_id_;
    }
    if (OB_FAIL(sql_statistics.allocate_table_stat(table_stat))) {
      LOG_WARN("fail to allocate table stat", KR(ret));
    } else {
      for (int64_t i = 0; OB_SUCC(ret) && i < col_cnt; ++i) {
        // Heap tables skip the first column descriptor when mapping to column ids.
        int64_t col_id = param_.is_heap_table_ ? i + 1 : i;
        ObOptOSGColumnStat *osg_col_stat = nullptr;
        if (OB_FAIL(sql_statistics.allocate_col_stat(osg_col_stat))) {
          LOG_WARN("fail to allocate col stat", KR(ret));
        }
        // scan session_sql_ctx_array
        for (int64_t j = 0; OB_SUCC(ret) && j < session_sql_ctx_array_.count(); ++j) {
          ObOptOSGColumnStat *tmp_col_stat =
            session_sql_ctx_array_.at(j)->get_col_stat_array().at(i);
          if (OB_FAIL(osg_col_stat->merge_column_stat(*tmp_col_stat))) {
            LOG_WARN("fail to merge column stat", KR(ret));
          }
        }
        if (OB_SUCC(ret)) {
          osg_col_stat->col_stat_->calc_avg_len();
          table_avg_len += osg_col_stat->col_stat_->get_avg_len();
          osg_col_stat->col_stat_->set_table_id(param_.dest_table_id_);
          osg_col_stat->col_stat_->set_partition_id(partition_id);
          osg_col_stat->col_stat_->set_stat_level(stat_level);
          osg_col_stat->col_stat_->set_column_id(param_.col_descs_->at(col_id).col_id_);
          osg_col_stat->col_stat_->set_num_distinct(
            ObGlobalNdvEval::get_ndv_from_llc(osg_col_stat->col_stat_->get_llc_bitmap()));
          if (OB_FAIL(osg_col_stat->set_min_max_datum_to_obj())) {
            LOG_WARN("failed to set min max datum to obj", K(ret));
          }
        }
      }
      if (OB_SUCC(ret)) {
        table_stat->set_table_id(param_.dest_table_id_);
        table_stat->set_partition_id(partition_id);
        table_stat->set_object_type(stat_level);
        table_stat->set_row_count(table_row_cnt);
        table_stat->set_avg_row_size(table_avg_len);
      }
    }
  }
  return ret;
}
/**
 * Fold the column values of one row into the statistics of one session.
 *
 * Does nothing unless param_.online_opt_stat_gather_ is set. The session's
 * statistics object is selected by thread_idx. An index outside
 * [0, session_sql_ctx_array_.count()) is rejected, including negative values,
 * which would otherwise index the array out of bounds.
 *
 * Datum layout used by the index arithmetic below:
 *  - heap table: column i is read at i + extra_rowkey_cnt + 1 and described by
 *    descriptor / compare function i + 1;
 *  - otherwise: rowkey column i is read at i, and non-rowkey column i is read
 *    at i + extra_rowkey_cnt; both use descriptor / compare function i.
 * Columns with an unsupported type or a null stat slot are skipped.
 *
 * @param thread_idx  index of the session whose statistics are updated.
 * @param datum_row   row to collect from.
 * @return OB_SUCCESS, OB_ERR_UNEXPECTED for an invalid thread_idx, or the
 *         first error from update_column_stat_info().
 */
int ObDirectLoadInsertTableContext::collect_obj(int64_t thread_idx, const ObDatumRow &datum_row)
{
  int ret = OB_SUCCESS;
  const int64_t extra_rowkey_cnt = ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
  if (param_.online_opt_stat_gather_) {
    if (OB_UNLIKELY(thread_idx < 0 || thread_idx >= session_sql_ctx_array_.count())) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("unexpected thread idx", KR(ret), K(thread_idx), K(session_sql_ctx_array_.count()));
    } else {
      ObTableLoadSqlStatistics *sql_stat = session_sql_ctx_array_.at(thread_idx);
      if (param_.is_heap_table_) {
        for (int64_t i = 0; OB_SUCC(ret) && i < param_.column_count_; i++) {
          const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt + 1];
          const ObColDesc &col_desc = param_.col_descs_->at(i + 1);
          const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i + 1).get_cmp_func();
          ObOptOSGColumnStat *col_stat = sql_stat->get_col_stat_array().at(i);
          bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type());
          if (col_stat != nullptr && is_valid) {
            if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) {
              LOG_WARN("Failed to merge obj", K(ret), KP(col_stat));
            }
          }
        }
      } else {
        // Rowkey columns are stored first, at their own index.
        for (int64_t i = 0; OB_SUCC(ret) && i < param_.rowkey_column_count_; i++) {
          const ObStorageDatum &datum = datum_row.storage_datums_[i];
          const ObColDesc &col_desc = param_.col_descs_->at(i);
          const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i).get_cmp_func();
          ObOptOSGColumnStat *col_stat = sql_stat->get_col_stat_array().at(i);
          bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type());
          if (col_stat != nullptr && is_valid) {
            if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) {
              LOG_WARN("Failed to merge obj", K(ret), KP(col_stat));
            }
          }
        }
        // Remaining columns are shifted by the extra rowkey columns.
        for (int64_t i = param_.rowkey_column_count_; OB_SUCC(ret) && i < param_.column_count_; i++) {
          const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt];
          const ObColDesc &col_desc = param_.col_descs_->at(i);
          const ObCmpFunc &cmp_func = param_.cmp_funcs_->at(i).get_cmp_func();
          ObOptOSGColumnStat *col_stat = sql_stat->get_col_stat_array().at(i);
          bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type());
          if (col_stat != nullptr && is_valid) {
            if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) {
              LOG_WARN("Failed to merge obj", K(ret), KP(col_stat));
            }
          }
        }
      }
    }
  }
  return ret;
}
int ObDirectLoadInsertTableContext::add_sstable_slice(const ObTabletID &tablet_id,
const ObMacroDataSeq &start_seq,
ObNewRowIterator &iter,
@ -391,7 +200,7 @@ int ObDirectLoadInsertTableContext::notify_tablet_finish(const ObTabletID &table
return ret;
}
int ObDirectLoadInsertTableContext::commit(ObTableLoadSqlStatistics &sql_statistics)
int ObDirectLoadInsertTableContext::commit()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -405,9 +214,8 @@ int ObDirectLoadInsertTableContext::commit(ObTableLoadSqlStatistics &sql_statist
ObSSTableInsertManager &sstable_insert_mgr = ObSSTableInsertManager::get_instance();
if (OB_FAIL(sstable_insert_mgr.finish_table_context(ddl_ctrl_.context_id_, true))) {
LOG_WARN("fail to finish table context", KR(ret), K_(ddl_ctrl));
} else if (FALSE_IT(ddl_ctrl_.context_id_ = 0)) {
} else if (param_.online_opt_stat_gather_ && OB_FAIL(collect_sql_statistics(sql_statistics))) {
LOG_WARN("fail to collect sql stats", KR(ret));
} else {
ddl_ctrl_.context_id_ = 0;
}
}
return ret;

View File

@ -12,7 +12,6 @@
#pragma once
#include "share/table/ob_table_load_define.h"
#include "share/table/ob_table_load_sql_statistics.h"
#include "sql/engine/px/ob_sub_trans_ctrl.h"
namespace oceanbase
@ -28,25 +27,14 @@ public:
~ObDirectLoadInsertTableParam();
int assign(const ObDirectLoadInsertTableParam &other);
bool is_valid() const;
TO_STRING_KV(K_(table_id), K_(dest_table_id), K_(schema_version), K_(snapshot_version),
K_(execution_id), K_(ddl_task_id), K_(data_version), K_(session_cnt),
K_(rowkey_column_count), K_(column_count), K_(online_opt_stat_gather),
K_(is_heap_table), K_(ls_partition_ids));
TO_STRING_KV(K_(table_id), K_(schema_version), K_(snapshot_version), K_(ls_partition_ids), K_(execution_id), K_(ddl_task_id));
public:
uint64_t table_id_;
uint64_t dest_table_id_;
int64_t schema_version_;
int64_t snapshot_version_;
int64_t execution_id_;
int64_t ddl_task_id_;
int64_t data_version_;
int64_t session_cnt_;
int64_t rowkey_column_count_;
int64_t column_count_;
bool online_opt_stat_gather_;
bool is_heap_table_;
const common::ObIArray<share::schema::ObColDesc> *col_descs_;
const blocksstable::ObStoreCmpFuncs *cmp_funcs_;
common::ObArray<table::ObTableLoadLSIdAndPartitionId> ls_partition_ids_;
};
@ -57,7 +45,6 @@ public:
~ObDirectLoadInsertTableContext();
void reset();
int init(const ObDirectLoadInsertTableParam &param);
int collect_obj(int64_t thread_idx, const blocksstable::ObDatumRow &datum_row);
int add_sstable_slice(const common::ObTabletID &tablet_id,
const blocksstable::ObMacroDataSeq &start_seq,
common::ObNewRowIterator &iter,
@ -67,20 +54,12 @@ public:
ObSSTableInsertSliceWriter *&slice_writer,
common::ObIAllocator &allocator);
int notify_tablet_finish(const common::ObTabletID &tablet_id);
int commit(table::ObTableLoadSqlStatistics &sql_statistics);
void inc_row_count(int64_t row_count) { ATOMIC_AAF(&table_row_count_, row_count); }
int64_t get_row_count() const { return table_row_count_; }
TO_STRING_KV(K_(param), K_(tablet_finish_count), K_(table_row_count), K_(ddl_ctrl));
int commit();
TO_STRING_KV(K_(param), K_(ddl_ctrl));
private:
int init_sql_statistics();
int collect_sql_statistics(table::ObTableLoadSqlStatistics &sql_statistics);
private:
common::ObArenaAllocator allocator_;
ObDirectLoadInsertTableParam param_;
sql::ObDDLCtrl ddl_ctrl_;
int64_t tablet_finish_count_ CACHE_ALIGNED;
int64_t table_row_count_ CACHE_ALIGNED;
common::ObArray<table::ObTableLoadSqlStatistics *> session_sql_ctx_array_;
bool is_inited_;
};

View File

@ -50,8 +50,10 @@ ObDirectLoadMergeParam::ObDirectLoadMergeParam()
snapshot_version_(0),
datum_utils_(nullptr),
col_descs_(nullptr),
cmp_funcs_(nullptr),
is_heap_table_(false),
is_fast_heap_table_(false),
online_opt_stat_gather_(false),
insert_table_ctx_(nullptr),
dml_row_handler_(nullptr)
{
@ -65,7 +67,8 @@ bool ObDirectLoadMergeParam::is_valid() const
{
return OB_INVALID_ID != table_id_ && 0 < rowkey_column_num_ && 0 < store_column_count_ &&
snapshot_version_ > 0 && table_data_desc_.is_valid() && nullptr != datum_utils_ &&
nullptr != col_descs_ && nullptr != insert_table_ctx_ && nullptr != dml_row_handler_;
nullptr != col_descs_ && nullptr != cmp_funcs_ && nullptr != insert_table_ctx_ &&
nullptr != dml_row_handler_;
}
/**
@ -188,6 +191,7 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam &param,
} else {
allocator_.set_tenant_id(MTL_ID());
param_ = param;
target_partition_id_ = target_ls_partition_id.part_tablet_id_.partition_id_;
tablet_id_ = ls_partition_id.part_tablet_id_.tablet_id_;
target_tablet_id_ = target_ls_partition_id.part_tablet_id_.tablet_id_;
is_inited_ = true;
@ -196,6 +200,95 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam &param,
return ret;
}
int ObDirectLoadTabletMergeCtx::collect_sql_statistics(
const ObIArray<ObDirectLoadFastHeapTable *> &fast_heap_table_array, ObTableLoadSqlStatistics &sql_statistics)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObDirectLoadTabletMergeCtx not init", KR(ret), KP(this));
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, param_.target_table_id_, schema_guard,
table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(param_));
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(ret));
} else {
int64_t table_row_cnt = 0;
int64_t table_avg_len = 0;
int64_t col_cnt = param_.table_data_desc_.column_count_;
ObOptTableStat *table_stat = nullptr;
StatLevel stat_level;
if (table_schema->get_part_level() == PARTITION_LEVEL_ZERO) {
stat_level = TABLE_LEVEL;
} else if (table_schema->get_part_level() == PARTITION_LEVEL_ONE) {
stat_level = PARTITION_LEVEL;
} else if (table_schema->get_part_level() == PARTITION_LEVEL_TWO) {
stat_level = SUBPARTITION_LEVEL;
} else {
stat_level = INVALID_LEVEL;
}
if (OB_FAIL(sql_statistics.allocate_table_stat(table_stat))) {
LOG_WARN("fail to allocate table stat", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < col_cnt; ++i) {
int64_t col_id = param_.is_heap_table_ ? i + 1 : i;
int64_t row_count = 0;
ObOptOSGColumnStat *osg_col_stat = nullptr;
if (OB_FAIL(sql_statistics.allocate_col_stat(osg_col_stat))) {
LOG_WARN("fail to allocate table stat", KR(ret));
}
// scan task_array
for (int64_t j = 0; OB_SUCC(ret) && j < task_array_.count(); ++j) {
ObOptOSGColumnStat *tmp_col_stat = task_array_.at(j)->get_column_stat_array().at(i);
if (task_array_.at(j)->get_row_count() != 0) {
if (OB_FAIL(osg_col_stat->merge_column_stat(*tmp_col_stat))) {
LOG_WARN("fail to merge column stat", KR(ret));
} else {
row_count += task_array_.at(j)->get_row_count();
}
}
}
// scan fast heap table
for (int64_t j = 0; OB_SUCC(ret) && j < fast_heap_table_array.count(); ++j) {
ObOptOSGColumnStat *tmp_col_stat = fast_heap_table_array.at(j)->get_column_stat_array().at(i);
if (fast_heap_table_array.at(j)->get_row_count() != 0) {
if (OB_FAIL(osg_col_stat->merge_column_stat(*tmp_col_stat))) {
LOG_WARN("fail to merge column stat", KR(ret));
} else {
row_count += fast_heap_table_array.at(j)->get_row_count();
}
}
}
if (OB_SUCC(ret)) {
table_row_cnt = row_count;
osg_col_stat->col_stat_->calc_avg_len();
table_avg_len += osg_col_stat->col_stat_->get_avg_len();
osg_col_stat->col_stat_->set_table_id(param_.target_table_id_);
osg_col_stat->col_stat_->set_partition_id(target_partition_id_);
osg_col_stat->col_stat_->set_stat_level(stat_level);
osg_col_stat->col_stat_->set_column_id(param_.col_descs_->at(col_id).col_id_);
osg_col_stat->col_stat_->set_num_distinct(ObGlobalNdvEval::get_ndv_from_llc(osg_col_stat->col_stat_->get_llc_bitmap()));
if (OB_FAIL(osg_col_stat->set_min_max_datum_to_obj())) {
LOG_WARN("failed to set min max datum to obj", K(ret));
}
}
}
if (OB_SUCC(ret)) {
table_stat->set_table_id(param_.target_table_id_);
table_stat->set_partition_id(target_partition_id_);
table_stat->set_object_type(stat_level);
table_stat->set_row_count(table_row_cnt);
table_stat->set_avg_row_size(table_avg_len);
}
}
}
return ret;
}
int ObDirectLoadTabletMergeCtx::collect_dml_stat(const common::ObIArray<ObDirectLoadFastHeapTable *> &fast_heap_table_array, ObTableLoadDmlStat &dml_stats)
{
int ret = OB_SUCCESS;

View File

@ -51,7 +51,8 @@ public:
bool is_valid() const;
TO_STRING_KV(K_(table_id), K_(target_table_id), K_(rowkey_column_num), K_(store_column_count),
K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), KP_(col_descs),
K_(is_heap_table), K_(is_fast_heap_table), KP_(insert_table_ctx), KP_(dml_row_handler));
KP_(cmp_funcs), K_(is_heap_table), K_(is_fast_heap_table),
K_(online_opt_stat_gather), KP_(insert_table_ctx), KP_(dml_row_handler));
public:
uint64_t table_id_;
uint64_t target_table_id_;
@ -61,8 +62,10 @@ public:
storage::ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
const common::ObIArray<share::schema::ObColDesc> *col_descs_;
const blocksstable::ObStoreCmpFuncs *cmp_funcs_;
bool is_heap_table_;
bool is_fast_heap_table_;
bool online_opt_stat_gather_;
ObDirectLoadInsertTableContext *insert_table_ctx_;
ObDirectLoadDMLRowHandler *dml_row_handler_;
};
@ -106,6 +109,8 @@ public:
int build_aggregate_merge_task_for_multiple_heap_table(
const common::ObIArray<ObIDirectLoadPartitionTable *> &table_array);
int inc_finish_count(bool &is_ready);
int collect_sql_statistics(
const common::ObIArray<ObDirectLoadFastHeapTable *> &fast_heap_table_array, table::ObTableLoadSqlStatistics &sql_statistics);
int collect_dml_stat(const common::ObIArray<ObDirectLoadFastHeapTable *> &fast_heap_table_array,
table::ObTableLoadDmlStat &dml_stats);
const ObDirectLoadMergeParam &get_param() const { return param_; }
@ -115,7 +120,7 @@ public:
{
return task_array_;
}
TO_STRING_KV(K_(param), K_(tablet_id), K_(target_tablet_id));
TO_STRING_KV(K_(param), K_(target_partition_id), K_(tablet_id), K_(target_tablet_id));
private:
int init_sstable_array(const common::ObIArray<ObIDirectLoadPartitionTable *> &table_array);
int init_multiple_sstable_array(
@ -143,6 +148,7 @@ private:
private:
common::ObArenaAllocator allocator_;
ObDirectLoadMergeParam param_;
uint64_t target_partition_id_;
common::ObTabletID tablet_id_;
common::ObTabletID target_tablet_id_;
ObDirectLoadOriginTable origin_table_;

View File

@ -21,6 +21,7 @@
#include "storage/direct_load/ob_direct_load_multiple_heap_table.h"
#include "storage/direct_load/ob_direct_load_origin_table.h"
namespace oceanbase
{
namespace storage
@ -28,7 +29,6 @@ namespace storage
using namespace common;
using namespace blocksstable;
using namespace share;
using namespace table;
/**
* ObDirectLoadPartitionMergeTask
@ -48,9 +48,16 @@ ObDirectLoadPartitionMergeTask::ObDirectLoadPartitionMergeTask()
/**
 * Run the destructor of every column stat still held in column_stat_array_.
 * The memory itself is not freed here (presumably it is reclaimed together
 * with allocator_ -- confirm).
 */
ObDirectLoadPartitionMergeTask::~ObDirectLoadPartitionMergeTask()
{
  const int64_t stat_cnt = column_stat_array_.count();
  for (int64_t idx = 0; idx < stat_cnt; ++idx) {
    ObOptOSGColumnStat *stat = column_stat_array_.at(idx);
    if (nullptr == stat) {
      continue;
    }
    stat->~ObOptOSGColumnStat();
  }
}
int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx)
int ObDirectLoadPartitionMergeTask::process()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -63,7 +70,9 @@ int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx)
ObSSTableInsertSliceWriter *writer = nullptr;
ObMacroDataSeq block_start_seq;
block_start_seq.set_parallel_degree(parallel_idx_);
if (OB_FAIL(construct_row_iter(allocator_, row_iter))) {
if (merge_param_->online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) {
LOG_WARN("fail to init sql statistics", KR(ret));
} else if (OB_FAIL(construct_row_iter(allocator_, row_iter))) {
LOG_WARN("fail to construct row iter", KR(ret));
} else if (OB_FAIL(merge_param_->insert_table_ctx_->construct_sstable_slice_writer(
target_tablet_id, block_start_seq, writer, allocator_))) {
@ -85,7 +94,7 @@ int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx)
}
} else if (OB_FAIL(writer->append_row(*const_cast<ObDatumRow *>(datum_row)))) {
LOG_WARN("fail to append row", KR(ret), KPC(datum_row));
} else if (OB_FAIL(merge_param_->insert_table_ctx_->collect_obj(thread_idx, *datum_row))) {
} else if (merge_param_->online_opt_stat_gather_ && OB_FAIL(collect_obj(*datum_row))) {
LOG_WARN("fail to collect statistics", KR(ret));
} else {
++affected_rows_;
@ -94,8 +103,6 @@ int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx)
if (OB_SUCC(ret)) {
if (OB_FAIL(writer->close())) {
LOG_WARN("fail to close writer", KR(ret));
} else {
merge_param_->insert_table_ctx_->inc_row_count(affected_rows_);
}
}
LOG_INFO("add sstable slice end", KR(ret), K(target_tablet_id), K(tablet_id),
@ -124,6 +131,74 @@ int ObDirectLoadPartitionMergeTask::process(int64_t thread_idx)
return ret;
}
int ObDirectLoadPartitionMergeTask::init_sql_statistics()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret)&& i < merge_param_->table_data_desc_.column_count_; ++i) {
ObOptOSGColumnStat *col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_);
if (OB_ISNULL(col_stat)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate buffer", KR(ret));
} else if (OB_FAIL(column_stat_array_.push_back(col_stat))){
LOG_WARN("fail to push back", KR(ret));
}
if (OB_FAIL(ret)) {
if (col_stat != nullptr) {
col_stat->~ObOptOSGColumnStat();
allocator_.free(col_stat);
col_stat = nullptr;
}
}
}
return ret;
}
int ObDirectLoadPartitionMergeTask::collect_obj(const ObDatumRow &datum_row)
{
int ret = OB_SUCCESS;
const int64_t extra_rowkey_cnt = ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
if (merge_param_->is_heap_table_ ) {
for (int64_t i = 0; OB_SUCC(ret) && i < merge_param_->table_data_desc_.column_count_; i++) {
const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt + 1];
const ObColDesc &col_desc = merge_param_->col_descs_->at(i + 1);
const ObCmpFunc &cmp_func = merge_param_->cmp_funcs_->at(i + 1).get_cmp_func();
ObOptOSGColumnStat *col_stat = column_stat_array_.at(i);
bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type());
if (col_stat != nullptr && is_valid) {
if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) {
LOG_WARN("Failed to merge obj", K(ret), KP(col_stat));
}
}
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < merge_param_->rowkey_column_num_; i++) {
const ObStorageDatum &datum = datum_row.storage_datums_[i];
const ObColDesc &col_desc = merge_param_->col_descs_->at(i);
const ObCmpFunc &cmp_func = merge_param_->cmp_funcs_->at(i).get_cmp_func();
ObOptOSGColumnStat *col_stat = column_stat_array_.at(i);
bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type());
if (col_stat != nullptr && is_valid) {
if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) {
LOG_WARN("Failed to merge obj", K(ret), KP(col_stat));
}
}
}
for (int64_t i = merge_param_->rowkey_column_num_; OB_SUCC(ret) && i < merge_param_->table_data_desc_.column_count_; i++) {
const ObStorageDatum &datum = datum_row.storage_datums_[i + extra_rowkey_cnt];
const ObColDesc &col_desc = merge_param_->col_descs_->at(i);
const ObCmpFunc &cmp_func = merge_param_->cmp_funcs_->at(i).get_cmp_func();
ObOptOSGColumnStat *col_stat = column_stat_array_.at(i);
bool is_valid = ObColumnStatParam::is_valid_opt_col_type(col_desc.col_type_.get_type());
if (col_stat != nullptr && is_valid) {
if (OB_FAIL(col_stat->update_column_stat_info(&datum, col_desc.col_type_, cmp_func.cmp_func_))) {
LOG_WARN("Failed to merge obj", K(ret), KP(col_stat));
}
}
}
}
return ret;
}
void ObDirectLoadPartitionMergeTask::stop()
{
is_stop_ = true;

View File

@ -41,18 +41,26 @@ class ObDirectLoadPartitionMergeTask : public common::ObDLinkBase<ObDirectLoadPa
public:
ObDirectLoadPartitionMergeTask();
virtual ~ObDirectLoadPartitionMergeTask();
int process(int64_t thread_idx);
int process();
// Read-only accessor for the per-column statistics array held by this task.
// Returns a const reference to column_stat_array_ (no copy is made); the
// ObOptOSGColumnStat pointers stored in it are exposed as-is.
// NOTE(review): ownership/lifetime of the pointed-to entries is not visible
// here -- confirm before keeping them past the lifetime of this task.
const common::ObIArray<ObOptOSGColumnStat*> &get_column_stat_array() const
{
return column_stat_array_;
}
// Returns the current value of affected_rows_ as this task's row count.
// NOTE(review): presumably the number of rows this task has processed --
// confirm where affected_rows_ is updated.
int64_t get_row_count() const { return affected_rows_; }
void stop();
TO_STRING_KV(KPC_(merge_param), KPC_(merge_ctx), K_(parallel_idx));
protected:
// Pure virtual hook: every concrete merge task must provide an implementation.
// Receives an allocator and a reference to an ObIStoreRowIterator pointer.
// NOTE(review): presumably the implementation creates the iterator using
// `allocator` and hands it back through `row_iter`, returning an error code --
// confirm against the subclasses.
virtual int construct_row_iter(common::ObIAllocator &allocator,
ObIStoreRowIterator *&row_iter) = 0;
private:
// Private helpers; only declared here, the definitions live outside this header.
// NOTE(review): presumably init_sql_statistics() prepares the entries of
// column_stat_array_ -- confirm against the definition.
int init_sql_statistics();
// NOTE(review): presumably feeds the datums of one row into the per-column
// statistics kept in column_stat_array_ -- confirm against the definition.
int collect_obj(const blocksstable::ObDatumRow &datum_row);
protected:
const ObDirectLoadMergeParam *merge_param_;
ObDirectLoadTabletMergeCtx *merge_ctx_;
int64_t parallel_idx_;
int64_t affected_rows_;
common::ObArray<ObOptOSGColumnStat*> column_stat_array_;
common::ObArenaAllocator allocator_;
bool is_stop_;
bool is_inited_;

View File

@ -33,9 +33,9 @@ using namespace table;
ObDirectLoadTableStoreParam::ObDirectLoadTableStoreParam()
: snapshot_version_(0),
thread_idx_(-1),
datum_utils_(nullptr),
col_descs_(nullptr),
cmp_funcs_(nullptr),
file_mgr_(nullptr),
is_multiple_mode_(false),
is_fast_heap_table_(false),
@ -53,8 +53,8 @@ ObDirectLoadTableStoreParam::~ObDirectLoadTableStoreParam()
bool ObDirectLoadTableStoreParam::is_valid() const
{
return snapshot_version_ > 0 && thread_idx_ >= 0 && table_data_desc_.is_valid() &&
nullptr != datum_utils_ && nullptr != col_descs_ && nullptr != file_mgr_ &&
return snapshot_version_ > 0 && table_data_desc_.is_valid() && nullptr != datum_utils_ &&
nullptr != col_descs_ && nullptr != cmp_funcs_ && nullptr != file_mgr_ &&
(!is_fast_heap_table_ ||
(nullptr != insert_table_ctx_ && nullptr != fast_heap_table_ctx_)) &&
nullptr != dml_row_handler_;
@ -112,10 +112,11 @@ int ObDirectLoadTableStoreBucket::init(const ObDirectLoadTableStoreParam &param,
fast_heap_table_build_param.table_data_desc_ = param.table_data_desc_;
fast_heap_table_build_param.datum_utils_ = param.datum_utils_;
fast_heap_table_build_param.col_descs_ = param.col_descs_;
fast_heap_table_build_param.cmp_funcs_ = param.cmp_funcs_;
fast_heap_table_build_param.insert_table_ctx_ = param.insert_table_ctx_;
fast_heap_table_build_param.fast_heap_table_ctx_ = param.fast_heap_table_ctx_;
fast_heap_table_build_param.dml_row_handler_ = param.dml_row_handler_;
fast_heap_table_build_param.thread_idx_ = param.thread_idx_;
fast_heap_table_build_param.online_opt_stat_gather_ = param.online_opt_stat_gather_;
ObDirectLoadFastHeapTableBuilder *fast_heap_table_builder = nullptr;
if (OB_ISNULL(fast_heap_table_builder =
table_builder_allocator_->alloc<ObDirectLoadFastHeapTableBuilder>())) {

View File

@ -34,19 +34,20 @@ public:
ObDirectLoadTableStoreParam();
~ObDirectLoadTableStoreParam();
bool is_valid() const;
TO_STRING_KV(K_(snapshot_version), K_(thread_idx), K_(table_data_desc), KP_(datum_utils),
KP_(col_descs), KP_(file_mgr), K_(is_multiple_mode), K_(is_fast_heap_table),
TO_STRING_KV(K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), KP_(col_descs),
KP_(cmp_funcs), KP_(file_mgr), K_(is_multiple_mode), K_(is_fast_heap_table),
KP_(insert_table_ctx), KP_(fast_heap_table_ctx), KP_(dml_row_handler),
KP_(extra_buf), K_(extra_buf_size));
public:
int64_t snapshot_version_;
int64_t thread_idx_;
ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
const common::ObIArray<share::schema::ObColDesc> *col_descs_;
const blocksstable::ObStoreCmpFuncs *cmp_funcs_;
ObDirectLoadTmpFileManager *file_mgr_;
bool is_multiple_mode_;
bool is_fast_heap_table_;
bool online_opt_stat_gather_;
ObDirectLoadInsertTableContext *insert_table_ctx_;
ObDirectLoadFastHeapTableContext *fast_heap_table_ctx_;
ObDirectLoadDMLRowHandler *dml_row_handler_;