From 964cb5dd52d56957c537c3a4a241ed45ff8d26a4 Mon Sep 17 00:00:00 2001 From: leftgeek <1094669802@qq.com> Date: Tue, 18 Jun 2024 09:16:21 +0000 Subject: [PATCH] fix the bug of direct load in mv complete refresh --- .../ob_table_load_client_service.cpp | 62 ------------------- .../table_load/ob_table_load_client_service.h | 7 --- .../table_load/ob_table_load_manager.cpp | 45 -------------- .../table_load/ob_table_load_manager.h | 7 --- .../table_load/ob_table_load_redef_table.cpp | 30 ++++++--- .../table_load/ob_table_load_service.cpp | 13 ---- .../table_load/ob_table_load_service.h | 1 - .../table_load/ob_table_load_struct.h | 37 ----------- .../code_generator/ob_static_engine_cg.cpp | 2 +- src/sql/das/ob_das_insert_op.cpp | 1 + .../engine/cmd/ob_table_direct_insert_ctx.cpp | 3 + .../engine/cmd/ob_table_direct_insert_ctx.h | 16 ++--- .../cmd/ob_table_direct_insert_service.cpp | 14 +++-- .../cmd/ob_table_direct_insert_service.h | 6 +- .../static/ob_px_multi_part_insert_op.cpp | 19 +++--- src/storage/ls/ob_ls_tablet_service.cpp | 9 ++- src/storage/ls/ob_ls_tablet_service.h | 3 +- 17 files changed, 61 insertions(+), 214 deletions(-) diff --git a/src/observer/table_load/ob_table_load_client_service.cpp b/src/observer/table_load/ob_table_load_client_service.cpp index 2e1a55cf6e..4d2f09dcf5 100644 --- a/src/observer/table_load/ob_table_load_client_service.cpp +++ b/src/observer/table_load/ob_table_load_client_service.cpp @@ -55,9 +55,6 @@ int ObTableLoadClientService::init() if (OB_FAIL( client_task_map_.create(bucket_num, "TLD_ClientTask", "TLD_ClientTask", MTL_ID()))) { LOG_WARN("fail to create hashmap", KR(ret), K(bucket_num)); - } else if (OB_FAIL(client_task_index_map_.create(bucket_num, "TLD_ClientTask", "TLD_ClientTask", - MTL_ID()))) { - LOG_WARN("fail to create hashmap", KR(ret), K(bucket_num)); } else if (OB_FAIL(client_task_brief_map_.init("TLD_ClientBrief", MTL_ID()))) { LOG_WARN("fail to init link hashmap", KR(ret)); } else { @@ -181,23 +178,6 @@ int ObTableLoadClientService::get_task(const ObTableLoadUniqueKey &key, return ret; } -int ObTableLoadClientService::get_task(const ObTableLoadKey &key, - ObTableLoadClientTask *&client_task) -{ - int ret = OB_SUCCESS; - ObTableLoadService *service = nullptr; - if (OB_ISNULL(service = MTL(ObTableLoadService *))) { - ret = OB_ERR_SYS; - LOG_WARN("null table load service", KR(ret)); - } else { - if (OB_FAIL( - service->get_client_service().get_client_task_by_table_id(key.table_id_, client_task))) { - LOG_WARN("fail to get client task", KR(ret), K(key)); - } - } - return ret; -} - int ObTableLoadClientService::add_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task) { @@ -209,7 +189,6 @@ int ObTableLoadClientService::add_client_task(const ObTableLoadUniqueKey &key, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(key), KP(client_task)); } else { - const uint64_t table_id = key.table_id_; obsys::ObWLockGuard guard(rwlock_); if (OB_FAIL(client_task_map_.set_refactored(key, client_task))) { if (OB_UNLIKELY(OB_HASH_EXIST != ret)) { @@ -217,15 +196,6 @@ int ObTableLoadClientService::add_client_task(const ObTableLoadUniqueKey &key, } else { ret = OB_ENTRY_EXIST; } - } - // force update client task index - else if (OB_FAIL(client_task_index_map_.set_refactored(table_id, client_task, 1))) { - LOG_WARN("fail to set refactored", KR(ret), K(table_id)); - // erase from client task map, avoid wild pointer is been use - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(client_task_map_.erase_refactored(key))) { - LOG_WARN("fail to erase refactored", KR(tmp_ret), K(key)); - } } else { client_task->inc_ref_count(); // hold by map } @@ -244,7 +214,6 @@ int ObTableLoadClientService::remove_client_task(const ObTableLoadUniqueKey &key ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(key)); } else { - const uint64_t table_id = key.table_id_; HashMapEraseIfEqual erase_if_equal(client_task); bool is_erased = false; obsys::ObWLockGuard guard(rwlock_); @@ -258,14 +227,6 @@ int ObTableLoadClientService::remove_client_task(const ObTableLoadUniqueKey &key ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected client task", KR(ret), KPC(client_task)); } - // try remove index - else if (OB_FAIL(client_task_index_map_.erase_if(table_id, erase_if_equal, is_erased))) { - if (OB_UNLIKELY(OB_HASH_NOT_EXIST == ret)) { - LOG_WARN("fail to get refactored", KR(ret), K(table_id)); - } else { - ret = OB_SUCCESS; - } - } if (OB_SUCC(ret)) { client_task->dec_ref_count(); } @@ -344,29 +305,6 @@ int ObTableLoadClientService::get_client_task(const ObTableLoadUniqueKey &key, return ret; } -int ObTableLoadClientService::get_client_task_by_table_id(uint64_t table_id, - ObTableLoadClientTask *&client_task) -{ - int ret = OB_SUCCESS; - client_task = nullptr; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadClientService not init", KR(ret), KP(this)); - } else { - obsys::ObRLockGuard guard(rwlock_); - if (OB_FAIL(client_task_index_map_.get_refactored(table_id, client_task))) { - if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { - LOG_WARN("fail to get refactored", KR(ret), K(table_id)); - } else { - ret = OB_ENTRY_NOT_EXIST; - } - } else { - client_task->inc_ref_count(); - } - } - return ret; -} - int64_t ObTableLoadClientService::get_client_task_count() const { obsys::ObRLockGuard guard(rwlock_); diff --git a/src/observer/table_load/ob_table_load_client_service.h b/src/observer/table_load/ob_table_load_client_service.h index 20b59bd15b..65baf99a16 100644 --- a/src/observer/table_load/ob_table_load_client_service.h +++ b/src/observer/table_load/ob_table_load_client_service.h @@ -45,13 +45,11 @@ public: static int add_task(ObTableLoadClientTask *client_task); static int remove_task(ObTableLoadClientTask *client_task); static int get_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *&client_task); - static int get_task(const ObTableLoadKey &key, ObTableLoadClientTask *&client_task); int add_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task); int remove_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task); int get_all_client_task(common::ObIArray &client_task_array); int get_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *&client_task); - int get_client_task_by_table_id(uint64_t table_id, ObTableLoadClientTask *&client_task); int64_t get_client_task_count() const; void purge_client_task(); @@ -81,10 +79,6 @@ private: typedef common::hash::ObHashMap ClientTaskMap; - // table_id => client_task - typedef common::hash::ObHashMap - ClientTaskIndexMap; // key => client_task_brief typedef common::ObLinkHashMap ClientTaskBriefMap; @@ -121,7 +115,6 @@ private: private: mutable obsys::ObRWLock rwlock_; ClientTaskMap client_task_map_; - ClientTaskIndexMap client_task_index_map_; ClientTaskBriefMap client_task_brief_map_; // thread safety int64_t next_task_id_; bool is_inited_; diff --git a/src/observer/table_load/ob_table_load_manager.cpp b/src/observer/table_load/ob_table_load_manager.cpp index 5d26a18b96..e9553e68da 100644 --- a/src/observer/table_load/ob_table_load_manager.cpp +++ b/src/observer/table_load/ob_table_load_manager.cpp @@ -44,9 +44,6 @@ int ObTableLoadManager::init() if (OB_FAIL( table_ctx_map_.create(bucket_num, "TLD_TableCtxMgr", "TLD_TableCtxMgr", MTL_ID()))) { LOG_WARN("fail to create hashmap", KR(ret), K(bucket_num)); - } else if (OB_FAIL(table_ctx_index_map_.create(bucket_num, "TLD_TblCtxIMgr", "TLD_TblCtxIMgr", - MTL_ID()))) { - LOG_WARN("fail to create hashmap", KR(ret), K(bucket_num)); } else { is_inited_ = true; } @@ -68,7 +65,6 @@ int ObTableLoadManager::add_table_ctx(const ObTableLoadUniqueKey &key, ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected dirty table ctx", KR(ret), KP(table_ctx)); } else { - const uint64_t table_id = key.table_id_; obsys::ObWLockGuard guard(rwlock_); if (OB_FAIL(table_ctx_map_.set_refactored(key, table_ctx))) { if (OB_UNLIKELY(OB_HASH_EXIST != ret)) { @@ -76,15 +72,6 @@ int ObTableLoadManager::add_table_ctx(const ObTableLoadUniqueKey &key, } else { ret = OB_ENTRY_EXIST; } - } - // force update table ctx index - else if (OB_FAIL(table_ctx_index_map_.set_refactored(table_id, table_ctx, 1))) { - LOG_WARN("fail to set refactored", KR(ret), K(table_id)); - // erase from table ctx map, avoid wild pointer is been use - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(table_ctx_map_.erase_refactored(key))) { - LOG_WARN("fail to erase refactored", KR(tmp_ret), K(key)); - } } else { table_ctx->inc_ref_count(); } @@ -104,7 +91,6 @@ int ObTableLoadManager::remove_table_ctx(const ObTableLoadUniqueKey &key, LOG_WARN("invalid args", KR(ret), K(key), KP(table_ctx)); } else { { - const uint64_t table_id = key.table_id_; HashMapEraseIfEqual erase_if_equal(table_ctx); bool is_erased = false; obsys::ObWLockGuard guard(rwlock_); @@ -119,14 +105,6 @@ int ObTableLoadManager::remove_table_ctx(const ObTableLoadUniqueKey &key, ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected table ctx not in manager", KR(ret), K(key), KPC(table_ctx)); } - // try remove table ctx index - else if (OB_FAIL(table_ctx_index_map_.erase_if(table_id, erase_if_equal, is_erased))) { - if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { - LOG_WARN("fail to get refactored", KR(ret), K(table_id)); - } else { - ret = OB_SUCCESS; - } - } } if (OB_SUCC(ret)) { if (OB_FAIL(add_dirty_list(table_ctx))) { @@ -190,29 +168,6 @@ int ObTableLoadManager::get_table_ctx(const ObTableLoadUniqueKey &key, return ret; } -int ObTableLoadManager::get_table_ctx_by_table_id(uint64_t table_id, - ObTableLoadTableCtx *&table_ctx) -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this)); - } else { - table_ctx = nullptr; - obsys::ObRLockGuard guard(rwlock_); - if (OB_FAIL(table_ctx_index_map_.get_refactored(table_id, table_ctx))) { - if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { - LOG_WARN("fail to get refactored", KR(ret), K(table_id)); - } else { - ret = OB_ENTRY_NOT_EXIST; - } - } else { - table_ctx->inc_ref_count(); - } - } - return ret; -} - int ObTableLoadManager::get_inactive_table_ctx_list( ObIArray &table_ctx_array) { diff --git a/src/observer/table_load/ob_table_load_manager.h b/src/observer/table_load/ob_table_load_manager.h index 895fa81bbc..d34b168af6 100644 --- a/src/observer/table_load/ob_table_load_manager.h +++ b/src/observer/table_load/ob_table_load_manager.h @@ -36,8 +36,6 @@ public: int get_all_table_ctx(common::ObIArray &table_ctx_array); // table ctx holds a reference count int get_table_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *&table_ctx); - // table ctx holds a reference count - int get_table_ctx_by_table_id(uint64_t table_id, ObTableLoadTableCtx *&table_ctx); // all table ctx hold a reference count int get_inactive_table_ctx_list(common::ObIArray &table_ctx_array); void put_table_ctx(ObTableLoadTableCtx *table_ctx); @@ -52,10 +50,6 @@ private: typedef common::hash::ObHashMap TableCtxMap; - // table_id => table_ctx - typedef common::hash::ObHashMap - TableCtxIndexMap; class HashMapEraseIfEqual { @@ -76,7 +70,6 @@ private: private: mutable obsys::ObRWLock rwlock_; TableCtxMap table_ctx_map_; - TableCtxIndexMap table_ctx_index_map_; // index of the latest task // for release table ctx in background mutable lib::ObMutex mutex_; common::ObDList dirty_list_; diff --git a/src/observer/table_load/ob_table_load_redef_table.cpp b/src/observer/table_load/ob_table_load_redef_table.cpp index 989c1a42cd..bddd86d618 100644 --- a/src/observer/table_load/ob_table_load_redef_table.cpp +++ b/src/observer/table_load/ob_table_load_redef_table.cpp @@ -35,16 +35,26 @@ int ObTableLoadRedefTable::start(const ObTableLoadRedefTableStartArg &arg, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(arg)); } else if (session_info.get_ddl_info().is_mview_complete_refresh()) { - res.task_id_ = session_info.get_cur_exec_ctx()->get_table_direct_insert_ctx().get_ddl_task_id(); - share::ObDDLTaskStatus status = share::ObDDLTaskStatus::PREPARE; - if (OB_FAIL(ObDDLUtil::get_data_information(arg.tenant_id_, - res.task_id_, - res.data_format_version_, - res.snapshot_version_, - status, - res.dest_table_id_, - res.schema_version_))) { - LOG_WARN("fail to get ddl task info", KR(ret), K(arg)); + ObExecContext *exec_ctx = session_info.get_cur_exec_ctx(); + const ObPhysicalPlanCtx *plan_ctx = nullptr; + const ObPhysicalPlan *plan = nullptr; + if (OB_ISNULL(exec_ctx) + || OB_ISNULL(plan_ctx = exec_ctx->get_physical_plan_ctx()) + || OB_ISNULL(plan = plan_ctx->get_phy_plan())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null physical plan (ctx)", KR(ret), KP(plan_ctx), KP(plan)); + } else { + res.task_id_ = plan->get_ddl_task_id(); + share::ObDDLTaskStatus status = share::ObDDLTaskStatus::PREPARE; + if (OB_FAIL(ObDDLUtil::get_data_information(arg.tenant_id_, + res.task_id_, + res.data_format_version_, + res.snapshot_version_, + status, + res.dest_table_id_, + res.schema_version_))) { + LOG_WARN("fail to get ddl task info", KR(ret), K(arg)); + } } } else { const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts(); diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index 43a29abb3a..892653f336 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -688,19 +688,6 @@ int ObTableLoadService::get_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTabl return ret; } -int ObTableLoadService::get_ctx(const ObTableLoadKey &key, ObTableLoadTableCtx *&table_ctx) -{ - int ret = OB_SUCCESS; - ObTableLoadService *service = nullptr; - if (OB_ISNULL(service = MTL(ObTableLoadService *))) { - ret = OB_ERR_SYS; - LOG_WARN("null table load service", KR(ret)); - } else { - ret = service->get_manager().get_table_ctx_by_table_id(key.table_id_, table_ctx); - } - return ret; -} - void ObTableLoadService::put_ctx(ObTableLoadTableCtx *table_ctx) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_service.h b/src/observer/table_load/ob_table_load_service.h index b855c10fa9..16654330dc 100644 --- a/src/observer/table_load/ob_table_load_service.h +++ b/src/observer/table_load/ob_table_load_service.h @@ -58,7 +58,6 @@ public: // get ctx static int get_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *&table_ctx); // get ctx by table_id - static int get_ctx(const ObTableLoadKey &key, ObTableLoadTableCtx *&table_ctx); static void put_ctx(ObTableLoadTableCtx *table_ctx); // for direct load control api diff --git a/src/observer/table_load/ob_table_load_struct.h b/src/observer/table_load/ob_table_load_struct.h index 08434c2f45..180afee3cd 100644 --- a/src/observer/table_load/ob_table_load_struct.h +++ b/src/observer/table_load/ob_table_load_struct.h @@ -28,43 +28,6 @@ class ObObj; namespace observer { -struct ObTableLoadKey -{ -public: - ObTableLoadKey() : tenant_id_(common::OB_INVALID_ID), table_id_(common::OB_INVALID_ID) {} - ObTableLoadKey(uint64_t tenant_id, uint64_t table_id) : tenant_id_(tenant_id), table_id_(table_id) {} - bool is_valid() const - { - return common::OB_INVALID_ID != tenant_id_ && common::OB_INVALID_ID != table_id_; - } - bool operator==(const ObTableLoadKey &other) const - { - return (tenant_id_ == other.tenant_id_ && table_id_ == other.table_id_); - } - bool operator!=(const ObTableLoadKey &other) const - { - return !(*this == other); - } - uint64_t hash() const - { - uint64_t hash_val = common::murmurhash(&tenant_id_, sizeof(tenant_id_), 0); - hash_val = common::murmurhash(&table_id_, sizeof(table_id_), hash_val); - return hash_val; - } - int compare(const ObTableLoadKey &other) const - { - if (tenant_id_ != other.tenant_id_) { - return (tenant_id_ > other.tenant_id_ ? 1 : -1); - } else { - return (table_id_ != other.table_id_ ? (table_id_ > other.table_id_ ? 1 : -1) : 0); - } - } - TO_STRING_KV(K_(tenant_id), K_(table_id)); -public: - uint64_t tenant_id_; - uint64_t table_id_; -}; - struct ObTableLoadUniqueKey { OB_UNIS_VERSION(1); diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index 1d7d26fb57..8327c0a9d9 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -8090,7 +8090,7 @@ int ObStaticEngineCG::set_other_properties(const ObLogPlan &log_plan, ObPhysical int64_t ddl_task_id = 0; const ObOptParamHint *opt_params = &log_plan.get_stmt()->get_query_ctx()->get_global_hint().opt_params_; OZ(opt_params->get_integer_opt_param(ObOptParamHint::DDL_TASK_ID, ddl_task_id)); - log_plan.get_optimizer_context().get_exec_ctx()->get_table_direct_insert_ctx().set_ddl_task_id(ddl_task_id); + phy_plan.set_ddl_task_id(ddl_task_id); } } } diff --git a/src/sql/das/ob_das_insert_op.cpp b/src/sql/das/ob_das_insert_op.cpp index be156e24ec..12d65aeef2 100644 --- a/src/sql/das/ob_das_insert_op.cpp +++ b/src/sql/das/ob_das_insert_op.cpp @@ -53,6 +53,7 @@ int ObDASIndexDMLAdaptor::write_rows(cons int ret = OB_SUCCESS; ObAccessService *as = MTL(ObAccessService *); dml_param_.direct_insert_task_id_ = rtdef.direct_insert_task_id_; + dml_param_.ddl_task_id_ = rtdef.ddl_task_id_; if (ctdef.table_param_.get_data_table().is_mlog_table() && !ctdef.is_access_mlog_as_master_table_) { diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp index 91e6b01bab..d77d4a307d 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp @@ -19,6 +19,7 @@ #include "observer/table_load/ob_table_load_schema.h" #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_struct.h" +#include "observer/table_load/ob_table_load_table_ctx.h" #include "sql/engine/ob_exec_context.h" namespace oceanbase @@ -37,6 +38,7 @@ ObTableDirectInsertCtx::~ObTableDirectInsertCtx() int ObTableDirectInsertCtx::init( ObExecContext *exec_ctx, + ObPhysicalPlan &phy_plan, const uint64_t table_id, const int64_t parallel, const bool is_incremental, @@ -126,6 +128,7 @@ int ObTableDirectInsertCtx::init( if (OB_FAIL(table_load_instance_->init(param, column_ids, load_exec_ctx_))) { LOG_WARN("failed to init direct loader", KR(ret)); } else { + phy_plan.set_ddl_task_id(table_load_instance_->get_table_ctx()->ddl_param_.task_id_); is_inited_ = true; LOG_DEBUG("succeeded to init direct loader", K(param)); } diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.h b/src/sql/engine/cmd/ob_table_direct_insert_ctx.h index d307c673e3..5c7ce8efbd 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.h +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.h @@ -34,12 +34,13 @@ public: table_load_instance_(nullptr), is_inited_(false), is_direct_(false), - is_online_gather_statistics_(false), - ddl_task_id_(0) {} + is_online_gather_statistics_(false){} ~ObTableDirectInsertCtx(); - TO_STRING_KV(K_(is_inited)); + TO_STRING_KV(K_(is_inited), K_(is_direct), + K_(is_online_gather_statistics)); public: int init(sql::ObExecContext *exec_ctx, + sql::ObPhysicalPlan &phy_plan, const uint64_t table_id, const int64_t parallel, const bool is_incremental, @@ -59,14 +60,6 @@ public: is_online_gather_statistics_ = is_online_gather_statistics; } - void set_ddl_task_id(const int64_t ddl_task_id) { - ddl_task_id_ = ddl_task_id; - } - - int64_t get_ddl_task_id() const { - return ddl_task_id_; - } - private: int get_compressor_type(const uint64_t tenant_id, const uint64_t table_id, const int64_t parallel, ObCompressorType &compressor_type); @@ -76,7 +69,6 @@ private: bool is_inited_; bool is_direct_; //indict whether the plan is direct load plan including insert into append and load data direct bool is_online_gather_statistics_; - int64_t ddl_task_id_; }; } // namespace observer } // namespace oceanbase diff --git a/src/sql/engine/cmd/ob_table_direct_insert_service.cpp b/src/sql/engine/cmd/ob_table_direct_insert_service.cpp index c616af58d8..cd6abfaab9 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_service.cpp +++ b/src/sql/engine/cmd/ob_table_direct_insert_service.cpp @@ -62,7 +62,7 @@ int ObTableDirectInsertService::start_direct_insert(ObExecContext &ctx, ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx(); uint64_t table_id = phy_plan.get_append_table_id(); int64_t parallel = phy_plan.get_px_dop(); - if (OB_FAIL(table_direct_insert_ctx.init(&ctx, table_id, parallel, is_inc_direct_load, is_inc_replace, is_insert_overwrite))) { + if (OB_FAIL(table_direct_insert_ctx.init(&ctx, phy_plan, table_id, parallel, is_inc_direct_load, is_inc_replace, is_insert_overwrite))) { LOG_WARN("failed to init table direct insert ctx", KR(ret), K(table_id), K(parallel), K(is_inc_direct_load), K(is_inc_replace), K(is_insert_overwrite)); } @@ -95,11 +95,12 @@ int ObTableDirectInsertService::finish_direct_insert(ObExecContext &ctx, } int ObTableDirectInsertService::open_task(const uint64_t table_id, - const int64_t task_id, + const int64_t px_task_id, + const int64_t ddl_task_id, ObTableLoadTableCtx *&table_ctx) { int ret = OB_SUCCESS; - ObTableLoadKey key(MTL_ID(), table_id); + ObTableLoadUniqueKey key(table_id, ddl_task_id); if (OB_NOT_NULL(table_ctx)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("table_ctx should be null", KR(ret), KP(table_ctx)); @@ -115,7 +116,7 @@ int ObTableDirectInsertService::open_task(const uint64_t table_id, LOG_WARN("not the master of store", KR(ret), K(key)); } else { table::ObTableLoadTransId trans_id; - trans_id.segment_id_ = task_id; + trans_id.segment_id_ = px_task_id; trans_id.trans_gid_ = 1; ObTableLoadStore store(table_ctx); if (OB_FAIL(store.init())) { @@ -132,14 +133,15 @@ int ObTableDirectInsertService::open_task(const uint64_t table_id, } int ObTableDirectInsertService::close_task(const uint64_t table_id, - const int64_t task_id, + const int64_t px_task_id, + const int64_t ddl_task_id, ObTableLoadTableCtx *table_ctx, const int error_code) { int ret = OB_SUCCESS; if (OB_NOT_NULL(table_ctx)) { table::ObTableLoadTransId trans_id; - trans_id.segment_id_ = task_id; + trans_id.segment_id_ = px_task_id; trans_id.trans_gid_ = 1; if (OB_SUCC(error_code)) { ObTableLoadStore store(table_ctx); diff --git a/src/sql/engine/cmd/ob_table_direct_insert_service.h b/src/sql/engine/cmd/ob_table_direct_insert_service.h index ad77af5dfa..6507d3c260 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_service.h +++ b/src/sql/engine/cmd/ob_table_direct_insert_service.h @@ -37,10 +37,12 @@ public: static int finish_direct_insert(ObExecContext &ctx, ObPhysicalPlan &plan, const bool commit); // each insert-task is processed in a single thread and is wrapped by a table load trans static int open_task(const uint64_t table_id, - const int64_t task_id, + const int64_t px_task_id, + const int64_t ddl_task_id, observer::ObTableLoadTableCtx *&table_ctx); static int close_task(const uint64_t table_id, - const int64_t task_id, + const int64_t px_task_id, + const int64_t ddl_task_id, observer::ObTableLoadTableCtx *table_ctx, const int error_code = OB_SUCCESS); }; diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp index 6200051162..a4813f0056 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp +++ b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp @@ -52,12 +52,15 @@ int ObPxMultiPartInsertOp::inner_open() if (OB_SUCC(ret)) { const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan(); if (ObTableDirectInsertService::is_direct_insert(*plan)) { - int64_t task_id = ctx_.get_px_task_id() + 1; - if (OB_FAIL(ObTableDirectInsertService::open_task(plan->get_append_table_id(), task_id, table_ctx_))) { + int64_t px_task_id = ctx_.get_px_task_id() + 1; + int64_t ddl_task_id = plan->get_ddl_task_id(); + if (OB_FAIL(ObTableDirectInsertService::open_task( + plan->get_append_table_id(), px_task_id, ddl_task_id, table_ctx_))) { LOG_WARN("failed to open table direct insert task", KR(ret), - K(plan->get_append_table_id()), K(task_id)); + K(plan->get_append_table_id()), K(px_task_id), K(ddl_task_id)); } else { - ins_rtdef_.das_rtdef_.direct_insert_task_id_ = task_id; + ins_rtdef_.das_rtdef_.direct_insert_task_id_ = px_task_id; + ins_rtdef_.das_rtdef_.ddl_task_id_ = ddl_task_id; } } } @@ -107,14 +110,16 @@ int ObPxMultiPartInsertOp::inner_close() int tmp_ret = OB_SUCCESS; const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan(); if (ObTableDirectInsertService::is_direct_insert(*plan)) { - int64_t task_id = ctx_.get_px_task_id() + 1; + int64_t px_task_id = ctx_.get_px_task_id() + 1; + int64_t ddl_task_id = plan->get_ddl_task_id(); int error_code = (static_cast(input_))->get_error_code(); if (OB_TMP_FAIL(ObTableDirectInsertService::close_task(plan->get_append_table_id(), - task_id, + px_task_id, + ddl_task_id, table_ctx_, error_code))) { LOG_WARN("failed to close table direct insert task", KR(tmp_ret), - K(plan->get_append_table_id()), K(task_id), K(error_code)); + K(plan->get_append_table_id()), K(px_task_id), K(ddl_task_id), K(error_code)); } } if (OB_FAIL(ObTableModifyOp::inner_close())) { diff --git a/src/storage/ls/ob_ls_tablet_service.cpp b/src/storage/ls/ob_ls_tablet_service.cpp index cba3813bb5..41346bfe2b 100644 --- a/src/storage/ls/ob_ls_tablet_service.cpp +++ b/src/storage/ls/ob_ls_tablet_service.cpp @@ -2631,6 +2631,7 @@ int ObLSTabletService::insert_rows( } else if (dml_param.is_direct_insert()) { // direct-insert mode if (OB_FAIL(direct_insert_rows(dml_param.table_param_->get_data_table().get_table_id(), dml_param.direct_insert_task_id_, + dml_param.ddl_task_id_, ctx.tablet_id_, column_ids, row_iter, @@ -2638,6 +2639,7 @@ int ObLSTabletService::insert_rows( LOG_WARN("failed to insert rows direct", KR(ret), K(dml_param.table_param_->get_data_table().get_table_id()), K(dml_param.direct_insert_task_id_), + K(dml_param.ddl_task_id_), K(ctx.tablet_id_), K(column_ids)); } @@ -2732,7 +2734,8 @@ int ObLSTabletService::insert_rows( int ObLSTabletService::direct_insert_rows( const uint64_t table_id, - const int64_t task_id, + const int64_t px_task_id, + const int64_t ddl_task_id, const ObTabletID &tablet_id, const ObIArray &column_ids, ObNewRowIterator *row_iter, @@ -2740,14 +2743,14 @@ int ObLSTabletService::direct_insert_rows( { int ret = OB_SUCCESS; ObTableLoadTableCtx *table_ctx = nullptr; - ObTableLoadKey key(MTL_ID(), table_id); + ObTableLoadUniqueKey key(table_id, ddl_task_id); if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) { LOG_WARN("fail to get table ctx", KR(ret), K(key)); } else { int64_t row_count = 0; ObNewRow *rows = nullptr; table::ObTableLoadTransId trans_id; - trans_id.segment_id_ = task_id; + trans_id.segment_id_ = px_task_id; trans_id.trans_gid_ = 1; ObTableLoadStore store(table_ctx); ObTableLoadStoreTransPXWriter writer; diff --git a/src/storage/ls/ob_ls_tablet_service.h b/src/storage/ls/ob_ls_tablet_service.h index 28de6ffcfb..79d751818a 100644 --- a/src/storage/ls/ob_ls_tablet_service.h +++ b/src/storage/ls/ob_ls_tablet_service.h @@ -774,7 +774,8 @@ private: bool &is_same); private: int direct_insert_rows(const uint64_t table_id, - const int64_t task_id, + const int64_t px_task_id, + const int64_t ddl_task_id, const common::ObTabletID &tablet_id, const common::ObIArray &column_ids, common::ObNewRowIterator *row_iter,