diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index 297fc6cf70..0f37fa5177 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -211,6 +211,7 @@ ob_set_subtarget(ob_server table_load table_load/ob_table_load_service.cpp table_load/ob_table_load_store_ctx.cpp table_load/ob_table_load_store_trans.cpp + table_load/ob_table_load_store_trans_px_writer.cpp table_load/ob_table_load_store.cpp table_load/ob_table_load_struct.cpp table_load/ob_table_load_table_compactor.cpp diff --git a/src/observer/table_load/ob_table_load_bucket.cpp b/src/observer/table_load/ob_table_load_bucket.cpp index baaa380668..dfc291cc3c 100644 --- a/src/observer/table_load/ob_table_load_bucket.cpp +++ b/src/observer/table_load/ob_table_load_bucket.cpp @@ -40,7 +40,6 @@ int ObTableLoadBucket::init(const ObAddr &leader_addr) { int ObTableLoadBucket::add_row(const ObTabletID &tablet_id, const ObTableLoadObjRow &obj_row, - int64_t count, int64_t batch_size, bool &flag) { diff --git a/src/observer/table_load/ob_table_load_bucket.h b/src/observer/table_load/ob_table_load_bucket.h index abc91ab59c..9a9526ca85 100644 --- a/src/observer/table_load/ob_table_load_bucket.h +++ b/src/observer/table_load/ob_table_load_bucket.h @@ -32,7 +32,6 @@ public: int add_row(const common::ObTabletID &tablet_id, const table::ObTableLoadObjRow &obj_row, - int64_t count, int64_t batch_size, bool &flag); diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index a4fb7abf3b..18703b1c83 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -335,57 +335,6 @@ int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user return ret; } -int ObTableLoadClientTask::get_column_idxs(ObIArray &column_idxs) const -{ - int ret = OB_SUCCESS; - const uint64_t tenant_id = param_.get_tenant_id(); - const uint64_t table_id = param_.get_table_id(); - ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = nullptr; - ObArray column_ids; // in user define order - ObArray column_descs; // in storage order - bool found_column = true; - column_idxs.reset(); - if (OB_FAIL( - ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, column_ids))) { - LOG_WARN("failed to get all column idx", K(ret)); - } else if (OB_UNLIKELY(column_ids.empty())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected empty column idxs", KR(ret)); - } else if (OB_FAIL(table_schema->get_column_ids(column_descs))) { - LOG_WARN("fail to get column descs", KR(ret)); - } - for (int64_t i = 0; OB_SUCC(ret) && OB_LIKELY(found_column) && i < column_descs.count(); ++i) { - const ObColDesc &col_desc = column_descs.at(i); - const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(col_desc.col_id_); - if (OB_ISNULL(col_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null column schema", KR(ret), K(col_desc)); - } else { - found_column = col_schema->is_hidden(); - } - // 在用户定义的列数组中找到对应的列 - for (int64_t j = 0; OB_SUCC(ret) && OB_LIKELY(!found_column) && j < column_ids.count(); ++j) { - const int64_t column_id = column_ids.at(j); - if (col_desc.col_id_ == column_id) { - found_column = true; - if (OB_FAIL(column_idxs.push_back(j))) { - LOG_WARN("fail to push back column desc", KR(ret), K(column_idxs), K(i), K(col_desc), - K(j), K(column_ids)); - } - } - } - } - if (OB_SUCC(ret) && OB_UNLIKELY(!found_column)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected column not found", KR(ret), K(column_ids), K(column_descs), - K(column_idxs)); - } - return ret; -} - int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us) { int ret = OB_SUCCESS; @@ -618,32 +567,44 @@ int ObTableLoadClientTask::init_instance() ret = OB_NOT_INIT; LOG_WARN("ObTableLoadClientTask not init", KR(ret)); } else { + const uint64_t tenant_id = param_.get_tenant_id(); + const uint64_t table_id = param_.get_table_id(); + ObDirectLoadMethod::Type method = ObDirectLoadMethod::FULL; + ObDirectLoadInsertMode::Type insert_mode = ObDirectLoadInsertMode::NORMAL; omt::ObTenant *tenant = nullptr; - ObArray column_idxs; + ObSchemaGetterGuard schema_guard; + ObArray column_ids; if (OB_FAIL(GCTX.omt_->get_tenant(param_.get_tenant_id(), tenant))) { LOG_WARN("fail to get tenant handle", KR(ret), K(param_.get_tenant_id())); - } else if (OB_FAIL(get_column_idxs(column_idxs))) { - LOG_WARN("failed to get column idxs", K(ret)); - } else if (OB_UNLIKELY(column_idxs.empty())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected empty column idxs", KR(ret)); + } else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) { + LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_FAIL(ObTableLoadService::check_support_direct_load(schema_guard, + table_id, + method, + insert_mode))) { + LOG_WARN("fail to check support direct load", KR(ret)); + } else if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(schema_guard, + tenant_id, + table_id, + column_ids))) { + LOG_WARN("fail to get user column ids", KR(ret)); } else { ObTableLoadParam load_param; - load_param.tenant_id_ = param_.get_tenant_id(); - load_param.table_id_ = param_.get_table_id(); + load_param.tenant_id_ = tenant_id; + load_param.table_id_ = table_id; load_param.parallel_ = param_.get_parallel(); load_param.session_count_ = load_param.parallel_; load_param.batch_size_ = 100; load_param.max_error_row_count_ = param_.get_max_error_row_count(); - load_param.column_count_ = column_idxs.count(); + load_param.column_count_ = column_ids.count(); load_param.need_sort_ = true; load_param.px_mode_ = false; load_param.online_opt_stat_gather_ = false; // 支持统计信息收集需要构造ObExecContext load_param.dup_action_ = param_.get_dup_action(); - load_param.method_ = ObDirectLoadMethod::FULL; - load_param.insert_mode_ = ObDirectLoadInsertMode::NORMAL; + load_param.method_ = method; + load_param.insert_mode_ = insert_mode; const ObTableLoadTableCtx *tmp_ctx = nullptr; - if (OB_FAIL(instance_.init(load_param, column_idxs, exec_ctx_))) { + if (OB_FAIL(instance_.init(load_param, column_ids, exec_ctx_))) { LOG_WARN("fail to init instance", KR(ret)); } else if (OB_ISNULL(tmp_ctx = instance_.get_table_ctx())) { ret = OB_ERR_UNEXPECTED; diff --git a/src/observer/table_load/ob_table_load_client_task.h b/src/observer/table_load/ob_table_load_client_task.h index 6a4f959693..8de4da928c 100644 --- a/src/observer/table_load/ob_table_load_client_task.h +++ b/src/observer/table_load/ob_table_load_client_task.h @@ -107,7 +107,6 @@ private: sql::ObFreeSessionCtx &free_session_ctx); int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us); - int get_column_idxs(ObIArray &column_idxs) const; int init_instance(); int commit_instance(); void destroy_instance(); diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 0bcf6683a4..054ec67e33 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -77,14 +77,15 @@ bool ObTableLoadCoordinator::is_ctx_inited(ObTableLoadTableCtx *ctx) return ret; } -int ObTableLoadCoordinator::init_ctx(ObTableLoadTableCtx *ctx, const ObIArray &idx_array, +int ObTableLoadCoordinator::init_ctx(ObTableLoadTableCtx *ctx, + const ObIArray &column_ids, ObTableLoadExecCtx *exec_ctx) { int ret = OB_SUCCESS; if (OB_ISNULL(ctx)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid agrs", KR(ret)); - } else if (OB_FAIL(ctx->init_coordinator_ctx(idx_array, exec_ctx))) { + } else if (OB_FAIL(ctx->init_coordinator_ctx(column_ids, exec_ctx))) { LOG_WARN("fail to init coordinator ctx", KR(ret)); } return ret; diff --git a/src/observer/table_load/ob_table_load_coordinator.h b/src/observer/table_load/ob_table_load_coordinator.h index 6504b8fab9..352f48c40c 100644 --- a/src/observer/table_load/ob_table_load_coordinator.h +++ b/src/observer/table_load/ob_table_load_coordinator.h @@ -44,7 +44,8 @@ class ObTableLoadCoordinator public: ObTableLoadCoordinator(ObTableLoadTableCtx *ctx); static bool is_ctx_inited(ObTableLoadTableCtx *ctx); - static int init_ctx(ObTableLoadTableCtx *ctx, const common::ObIArray &idx_array, + static int init_ctx(ObTableLoadTableCtx *ctx, + const common::ObIArray &column_ids, ObTableLoadExecCtx *exec_ctx); static void abort_ctx(ObTableLoadTableCtx *ctx); int init(); diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index a43e4d4cd8..6a32d58b24 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -94,7 +94,7 @@ int ObTableLoadCoordinatorCtx::init_partition_location() return ret; } -int ObTableLoadCoordinatorCtx::init(const ObIArray &idx_array, +int ObTableLoadCoordinatorCtx::init(const ObIArray &column_ids, ObTableLoadExecCtx *exec_ctx) { int ret = OB_SUCCESS; @@ -102,20 +102,22 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray &idx_array, ret = OB_INIT_TWICE; LOG_WARN("ObTableLoadCoordinatorCtx init twice", KR(ret), KP(this)); } else if (OB_UNLIKELY( - idx_array.count() != ctx_->param_.column_count_ || nullptr == exec_ctx || + column_ids.count() != ctx_->param_.column_count_ || nullptr == exec_ctx || !exec_ctx->is_valid() || (ctx_->param_.online_opt_stat_gather_ && nullptr == exec_ctx->get_exec_ctx()))) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(ctx_->param_), K(idx_array.count()), KPC(exec_ctx)); + LOG_WARN("invalid args", KR(ret), K(ctx_->param_), K(column_ids), KPC(exec_ctx)); } else { if (OB_FAIL(target_schema_.init(ctx_->param_.tenant_id_, ctx_->ddl_param_.dest_table_id_))) { LOG_WARN("fail to init table load schema", KR(ret), K(ctx_->param_.tenant_id_), K(ctx_->ddl_param_.dest_table_id_)); } - // init idx array - else if (OB_FAIL(idx_array_.assign(idx_array))) { - LOG_WARN("failed to assign idx array", KR(ret), K(idx_array)); - } else if (OB_FAIL(init_partition_location())) { + // init column idxs + else if (OB_FAIL(init_column_idxs(column_ids))) { + LOG_WARN("fail to init column idxs", KR(ret), K(column_ids)); + } + // init partition_location_ + else if (OB_FAIL(init_partition_location())) { LOG_WARN("fail to init partition location", KR(ret)); } // init partition_calc_ @@ -353,6 +355,34 @@ int ObTableLoadCoordinatorCtx::alloc_trans(const ObTableLoadSegmentID &segment_i return ret; } +int ObTableLoadCoordinatorCtx::init_column_idxs(const ObIArray &column_ids) +{ + int ret = OB_SUCCESS; + idx_array_.reset(); + const ObIArray &column_descs = ctx_->schema_.column_descs_; + bool found_column = true; + for (int64_t i = 0; OB_SUCC(ret) && OB_LIKELY(found_column) && i < column_descs.count(); ++i) { + const ObColDesc &col_desc = column_descs.at(i); + found_column = (ctx_->schema_.is_heap_table_ && i == 0); // skip hidden pk in heap table + // 在源数据的列数组中找到对应的列 + for (int64_t j = 0; OB_SUCC(ret) && OB_LIKELY(!found_column) && j < column_ids.count(); ++j) { + const uint64_t column_id = column_ids.at(j); + if (col_desc.col_id_ == column_id) { + found_column = true; + if (OB_FAIL(idx_array_.push_back(j))) { + LOG_WARN("fail to push back column idx", KR(ret), K(idx_array_), K(i), K(col_desc), K(j), + K(column_ids)); + } + } + } + } + if (OB_SUCC(ret) && OB_UNLIKELY(!found_column)) { + ret = OB_SCHEMA_NOT_UPTODATE; + LOG_WARN("column not found", KR(ret), K(idx_array_), K(column_descs), K(column_ids)); + } + return ret; +} + int ObTableLoadCoordinatorCtx::generate_autoinc_params(AutoincParam &autoinc_param) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.h b/src/observer/table_load/ob_table_load_coordinator_ctx.h index 84a775b240..09bb425ba7 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.h +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.h @@ -43,7 +43,7 @@ class ObTableLoadCoordinatorCtx public: ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx); ~ObTableLoadCoordinatorCtx(); - int init(const common::ObIArray &idx_array, ObTableLoadExecCtx *exec_ctx); + int init(const common::ObIArray &column_ids, ObTableLoadExecCtx *exec_ctx); void stop(); void destroy(); bool is_valid() const { return is_inited_; } @@ -119,6 +119,7 @@ private: int alloc_trans_ctx(const table::ObTableLoadTransId &trans_id, ObTableLoadTransCtx *&trans_ctx); int alloc_trans(const table::ObTableLoadSegmentID &segment_id, ObTableLoadCoordinatorTrans *&trans); + int init_column_idxs(const common::ObIArray &column_ids); int init_session_ctx_array(); int generate_autoinc_params(share::AutoincParam &autoinc_param); int init_sequence(); diff --git a/src/observer/table_load/ob_table_load_instance.cpp b/src/observer/table_load/ob_table_load_instance.cpp index b65b3afb9c..791bfb7916 100644 --- a/src/observer/table_load/ob_table_load_instance.cpp +++ b/src/observer/table_load/ob_table_load_instance.cpp @@ -63,7 +63,8 @@ void ObTableLoadInstance::destroy() } } -int ObTableLoadInstance::init(ObTableLoadParam ¶m, const ObIArray &idx_array, +int ObTableLoadInstance::init(ObTableLoadParam ¶m, + const ObIArray &column_ids, ObTableLoadExecCtx *execute_ctx) { int ret = OB_SUCCESS; @@ -84,18 +85,18 @@ int ObTableLoadInstance::init(ObTableLoadParam ¶m, const ObIArray & else if (OB_FAIL(ObTableLoadService::check_tenant())) { LOG_WARN("fail to check tenant", KR(ret), K(param.tenant_id_)); } - // check support + // start stmt + else if (OB_FAIL(start_stmt(param))) { + LOG_WARN("fail to start stmt", KR(ret), K(param)); + } + // double check support for concurrency of direct load and ddl else if (OB_FAIL(ObTableLoadService::check_support_direct_load(param.table_id_, param.method_, param.insert_mode_))) { LOG_WARN("fail to check support direct load", KR(ret), K(param)); } - // start stmt - else if (OB_FAIL(start_stmt(param))) { - LOG_WARN("fail to start stmt", KR(ret), K(param)); - } // start direct load - else if (OB_FAIL(start_direct_load(param, idx_array))) { + else if (OB_FAIL(start_direct_load(param, column_ids))) { LOG_WARN("fail to start direct load", KR(ret)); } // start trans @@ -479,7 +480,7 @@ int ObTableLoadInstance::abort_redef_table() } int ObTableLoadInstance::start_direct_load(const ObTableLoadParam ¶m, - const ObIArray &idx_array) + const ObIArray &column_ids) { int ret = OB_SUCCESS; ObTableLoadTableCtx *table_ctx = nullptr; @@ -492,7 +493,7 @@ int ObTableLoadInstance::start_direct_load(const ObTableLoadParam ¶m, LOG_WARN("fail to alloc table ctx", KR(ret), K(param)); } else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info))) { LOG_WARN("fail to init table ctx", KR(ret)); - } else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, idx_array, execute_ctx_))) { + } else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, column_ids, execute_ctx_))) { LOG_WARN("fail to coordinator init ctx", KR(ret)); } else if (OB_FAIL(ObTableLoadService::add_ctx(table_ctx))) { LOG_WARN("fail to add ctx", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_instance.h b/src/observer/table_load/ob_table_load_instance.h index 3b3f28cbc2..66746e1d03 100644 --- a/src/observer/table_load/ob_table_load_instance.h +++ b/src/observer/table_load/ob_table_load_instance.h @@ -37,7 +37,9 @@ public: ObTableLoadInstance(); ~ObTableLoadInstance(); void destroy(); - int init(ObTableLoadParam ¶m, const common::ObIArray &idx_array, + // column_ids不包含堆表的hidden pk + int init(ObTableLoadParam ¶m, + const common::ObIArray &column_ids, ObTableLoadExecCtx *execute_ctx); int write(int32_t session_id, const table::ObTableLoadObjRowArray &obj_rows); int commit(); @@ -62,7 +64,7 @@ private: int abort_redef_table(); private: // direct load - int start_direct_load(const ObTableLoadParam ¶m, const common::ObIArray &idx_array); + int start_direct_load(const ObTableLoadParam ¶m, const common::ObIArray &column_ids); int wait_begin_finish(); int end_direct_load(const bool commit); int add_tx_result_to_user_session(); diff --git a/src/observer/table_load/ob_table_load_schema.cpp b/src/observer/table_load/ob_table_load_schema.cpp index 4525fade3d..6a276b141f 100644 --- a/src/observer/table_load/ob_table_load_schema.cpp +++ b/src/observer/table_load/ob_table_load_schema.cpp @@ -110,11 +110,11 @@ int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t table_id, return ret; } -int ObTableLoadSchema::get_user_column_names(const ObTableSchema *table_schema, - ObIAllocator &allocator, - ObIArray &column_names) +int ObTableLoadSchema::get_user_column_schemas(const ObTableSchema *table_schema, + ObIArray &column_schemas) { int ret = OB_SUCCESS; + column_schemas.reset(); if (OB_ISNULL(table_schema)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(table_schema)); @@ -133,47 +133,108 @@ int ObTableLoadSchema::get_user_column_names(const ObTableSchema *table_schema, ret = OB_ERR_UNEXPECTED; LOG_WARN("The column is null", KR(ret)); } else if (column_schema->is_hidden()) { - // 不显示隐藏pk - } else { - ObString column_name; - if (OB_FAIL( - ob_write_string(allocator, column_schema->get_column_name_str(), column_name))) { - LOG_WARN("fail to write string", KR(ret), K(column_name)); - } else if (OB_FAIL(column_names.push_back(column_name))) { - LOG_WARN("fail to push back column name", KR(ret)); - } + // 不显示隐藏主键列 + } else if (column_schema->is_unused()) { + // 不显示快速删除列 + } else if (column_schema->is_shadow_column()) { + // 不显示shadow列 + } else if (OB_FAIL(column_schemas.push_back(column_schema))) { + LOG_WARN("fail to push back column schema", KR(ret)); } } } return ret; } -int ObTableLoadSchema::get_user_column_ids(const ObTableSchema *table_schema, - ObIArray &column_ids) +int ObTableLoadSchema::get_user_column_schemas(ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t table_id, + ObIArray &column_schemas) +{ + int ret = OB_SUCCESS; + const ObTableSchema *table_schema = nullptr; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || OB_INVALID_ID == table_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id)); + } else { + ret = get_user_column_schemas(table_schema, column_schemas); + } + return ret; +} + +int ObTableLoadSchema::get_user_column_ids(ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t table_id, + ObIArray &column_ids) { int ret = OB_SUCCESS; column_ids.reset(); - if (OB_ISNULL(table_schema)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), KP(table_schema)); + ObArray column_schemas; + if (OB_FAIL(get_user_column_schemas(schema_guard, tenant_id, table_id, column_schemas))) { + LOG_WARN("fail to get user column schemas", KR(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < column_schemas.count(); ++i) { + const ObColumnSchemaV2 *column_schema = column_schemas.at(i); + if (OB_FAIL(column_ids.push_back(column_schema->get_column_id()))) { + LOG_WARN("fail to push back column id", KR(ret)); + } + } + return ret; +} + +int ObTableLoadSchema::get_user_column_count(ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t table_id, + int64_t &column_count) +{ + int ret = OB_SUCCESS; + column_count = 0; + ObArray column_schemas; + if (OB_FAIL(get_user_column_schemas(schema_guard, tenant_id, table_id, column_schemas))) { + LOG_WARN("fail to get user column schemas", KR(ret)); } else { - ObColumnIterByPrevNextID iter(*table_schema); - const ObColumnSchemaV2 *column_schema = NULL; - while (OB_SUCC(ret)) { - if (OB_FAIL(iter.next(column_schema))) { - if (OB_UNLIKELY(OB_ITER_END != ret)) { - LOG_WARN("fail to iterate all table columns", KR(ret)); - } else { - ret = OB_SUCCESS; - break; - } - } else if (OB_ISNULL(column_schema)) { + column_count = column_schemas.count(); + } + return ret; +} + +int ObTableLoadSchema::get_column_ids(ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t table_id, + ObIArray &column_ids, + bool contain_hidden_pk_column) +{ + int ret = OB_SUCCESS; + column_ids.reset(); + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || OB_INVALID_ID == table_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(tenant_id), K(table_id)); + } else { + const ObTableSchema *table_schema = nullptr; + ObArray column_descs; + if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_FAIL(table_schema->get_column_ids(column_descs))) { + STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < column_descs.count(); ++i) { + const ObColDesc &col_desc = column_descs.at(i); + const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(col_desc.col_id_); + if (OB_ISNULL(col_schema)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("The column is null", KR(ret)); - } else if (column_schema->is_hidden()) { - // 不显示隐藏pk - } else if (OB_FAIL(column_ids.push_back(column_schema->get_column_id()))) { - LOG_WARN("fail to push back column id", KR(ret)); + LOG_WARN("unexpected null column schema", KR(ret), K(col_desc)); + } else if (ObColumnSchemaV2::is_hidden_pk_column_id(col_schema->get_column_id()) && + !contain_hidden_pk_column) { + } else if (OB_FAIL(column_ids.push_back(col_schema->get_column_id()))) { + LOG_WARN("failed to push back column id", KR(ret), K(i)); } } } diff --git a/src/observer/table_load/ob_table_load_schema.h b/src/observer/table_load/ob_table_load_schema.h index 244d6ec345..6bdd1b56cb 100644 --- a/src/observer/table_load/ob_table_load_schema.h +++ b/src/observer/table_load/ob_table_load_schema.h @@ -38,11 +38,25 @@ public: static int get_table_schema(uint64_t tenant_id, uint64_t table_id, share::schema::ObSchemaGetterGuard &schema_guard, const share::schema::ObTableSchema *&table_schema); - static int get_user_column_names(const share::schema::ObTableSchema *table_schema, - common::ObIAllocator &allocator, - common::ObIArray &column_names); - static int get_user_column_ids(const share::schema::ObTableSchema *table_schema, - common::ObIArray &column_ids); + static int get_user_column_schemas(const share::schema::ObTableSchema *table_schema, + ObIArray &column_schemas); + static int get_user_column_schemas(share::schema::ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t table_id, + ObIArray &column_schemas); + static int get_user_column_ids(share::schema::ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t table_id, + common::ObIArray &column_ids); + static int get_user_column_count(share::schema::ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t table_id, + int64_t &column_count); + static int get_column_ids(share::schema::ObSchemaGetterGuard &schema_guard, + uint64_t tenant_id, + uint64_t table_id, + common::ObIArray &column_ids, + bool contain_hidden_pk_column = false); static int check_has_udt_column(const share::schema::ObTableSchema *table_schema, bool &bret); static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value); static int get_lob_meta_tid(const uint64_t tenant_id, diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index 5edec52cba..b423689a2d 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -435,27 +435,68 @@ int ObTableLoadService::check_support_direct_load( const ObDirectLoadInsertMode::Type insert_mode) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(OB_INVALID_ID == table_id || - !ObDirectLoadMethod::is_type_valid(method) || - !ObDirectLoadInsertMode::is_type_valid(insert_mode))) { + if (OB_UNLIKELY(OB_INVALID_ID == table_id)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(table_id), K(method), K(insert_mode)); + LOG_WARN("invalid args", KR(ret), K(table_id)); } else { const uint64_t tenant_id = MTL_ID(); ObSchemaGetterGuard schema_guard; const ObTableSchema *table_schema = nullptr; + if (OB_FAIL( + ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else { + ret = check_support_direct_load(schema_guard, table_schema, method, insert_mode); + } + } + return ret; +} + +int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_guard, + uint64_t table_id, + const ObDirectLoadMethod::Type method, + const ObDirectLoadInsertMode::Type insert_mode) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_ID == table_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(table_id)); + } else { + const uint64_t tenant_id = MTL_ID(); + const ObTableSchema *table_schema = nullptr; + if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("table schema is null", KR(ret)); + } else { + ret = check_support_direct_load(schema_guard, table_schema, method, insert_mode); + } + } + return ret; +} + +int ObTableLoadService::check_support_direct_load(ObSchemaGetterGuard &schema_guard, + const ObTableSchema *table_schema, + const ObDirectLoadMethod::Type method, + const ObDirectLoadInsertMode::Type insert_mode) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(nullptr == table_schema || + !ObDirectLoadMethod::is_type_valid(method) || + !ObDirectLoadInsertMode::is_type_valid(insert_mode))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(table_schema), K(method), K(insert_mode)); + } else { + const uint64_t tenant_id = MTL_ID(); bool trigger_enabled = false; bool has_udt_column = false; bool has_fts_index = false; bool has_multivalue_index = false; bool has_invisible_column = false; bool has_unused_column = false; - if (OB_FAIL( - ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } // check if it is a user table - else if (!table_schema->is_user_table()) { + if (!table_schema->is_user_table()) { ret = OB_NOT_SUPPORTED; if (lib::is_oracle_mode() && table_schema->is_tmp_table()) { LOG_WARN("direct-load does not support oracle temporary table", KR(ret)); @@ -550,11 +591,11 @@ int ObTableLoadService::check_support_direct_load( ret = OB_NOT_SUPPORTED; LOG_WARN("incremental direct-load does not support table with foreign keys", KR(ret)); FORWARD_USER_ERROR_MSG(ret, "incremental direct-load does not support table with foreign keys"); - } else if (ObDirectLoadMethod::is_full(method)) { // full direct-load - if (OB_UNLIKELY(!ObDirectLoadInsertMode::is_valid_for_full_method(insert_mode))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected insert mode for full direct-load", KR(ret), K(method), K(insert_mode)); - } + } + } else if (ObDirectLoadMethod::is_full(method)) { // full direct-load + if (OB_UNLIKELY(!ObDirectLoadInsertMode::is_valid_for_full_method(insert_mode))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected insert mode for full direct-load", KR(ret), K(method), K(insert_mode)); } } } diff --git a/src/observer/table_load/ob_table_load_service.h b/src/observer/table_load/ob_table_load_service.h index db5687abc3..7da62d9f9d 100644 --- a/src/observer/table_load/ob_table_load_service.h +++ b/src/observer/table_load/ob_table_load_service.h @@ -35,7 +35,17 @@ class ObTableLoadService public: static int mtl_init(ObTableLoadService *&service); static int check_tenant(); - static int check_support_direct_load(const uint64_t table_id, + // 旁路导入内核获取加表锁后的schema进行检查 + static int check_support_direct_load(uint64_t table_id, + const storage::ObDirectLoadMethod::Type method, + const storage::ObDirectLoadInsertMode::Type insert_mode); + // 业务层指定schema_guard进行检查 + static int check_support_direct_load(share::schema::ObSchemaGetterGuard &schema_guard, + uint64_t table_id, + const storage::ObDirectLoadMethod::Type method, + const storage::ObDirectLoadInsertMode::Type insert_mode); + static int check_support_direct_load(share::schema::ObSchemaGetterGuard &schema_guard, + const share::schema::ObTableSchema *table_schema, const storage::ObDirectLoadMethod::Type method, const storage::ObDirectLoadInsertMode::Type insert_mode); static ObTableLoadTableCtx *alloc_ctx(); diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index a3a2e4f3f3..d60eeaff80 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -18,6 +18,7 @@ #include "observer/table_load/ob_table_load_stat.h" #include "observer/table_load/ob_table_load_store_ctx.h" #include "observer/table_load/ob_table_load_store_trans.h" +#include "observer/table_load/ob_table_load_store_trans_px_writer.h" #include "observer/table_load/ob_table_load_table_ctx.h" #include "observer/table_load/ob_table_load_task.h" #include "observer/table_load/ob_table_load_task_scheduler.h" @@ -1056,36 +1057,17 @@ int ObTableLoadStore::px_finish_trans(const ObTableLoadTransId &trans_id) return ret; } -int ObTableLoadStore::px_check_for_write(const ObTabletID &tablet_id) -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); - } else { - bool is_exist = false; - for (int64_t i = 0; i < store_ctx_->ls_partition_ids_.count(); ++i) { - const ObTableLoadLSIdAndPartitionId &ls_part_id = store_ctx_->ls_partition_ids_.at(i); - if (ls_part_id.part_tablet_id_.tablet_id_ == tablet_id) { - is_exist = true; - break; - } - } - if (OB_UNLIKELY(!is_exist)) { - ret = OB_NOT_MASTER; - LOG_WARN("not partition master", KR(ret), K(tablet_id), K(store_ctx_->ls_partition_ids_)); - } - } - return ret; -} - -int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id, - const ObTabletID &tablet_id, const ObIArray &row_array) +int ObTableLoadStore::px_get_trans_writer(const ObTableLoadTransId &trans_id, + ObTableLoadStoreTransPXWriter &writer) { int ret = OB_SUCCESS; + writer.reset(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(!trans_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(trans_id)); } else { ObTableLoadStoreTrans *trans = nullptr; ObTableLoadTransStoreWriter *store_writer = nullptr; @@ -1096,16 +1078,8 @@ int ObTableLoadStore::px_write(const ObTableLoadTransId &trans_id, LOG_WARN("unexpected trans id", KR(ret), K(trans_id), KPC(trans)); } else if (OB_FAIL(trans->get_store_writer(store_writer))) { LOG_WARN("fail to get store writer", KR(ret)); - } else { - if (OB_SUCC(trans->check_trans_status(ObTableLoadTransStatusType::RUNNING)) || - OB_SUCC(trans->check_trans_status(ObTableLoadTransStatusType::FROZEN))) { - int32_t session_id = 1; // in px mode, each trans contains only 1 session - if (OB_FAIL(store_writer->write(session_id, tablet_id, row_array))) { - LOG_WARN("fail to write store", KR(ret)); - } else { - LOG_DEBUG("succeed to write store", K(trans_id), K(tablet_id)); - } - } + } else if (OB_FAIL(writer.init(store_ctx_, trans, store_writer))) { + LOG_WARN("fail to init writer", KR(ret)); } if (OB_NOT_NULL(trans)) { if (OB_NOT_NULL(store_writer)) { diff --git a/src/observer/table_load/ob_table_load_store.h b/src/observer/table_load/ob_table_load_store.h index ac70f95634..283f52cb45 100644 --- a/src/observer/table_load/ob_table_load_store.h +++ b/src/observer/table_load/ob_table_load_store.h @@ -29,6 +29,7 @@ namespace observer class ObTableLoadTableCtx; class ObTableLoadStoreCtx; class ObTableLoadStoreTrans; +class ObTableLoadStoreTransPXWriter; class ObTableLoadStore { @@ -93,10 +94,7 @@ private: public: int px_start_trans(const table::ObTableLoadTransId &trans_id); int px_finish_trans(const table::ObTableLoadTransId &trans_id); - int px_check_for_write(const ObTabletID &tablet_id); - int px_write(const table::ObTableLoadTransId &trans_id, - const ObTabletID &tablet_id, - const common::ObIArray &row_array); + int px_get_trans_writer(const table::ObTableLoadTransId &trans_id, ObTableLoadStoreTransPXWriter &writer); static int px_abandon_trans(ObTableLoadTableCtx *ctx, const table::ObTableLoadTransId &trans_id); private: int px_flush(ObTableLoadStoreTrans *trans); diff --git a/src/observer/table_load/ob_table_load_store_trans_px_writer.cpp b/src/observer/table_load/ob_table_load_store_trans_px_writer.cpp new file mode 100644 index 0000000000..aa3d48a53e --- /dev/null +++ b/src/observer/table_load/ob_table_load_store_trans_px_writer.cpp @@ -0,0 +1,203 @@ +/** + * Copyright (c) 2024 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SERVER + +#include "observer/table_load/ob_table_load_store_trans_px_writer.h" +#include "observer/table_load/ob_table_load_store_ctx.h" +#include "observer/table_load/ob_table_load_store_trans.h" +#include "observer/table_load/ob_table_load_table_ctx.h" +#include "observer/table_load/ob_table_load_trans_store.h" + +namespace oceanbase +{ +namespace observer +{ +using namespace common; +using namespace share::schema; +using namespace table; + +ObTableLoadStoreTransPXWriter::ObTableLoadStoreTransPXWriter() + : store_ctx_(nullptr), + trans_(nullptr), + writer_(nullptr), + column_count_(0), + is_heap_table_(false), + can_write_(false), + is_inited_(false) +{ +} + +ObTableLoadStoreTransPXWriter::~ObTableLoadStoreTransPXWriter() +{ + reset(); +} + +void ObTableLoadStoreTransPXWriter::reset() +{ + is_inited_ = false; + can_write_ = false; + is_heap_table_ = false; + column_count_ = 0; + if (nullptr != store_ctx_) { + if (nullptr != trans_) { + if (nullptr != writer_) { + trans_->put_store_writer(writer_); + writer_ = nullptr; + } + store_ctx_->put_trans(trans_); + trans_ = nullptr; + } + store_ctx_ = nullptr; + } +} + +int ObTableLoadStoreTransPXWriter::init(ObTableLoadStoreCtx *store_ctx, + ObTableLoadStoreTrans *trans, + ObTableLoadTransStoreWriter *writer) +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("ObTableLoadStoreTransPXWriter init twice", KR(ret), KP(this)); + } else if (OB_UNLIKELY(nullptr == store_ctx || nullptr == trans || nullptr == writer)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(store_ctx), KP(trans), KP(writer)); + } else if (OB_FAIL(store_ctx->check_status(ObTableLoadStatusType::LOADING))) { + LOG_WARN("fail to check status", KR(ret)); + } else if (OB_FAIL(trans->check_trans_status(ObTableLoadTransStatusType::RUNNING))) { + LOG_WARN("fail to check trans status", KR(ret)); + } else { + store_ctx_ = store_ctx; + trans_ = trans; + writer_ = writer; + trans_->inc_ref_count(); + writer_->inc_ref_count(); + is_inited_ = true; + } + return ret; +} + +int ObTableLoadStoreTransPXWriter::prepare_write(const ObTabletID &tablet_id, + const ObIArray &column_ids) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadStoreTransPXWriter not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(!tablet_id.is_valid() || column_ids.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(tablet_id), K(column_ids)); + } else if (OB_UNLIKELY(can_write_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("already can write", KR(ret), KPC(this)); + } else { + if (OB_FAIL(check_tablet(tablet_id))) { + LOG_WARN("fail to check tablet", KR(ret)); + } else if (OB_FAIL(check_columns(column_ids))) { + LOG_WARN("fail to check columns", KR(ret)); + } else { + tablet_id_ = tablet_id; + column_count_ = column_ids.count(); + is_heap_table_ = store_ctx_->ctx_->schema_.is_heap_table_; + can_write_ = true; + } + } + return ret; +} + +int ObTableLoadStoreTransPXWriter::check_tablet(const ObTabletID &tablet_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!tablet_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(tablet_id)); + } else if (OB_ISNULL(store_ctx_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected store ctx is null", KR(ret), KPC(this)); + } else { + bool tablet_found = false; + for (int64_t i = 0; i < store_ctx_->ls_partition_ids_.count(); ++i) { + const ObTableLoadLSIdAndPartitionId &ls_part_id = store_ctx_->ls_partition_ids_.at(i); + if (ls_part_id.part_tablet_id_.tablet_id_ == tablet_id) { + tablet_found = true; + break; + } + } + if (OB_UNLIKELY(!tablet_found)) { + ret = OB_TABLET_NOT_EXIST; + LOG_WARN("tablet id not found", KR(ret), K(tablet_id), K(store_ctx_->ls_partition_ids_)); + } + } + return ret; +} + +int ObTableLoadStoreTransPXWriter::check_columns(const ObIArray &column_ids) +{ + int ret = OB_SUCCESS; + ObTableLoadTableCtx *table_ctx = nullptr; + if (OB_UNLIKELY(column_ids.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(column_ids)); + } else if (OB_ISNULL(store_ctx_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected store ctx is null", KR(ret), KPC(this)); + } else if (OB_ISNULL(table_ctx = store_ctx_->ctx_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected table ctx is null", KR(ret), KPC(store_ctx_)); + } else { + if (OB_UNLIKELY(table_ctx->schema_.column_descs_.count() != column_ids.count())) { + ret = OB_SCHEMA_NOT_UPTODATE; + LOG_WARN("column count not match", KR(ret), K(table_ctx->schema_.column_descs_), + K(column_ids)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < column_ids.count(); ++i) { + const ObColDesc &col_desc = table_ctx->schema_.column_descs_.at(i); + const uint64_t column_id = column_ids.at(i); + if (OB_UNLIKELY(col_desc.col_id_ != column_id)) { + ret = OB_SCHEMA_NOT_UPTODATE; + LOG_WARN("column id not match", KR(ret), K(i), K(col_desc), K(column_id), + K(table_ctx->schema_.column_descs_), K(column_ids)); + } + } + } + return ret; +} + +int ObTableLoadStoreTransPXWriter::write(const ObNewRow &row) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadStoreTransPXWriter not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(!can_write_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("can not write", KR(ret), KPC(this)); + } else if (OB_UNLIKELY(!row.is_valid() || row.count_ != column_count_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(row), K(column_count_)); + } else { + ObNewRow new_row; + if (is_heap_table_) { + new_row.assign(row.cells_ + 1, row.count_ - 1); + } else { + new_row.assign(row.cells_, row.count_); + } + if (OB_FAIL(writer_->px_write(tablet_id_, new_row))) { + LOG_WARN("fail to px write", KR(ret), K(row), K(new_row)); + } + } + return ret; +} + +} // namespace observer +} // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_store_trans_px_writer.h b/src/observer/table_load/ob_table_load_store_trans_px_writer.h new file mode 100644 index 0000000000..d6ff26fd7f --- /dev/null +++ b/src/observer/table_load/ob_table_load_store_trans_px_writer.h @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2024 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#pragma once + +#include "common/ob_tablet_id.h" + +namespace oceanbase +{ +namespace common +{ +class ObNewRow; +} // namespace common +namespace observer +{ +class ObTableLoadStoreCtx; +class ObTableLoadStoreTrans; +class ObTableLoadTransStoreWriter; + +class ObTableLoadStoreTransPXWriter +{ +public: + ObTableLoadStoreTransPXWriter(); + ~ObTableLoadStoreTransPXWriter(); + + void reset(); + int init(ObTableLoadStoreCtx *store_ctx, + ObTableLoadStoreTrans *trans, + ObTableLoadTransStoreWriter *writer); + int prepare_write(const common::ObTabletID &tablet_id, + const common::ObIArray &column_ids); + int write(const common::ObNewRow &row); + + TO_STRING_KV(KP_(store_ctx), + KP_(trans), + KP_(writer), + K_(tablet_id), + K_(column_count), + K_(is_heap_table), + K_(can_write), + K_(is_inited)); + +private: + int check_tablet(const common::ObTabletID &tablet_id); + int check_columns(const common::ObIArray &column_ids); + +private: + ObTableLoadStoreCtx *store_ctx_; + ObTableLoadStoreTrans *trans_; + ObTableLoadTransStoreWriter *writer_; + ObTabletID tablet_id_; + int64_t column_count_; + bool is_heap_table_; + bool can_write_; + bool is_inited_; +}; + +} // namespace observer +} // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_table_ctx.cpp b/src/observer/table_load/ob_table_load_table_ctx.cpp index 3f399746e2..96a90540f5 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.cpp +++ b/src/observer/table_load/ob_table_load_table_ctx.cpp @@ -72,7 +72,7 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, const ObTableLoadDD } else if (OB_UNLIKELY(param.column_count_ != (schema_.is_heap_table_ ? (schema_.store_column_count_ - 1) : schema_.store_column_count_))) { - ret = OB_ERR_UNEXPECTED; + ret = OB_SCHEMA_NOT_UPTODATE; LOG_WARN("unexpected column count", KR(ret), K(param.column_count_), K(schema_.store_column_count_), K(schema_.is_heap_table_)); } else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", param_.tenant_id_))) { LOG_WARN("fail to init allocator", KR(ret)); @@ -163,7 +163,7 @@ void ObTableLoadTableCtx::unregister_job_stat() } } -int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray &idx_array, +int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray &column_ids, ObTableLoadExecCtx *exec_ctx) { int ret = OB_SUCCESS; @@ -178,7 +178,7 @@ int ObTableLoadTableCtx::init_coordinator_ctx(const ObIArray &idx_array if (OB_ISNULL(coordinator_ctx = OB_NEWx(ObTableLoadCoordinatorCtx, (&allocator_), this))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadCoordinatorCtx", KR(ret)); - } else if (OB_FAIL(coordinator_ctx->init(idx_array, exec_ctx))) { + } else if (OB_FAIL(coordinator_ctx->init(column_ids, exec_ctx))) { LOG_WARN("fail to init coordinator ctx", KR(ret)); } else if (OB_FAIL(coordinator_ctx->set_status_inited())) { LOG_WARN("fail to set coordinator status inited", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_table_ctx.h b/src/observer/table_load/ob_table_load_table_ctx.h index dc7567116a..be692f211c 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.h +++ b/src/observer/table_load/ob_table_load_table_ctx.h @@ -64,7 +64,7 @@ public: K_(is_dirty), K_(is_inited)); public: - int init_coordinator_ctx(const common::ObIArray &idx_array, + int init_coordinator_ctx(const common::ObIArray &column_ids, ObTableLoadExecCtx *exec_ctx); int init_store_ctx( const table::ObTableLoadArray &partition_id_array, diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp index 8cf6127e02..2ce06fa14d 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp @@ -305,10 +305,15 @@ int ObTableLoadTransBucketWriter::write_for_non_partitioned(SessionContext &sess const int64_t row_count = obj_rows.count(); ObTableLoadBucket *load_bucket = &session_ctx.load_bucket_; for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) { + const ObTableLoadObjRow &row = obj_rows.at(i); bool need_write = false; - if (OB_FAIL(load_bucket->add_row(session_ctx.partition_id_.tablet_id_, - obj_rows.at(i), param_.column_count_, - param_.batch_size_, need_write))) { + if (OB_UNLIKELY(row.count_ != param_.column_count_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected column count not match", KR(ret), K(row.count_), K(param_.column_count_)); + } else if (OB_FAIL(load_bucket->add_row(session_ctx.partition_id_.tablet_id_, + row, + param_.batch_size_, + need_write))) { LOG_WARN("fail to add row", KR(ret)); } else if (need_write && OB_FAIL(write_load_bucket(session_ctx, load_bucket))) { LOG_WARN("fail to write partition bucket", KR(ret)); @@ -333,10 +338,14 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_ part_keys.set_block_allocator(common::ModulePageAllocator(allocator)); row_idxs.set_block_allocator(common::ModulePageAllocator(allocator)); for (int64_t i = 0; OB_SUCC(ret) && i < obj_rows.count(); ++i) { + const ObTableLoadObjRow &row = obj_rows.at(i); ObNewRow part_key; part_key.count_ = part_key_obj_count; part_key.cells_ = static_cast(allocator.alloc(sizeof(ObObj) * part_key_obj_count)); - if (OB_ISNULL(part_key.cells_)) { + if (OB_UNLIKELY(row.count_ != param_.column_count_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected column count not match", KR(ret), K(row.count_), K(param_.column_count_)); + } else if (OB_ISNULL(part_key.cells_)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", KR(ret)); } else if (OB_FAIL(coordinator_ctx_->partition_calc_.get_part_key(obj_rows.at(i), part_key))) { @@ -373,9 +382,10 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_ } else if (OB_FAIL(get_load_bucket(session_ctx, partition_id, load_bucket))) { LOG_WARN("fail to get partition bucket", KR(ret), K(session_ctx.session_id_), K(partition_id)); - } else if (OB_FAIL(load_bucket->add_row( - partition_id.tablet_id_, row, - param_.column_count_, param_.batch_size_, need_write))) { + } else if (OB_FAIL(load_bucket->add_row(partition_id.tablet_id_, + row, + param_.batch_size_, + need_write))) { LOG_WARN("fail to add row", KR(ret)); } else if (need_write && OB_FAIL(write_load_bucket(session_ctx, load_bucket))) { LOG_WARN("fail to write partition bucket", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_trans_store.cpp b/src/observer/table_load/ob_table_load_trans_store.cpp index b91ce4ec33..bf80a428fc 100644 --- a/src/observer/table_load/ob_table_load_trans_store.cpp +++ b/src/observer/table_load/ob_table_load_trans_store.cpp @@ -317,39 +317,37 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id, return ret; } -int ObTableLoadTransStoreWriter::write(int32_t session_id, - const ObTabletID &tablet_id, const ObIArray &row_array) +int ObTableLoadTransStoreWriter::px_write(const ObTabletID &tablet_id, const ObNewRow &row) { int ret = OB_SUCCESS; - int32_t session_count = param_.px_mode_? 1 : param_.write_session_count_; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTableLoadTransStoreWriter not init", KR(ret)); - } else if (OB_UNLIKELY(session_id < 1 || session_id > session_count) || - row_array.empty()) { + } else if (OB_UNLIKELY(!tablet_id.is_valid() || !row.is_valid() || + row.count_ != table_data_desc_->column_count_)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(session_id), K(row_array.empty())); + LOG_WARN("invalid args", KR(ret), K(tablet_id), K(row), KPC(table_data_desc_)); } else { ObTableLoadSequenceNo seq_no(0); // pdml导入的行目前不存在主键冲突,先都用一个默认的seq_no - SessionContext &session_ctx = session_ctx_array_[session_id - 1]; - for (int64_t i = 0; OB_SUCC(ret) && i < row_array.count(); ++i) { - const ObNewRow &row = row_array.at(i); - for (int64_t j = 0; OB_SUCC(ret) && (j < table_data_desc_->column_count_); ++j) { - const ObObj &obj = row.cells_[j]; - if (OB_FAIL(check_support_obj(obj))) { - LOG_WARN("failed to check support obj", KR(ret), K(obj)); - } else if (OB_FAIL(session_ctx.datum_row_.storage_datums_[j].from_obj_enhance(obj))) { - LOG_WARN("fail to from obj enhance", KR(ret), K(obj)); - } - } - if (OB_SUCC(ret)) { - if (OB_FAIL(write_row_to_table_store(session_ctx.table_store_, tablet_id, seq_no, session_ctx.datum_row_))) { - LOG_WARN("fail to write row", KR(ret), K(session_id), K(tablet_id)); - } + SessionContext &session_ctx = session_ctx_array_[0]; + for (int64_t i = 0; OB_SUCC(ret) && i < table_data_desc_->column_count_; ++i) { + ObStorageDatum &datum = session_ctx.datum_row_.storage_datums_[i]; + const ObObj &obj = row.cells_[i]; + if (OB_FAIL(check_support_obj(obj))) { + LOG_WARN("failed to check support obj", KR(ret), K(obj)); + } else if (OB_FAIL(datum.from_obj_enhance(obj))) { + LOG_WARN("fail to from obj enhance", KR(ret), K(obj)); } } if (OB_SUCC(ret)) { - ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->store_.processed_rows_, row_array.count()); + if (OB_FAIL(write_row_to_table_store(session_ctx.table_store_, + tablet_id, + seq_no, + session_ctx.datum_row_))) { + LOG_WARN("fail to write row", KR(ret), K(tablet_id)); + } else { + ATOMIC_AAF(&trans_ctx_->ctx_->job_stat_->store_.processed_rows_, 1); + } } } return ret; diff --git a/src/observer/table_load/ob_table_load_trans_store.h b/src/observer/table_load/ob_table_load_trans_store.h index d8621bf876..5aced261fc 100644 --- a/src/observer/table_load/ob_table_load_trans_store.h +++ b/src/observer/table_load/ob_table_load_trans_store.h @@ -74,8 +74,7 @@ public: public: // 只在对应工作线程中调用, 串行执行 int write(int32_t session_id, const table::ObTableLoadTabletObjRowArray &row_array); - int write(int32_t session_id, const ObTabletID &tablet_id, - const common::ObIArray &row_array); + int px_write(const ObTabletID &tablet_id, const common::ObNewRow &row); int flush(int32_t session_id); int clean_up(int32_t session_id); public: diff --git a/src/share/ob_ddl_common.h b/src/share/ob_ddl_common.h index 05b5f6b61b..8651c91c7e 100644 --- a/src/share/ob_ddl_common.h +++ b/src/share/ob_ddl_common.h @@ -339,7 +339,11 @@ static inline bool is_direct_load_retry_err(const int ret) || ret == OB_TASK_EXPIRED || ret == OB_REPLICA_NOT_READABLE || ret == OB_TRANS_CTX_NOT_EXIST + || ret == OB_SCHEMA_ERROR || ret == OB_SCHEMA_EAGAIN + || ret == OB_SCHEMA_NOT_UPTODATE + || ret == OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH + || ret == OB_ERR_REMOTE_SCHEMA_NOT_FULL ; } diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 2f41c73263..c41938bffb 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -69,7 +69,7 @@ ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam() method_(ObDirectLoadMethod::INVALID_METHOD), insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE) { - store_column_idxs_.set_tenant_id(MTL_ID()); + column_ids_.set_tenant_id(MTL_ID()); } bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const @@ -90,7 +90,7 @@ bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const (storage::ObDirectLoadInsertMode::INC_REPLACE == insert_mode_ ? sql::ObLoadDupActionType::LOAD_REPLACE == dup_action_ : true) && - data_access_param_.is_valid() && !store_column_idxs_.empty(); + data_access_param_.is_valid() && !column_ids_.empty(); } /** @@ -1738,6 +1738,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) load_stmt_ = &load_stmt; const ObLoadArgument &load_args = load_stmt_->get_load_arguments(); ObSQLSessionInfo *session = nullptr; + ObSchemaGetterGuard *schema_guard = nullptr; int64_t total_line_count = 0; if (OB_UNLIKELY(load_args.file_iter_.count() > ObTableLoadSequenceNo::MAX_DATA_ID)) { @@ -1745,7 +1746,8 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) LOG_WARN("not support file counts more than 65535", KR(ret), K(load_args.file_iter_.count())); FORWARD_USER_ERROR_MSG(ret, "not support file counts %ld more than 65535", load_args.file_iter_.count()); } else if (OB_ISNULL(session = ctx.get_my_session()) || OB_ISNULL(ctx.get_stmt_factory()) || - OB_ISNULL(ctx.get_stmt_factory()->get_query_ctx())) { + OB_ISNULL(ctx.get_stmt_factory()->get_query_ctx()) || OB_ISNULL(ctx_->get_sql_ctx()) || + OB_ISNULL(schema_guard = ctx_->get_sql_ctx()->schema_guard_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ctx is unexpected", KR(ret), K(ctx)); } else if (OB_FAIL(plan_.set_vars(ctx.get_stmt_factory()->get_query_ctx()->variables_))) { @@ -1782,6 +1784,11 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) if (OB_SUCC(ret)) { if (OB_FAIL(init_execute_param())) { LOG_WARN("fail to init execute param", KR(ret), K(ctx), K(load_stmt)); + } else if (OB_FAIL(ObTableLoadService::check_support_direct_load(*schema_guard, + execute_param_.table_id_, + execute_param_.method_, + execute_param_.insert_mode_))) { + LOG_WARN("fail to check support direct load", KR(ret)); } else if (OB_FAIL(init_execute_context())) { LOG_WARN("fail to init execute context", KR(ret), K(ctx), K(load_stmt)); } else { @@ -1960,66 +1967,29 @@ int ObLoadDataDirectImpl::init_execute_param() data_access_param.file_cs_type_ = load_args.file_cs_type_; data_access_param.access_info_ = load_args.access_info_; } - // store_column_idxs_ + // column_ids_ if (OB_SUCC(ret)) { - if (OB_FAIL(init_store_column_idxs(execute_param_.store_column_idxs_))) { - LOG_WARN("fail to init store column idxs", KR(ret)); - } - } - return ret; -} - -int ObLoadDataDirectImpl::init_store_column_idxs(ObIArray &store_column_idxs) -{ - int ret = OB_SUCCESS; - const ObLoadArgument &load_args = load_stmt_->get_load_arguments(); - const ObIArray &field_or_var_list = - load_stmt_->get_field_or_var_list(); - const uint64_t tenant_id = load_args.tenant_id_; - const uint64_t table_id = load_args.table_id_; - ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = nullptr; - ObArray column_descs; - column_descs.set_tenant_id(MTL_ID()); - if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, - schema_guard))) { - LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id)); - } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_ISNULL(table_schema)) { - ret = OB_TABLE_NOT_EXIST; - LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_FAIL(table_schema->get_column_ids(column_descs))) { - STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema)); - } else { - bool found_column = true; - for (int64_t i = 0; OB_SUCC(ret) && OB_LIKELY(found_column) && i < column_descs.count(); ++i) { - const ObColDesc &col_desc = column_descs.at(i); - const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(col_desc.col_id_); - if (OB_ISNULL(col_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null column schema", KR(ret), K(col_desc)); - } else { - found_column = col_schema->is_hidden(); - } - // 在源数据的列数组中找到对应的列 - for (int64_t j = 0; OB_SUCC(ret) && OB_LIKELY(!found_column) && j < field_or_var_list.count(); - ++j) { - const ObLoadDataStmt::FieldOrVarStruct &field_or_var_struct = field_or_var_list.at(j); - if (col_desc.col_id_ == field_or_var_struct.column_id_) { - found_column = true; - if (OB_FAIL(store_column_idxs.push_back(j))) { - LOG_WARN("fail to push back column desc", KR(ret), K(store_column_idxs), K(i), - K(col_desc), K(j), K(field_or_var_struct)); - } - } - } - } - if (OB_SUCC(ret) && OB_UNLIKELY(!found_column)) { + ObSchemaGetterGuard *schema_guard = ctx_->get_sql_ctx()->schema_guard_; + int64_t column_count = 0; + execute_param_.column_ids_.reset(); + if (OB_FAIL(ObTableLoadSchema::get_user_column_count(*schema_guard, + execute_param_.tenant_id_, + execute_param_.table_id_, + column_count))) { + LOG_WARN("fail to get user column count", KR(ret)); + } else if (OB_UNLIKELY(column_count != field_or_var_list.count())) { ret = OB_NOT_SUPPORTED; - LOG_WARN("not supported incomplete column data", KR(ret), K(store_column_idxs), - K(column_descs), K(field_or_var_list)); - FORWARD_USER_ERROR_MSG(ret, "not supported incomplete column data"); + LOG_WARN("not contain all columns is not supported", KR(ret), K(column_count), + K(field_or_var_list)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < field_or_var_list.count(); ++i) { + const ObLoadDataStmt::FieldOrVarStruct &field_or_var_struct = field_or_var_list.at(i); + if (OB_UNLIKELY(!field_or_var_struct.is_table_column_)) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("var is not supported", KR(ret), K(field_or_var_struct), K(i), K(field_or_var_list)); + } else if (OB_FAIL(execute_param_.column_ids_.push_back(field_or_var_struct.column_id_))) { + LOG_WARN("fail to push back column id", KR(ret)); + } } } return ret; @@ -2037,14 +2007,15 @@ int ObLoadDataDirectImpl::init_execute_context() load_param.session_count_ = execute_param_.parallel_; load_param.batch_size_ = execute_param_.batch_row_count_; load_param.max_error_row_count_ = execute_param_.max_error_rows_; - load_param.column_count_ = execute_param_.store_column_idxs_.count(); + load_param.column_count_ = execute_param_.column_ids_.count(); load_param.need_sort_ = execute_param_.need_sort_; load_param.dup_action_ = execute_param_.dup_action_; load_param.px_mode_ = false; load_param.online_opt_stat_gather_ = execute_param_.online_opt_stat_gather_; load_param.method_ = execute_param_.method_; load_param.insert_mode_ = execute_param_.insert_mode_; - if (OB_FAIL(direct_loader_.init(load_param, execute_param_.store_column_idxs_, + if (OB_FAIL(direct_loader_.init(load_param, + execute_param_.column_ids_, &execute_ctx_.exec_ctx_))) { LOG_WARN("fail to init direct loader", KR(ret)); } else if (OB_FAIL(init_logger())) { diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.h b/src/sql/engine/cmd/ob_load_data_direct_impl.h index 27b7168fc9..7e35267415 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.h +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.h @@ -92,7 +92,7 @@ private: "method", storage::ObDirectLoadMethod::get_type_string(method_), "insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_), K_(data_access_param), - K_(store_column_idxs)); + K_(column_ids)); public: uint64_t tenant_id_; uint64_t database_id_; @@ -112,7 +112,7 @@ private: storage::ObDirectLoadMethod::Type method_; storage::ObDirectLoadInsertMode::Type insert_mode_; DataAccessParam data_access_param_; - common::ObArray store_column_idxs_; // Mapping of stored columns to source data columns + ObArray column_ids_; }; struct LoadExecuteContext @@ -440,7 +440,6 @@ private: private: int init_file_iter(); // init execute param - int init_store_column_idxs(common::ObIArray &store_column_idxs); int init_execute_param(); // init execute context int init_logger(); diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp index b9d24cbd8e..65c3e11627 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp @@ -13,12 +13,13 @@ #define USING_LOG_PREFIX SQL_ENG #include "sql/engine/cmd/ob_table_direct_insert_ctx.h" -#include "sql/engine/ob_exec_context.h" -#include "observer/table_load/ob_table_load_exec_ctx.h" -#include "observer/table_load/ob_table_load_struct.h" -#include "observer/table_load/ob_table_load_instance.h" -#include "share/schema/ob_schema_getter_guard.h" #include "observer/omt/ob_tenant.h" +#include "observer/table_load/ob_table_load_exec_ctx.h" +#include "observer/table_load/ob_table_load_instance.h" +#include "observer/table_load/ob_table_load_schema.h" +#include "observer/table_load/ob_table_load_service.h" +#include "observer/table_load/ob_table_load_struct.h" +#include "sql/engine/ob_exec_context.h" namespace oceanbase { @@ -42,12 +43,27 @@ int ObTableDirectInsertCtx::init( const bool enable_inc_replace) { int ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + ObSQLSessionInfo *session_info = nullptr; + ObSchemaGetterGuard *schema_guard = nullptr; if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObTableDirectInsertCtx init twice", KR(ret)); } else if (OB_ISNULL(exec_ctx)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("exec_ctx cannot be null", KR(ret)); + } else if (OB_ISNULL(session_info = exec_ctx->get_my_session())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected session info is null", KR(ret)); + } else if (OB_ISNULL(exec_ctx->get_sql_ctx())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected sql ctx is null", KR(ret)); + } else if (OB_ISNULL(schema_guard = exec_ctx->get_sql_ctx()->schema_guard_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected schema guard is null", KR(ret)); + } else if (OB_UNLIKELY(session_info->get_ddl_info().is_mview_complete_refresh() && enable_inc_replace)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected mview complete refresh enable inc replace", KR(ret)); } else { is_direct_ = true; if (OB_ISNULL(load_exec_ctx_ = OB_NEWx(ObTableLoadSqlExecCtx, &exec_ctx->get_allocator()))) { @@ -59,24 +75,37 @@ int ObTableDirectInsertCtx::init( LOG_WARN("fail to new ObTableLoadInstance", KR(ret)); } else { load_exec_ctx_->exec_ctx_ = exec_ctx; - ObSEArray store_column_idxs; + ObArray column_ids; omt::ObTenant *tenant = nullptr; - ObSQLSessionInfo *sesssion_info = exec_ctx->get_my_session(); - if (OB_UNLIKELY(sesssion_info->get_ddl_info().is_mview_complete_refresh() && enable_inc_replace)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected mview complete refresh enable inc replace", KR(ret)); - } else if (OB_FAIL(GCTX.omt_->get_tenant(MTL_ID(), tenant))) { + ObDirectLoadMethod::Type method = (is_incremental ? ObDirectLoadMethod::INCREMENTAL : ObDirectLoadMethod::FULL); + ObDirectLoadInsertMode::Type insert_mode = ObDirectLoadInsertMode::INVALID_INSERT_MODE; + if (session_info->get_ddl_info().is_mview_complete_refresh()) { + insert_mode = ObDirectLoadInsertMode::OVERWRITE; + } else if (enable_inc_replace) { + insert_mode = ObDirectLoadInsertMode::INC_REPLACE; + } else { + insert_mode = ObDirectLoadInsertMode::NORMAL; + } + if (OB_FAIL(GCTX.omt_->get_tenant(MTL_ID(), tenant))) { LOG_WARN("fail to get tenant handle", KR(ret), K(MTL_ID())); - } else if (OB_FAIL(init_store_column_idxs(MTL_ID(), table_id, store_column_idxs))) { + } else if (OB_FAIL(ObTableLoadService::check_support_direct_load(*schema_guard, + table_id, + method, + insert_mode))) { + LOG_WARN("fail to check support direct load", KR(ret)); + } else if (OB_FAIL(ObTableLoadSchema::get_column_ids(*schema_guard, + tenant_id, + table_id, + column_ids))) { LOG_WARN("failed to init store column idxs", KR(ret)); } else { ObTableLoadParam param; - param.column_count_ = store_column_idxs.count(); param.tenant_id_ = MTL_ID(); param.table_id_ = table_id; param.batch_size_ = 100; param.parallel_ = parallel; param.session_count_ = parallel; + param.column_count_ = column_ids.count(); param.px_mode_ = true; param.online_opt_stat_gather_ = true; param.need_sort_ = true; @@ -84,15 +113,10 @@ int ObTableDirectInsertCtx::init( param.dup_action_ = (enable_inc_replace ? sql::ObLoadDupActionType::LOAD_REPLACE : sql::ObLoadDupActionType::LOAD_STOP_ON_DUP); param.online_opt_stat_gather_ = is_online_gather_statistics_; - param.method_ = (is_incremental ? ObDirectLoadMethod::INCREMENTAL : ObDirectLoadMethod::FULL); - if (sesssion_info->get_ddl_info().is_mview_complete_refresh()) { - param.insert_mode_ = ObDirectLoadInsertMode::OVERWRITE; - } else if (enable_inc_replace) { - param.insert_mode_ = ObDirectLoadInsertMode::INC_REPLACE; - } else { - param.insert_mode_ = ObDirectLoadInsertMode::NORMAL; - } - if (OB_FAIL(table_load_instance_->init(param, store_column_idxs, load_exec_ctx_))) { + param.method_ = method; + + param.insert_mode_ = insert_mode; + if (OB_FAIL(table_load_instance_->init(param, column_ids, load_exec_ctx_))) { LOG_WARN("failed to init direct loader", KR(ret)); } else { is_inited_ = true; @@ -147,40 +171,5 @@ void ObTableDirectInsertCtx::destroy() is_online_gather_statistics_ = false; } -int ObTableDirectInsertCtx::init_store_column_idxs(const uint64_t tenant_id, - const uint64_t table_id, ObIArray &store_column_idxs) -{ - int ret = OB_SUCCESS; - ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = nullptr; - ObSEArray column_descs; - - if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, - schema_guard))) { - LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id)); - } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_ISNULL(table_schema)) { - ret = OB_TABLE_NOT_EXIST; - LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id)); - } else if (OB_FAIL(table_schema->get_column_ids(column_descs))) { - STORAGE_LOG(WARN, "fail to get column descs", KR(ret), KPC(table_schema)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && (i < column_descs.count()); ++i) { - const ObColDesc &col_desc = column_descs.at(i); - const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(col_desc.col_id_); - if (OB_ISNULL(col_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null column schema", KR(ret), K(col_desc)); - } else if (col_schema->is_hidden()) { - } else if (OB_FAIL(store_column_idxs.push_back(i))) { - LOG_WARN("failed to push back store column idxs", KR(ret), K(i)); - } - } - } - - return ret; -} - } // namespace sql } // namespace oceanbase diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.h b/src/sql/engine/cmd/ob_table_direct_insert_ctx.h index ce965c8030..a3d2055c49 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.h +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.h @@ -66,9 +66,6 @@ public: return ddl_task_id_; } -private: - int init_store_column_idxs(const uint64_t tenant_id, const uint64_t table_id, - common::ObIArray &store_column_idxs); private: observer::ObTableLoadSqlExecCtx *load_exec_ctx_; observer::ObTableLoadInstance *table_load_instance_; diff --git a/src/storage/ls/ob_ls_tablet_service.cpp b/src/storage/ls/ob_ls_tablet_service.cpp index 945b1d4f0f..2a33196bb7 100644 --- a/src/storage/ls/ob_ls_tablet_service.cpp +++ b/src/storage/ls/ob_ls_tablet_service.cpp @@ -69,10 +69,9 @@ #include "storage/slog/ob_storage_log_struct.h" #include "storage/slog/ob_storage_logger.h" #include "share/ob_lob_access_utils.h" -#include "observer/table_load/ob_table_load_table_ctx.h" -#include "observer/table_load/ob_table_load_coordinator.h" #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_store.h" +#include "observer/table_load/ob_table_load_store_trans_px_writer.h" #include "observer/ob_server_event_history_table_operator.h" #include "storage/high_availability/ob_storage_ha_utils.h" #include "storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.h" @@ -2667,19 +2666,17 @@ int ObLSTabletService::insert_rows( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(ctx), K(dml_param), K(column_ids), KP(row_iter)); } else if (dml_param.is_direct_insert()) { // direct-insert mode - const ObTableSchemaParam::Columns &columns = dml_param.table_param_->get_data_table().get_columns(); - bool is_heap_table = columns.at(0)->is_hidden(); if (OB_FAIL(direct_insert_rows(dml_param.table_param_->get_data_table().get_table_id(), dml_param.direct_insert_task_id_, ctx.tablet_id_, - is_heap_table, + column_ids, row_iter, afct_num))) { LOG_WARN("failed to insert rows direct", KR(ret), K(dml_param.table_param_->get_data_table().get_table_id()), K(dml_param.direct_insert_task_id_), K(ctx.tablet_id_), - K(is_heap_table)); + K(column_ids)); } } else { ObArenaAllocator lob_allocator(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); @@ -2774,7 +2771,7 @@ int ObLSTabletService::direct_insert_rows( const uint64_t table_id, const int64_t task_id, const ObTabletID &tablet_id, - const bool is_heap_table, + const ObIArray &column_ids, ObNewRowIterator *row_iter, int64_t &affected_rows) { @@ -2790,35 +2787,26 @@ int ObLSTabletService::direct_insert_rows( trans_id.segment_id_ = task_id; trans_id.trans_gid_ = 1; ObTableLoadStore store(table_ctx); + ObTableLoadStoreTransPXWriter writer; if (OB_FAIL(store.init())) { LOG_WARN("fail to init store", KR(ret)); - } else if (OB_FAIL(store.px_check_for_write(tablet_id))) { - LOG_WARN("fail to check for write", KR(ret), K(tablet_id)); + } else if (OB_FAIL(store.px_get_trans_writer(trans_id, writer))) { + LOG_WARN("fail to get trans writer", KR(ret), K(trans_id)); + } else if (OB_FAIL(writer.prepare_write(tablet_id, column_ids))) { + LOG_WARN("fail to prepare write", KR(ret), K(tablet_id), K(column_ids)); } while (OB_SUCC(ret) && OB_SUCC(get_next_rows(row_iter, rows, row_count))) { - if (row_count <= 0) { + if (OB_UNLIKELY(row_count <= 0)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("row_count should be greater than 0", K(ret)); } else { - common::ObArray row_array; for (int64_t i = 0; OB_SUCC(ret) && (i < row_count); ++i) { - ObNewRow new_row; - if (is_heap_table) { - new_row.assign(rows[i].cells_ + 1, rows[i].count_ - 1); + const ObNewRow &row = rows[i]; + if (OB_FAIL(writer.write(row))) { + LOG_WARN("fail to write", KR(ret), K(i), K(row)); } else { - new_row.assign(rows[i].cells_, rows[i].count_); - } - if (OB_FAIL(row_array.push_back(new_row))) { - LOG_WARN("failed to push back row to row_array", KR(ret), K(i), K(new_row)); - } - } - if (OB_SUCC(ret)) { - if (OB_FAIL(store.px_write(trans_id, tablet_id, row_array))) { - LOG_WARN("failed to write to store", KR(ret), K(trans_id), - K(table_id), K(tablet_id), K(row_array)); - } else { - affected_rows += row_count; + ++affected_rows; } } } diff --git a/src/storage/ls/ob_ls_tablet_service.h b/src/storage/ls/ob_ls_tablet_service.h index d633803dd4..890291ea25 100644 --- a/src/storage/ls/ob_ls_tablet_service.h +++ b/src/storage/ls/ob_ls_tablet_service.h @@ -807,7 +807,7 @@ private: int direct_insert_rows(const uint64_t table_id, const int64_t task_id, const common::ObTabletID &tablet_id, - const bool is_heap_table, + const common::ObIArray &column_ids, common::ObNewRowIterator *row_iter, int64_t &affected_rows); static int check_is_gencol_check_failed(const ObRelativeTable &data_table,