diff --git a/src/sql/engine/cmd/ob_table_direct_insert_service.cpp b/src/sql/engine/cmd/ob_table_direct_insert_service.cpp index 1e39a50ec2..8d010bd924 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_service.cpp +++ b/src/sql/engine/cmd/ob_table_direct_insert_service.cpp @@ -74,12 +74,16 @@ int ObTableDirectInsertService::finish_direct_insert(ObExecContext &ctx, return ret; } -int ObTableDirectInsertService::open_task(const uint64_t table_id, const int64_t task_id) +int ObTableDirectInsertService::open_task(const uint64_t table_id, + const int64_t task_id, + ObTableLoadTableCtx *&table_ctx) { int ret = OB_SUCCESS; - ObTableLoadTableCtx *table_ctx = nullptr; ObTableLoadKey key(MTL_ID(), table_id); - if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) { + if (OB_NOT_NULL(table_ctx)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("table_ctx should be null", KR(ret), KP(table_ctx)); + } else if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) { if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { LOG_WARN("fail to get table ctx", KR(ret), K(key)); } else { @@ -100,7 +104,7 @@ int ObTableDirectInsertService::open_task(const uint64_t table_id, const int64_t LOG_WARN("fail to start direct load trans", KR(ret), K(trans_id)); } } - if (OB_NOT_NULL(table_ctx)) { + if (OB_FAIL(ret) && OB_NOT_NULL(table_ctx)) { ObTableLoadService::put_ctx(table_ctx); table_ctx = nullptr; } @@ -109,14 +113,11 @@ int ObTableDirectInsertService::open_task(const uint64_t table_id, const int64_t int ObTableDirectInsertService::close_task(const uint64_t table_id, const int64_t task_id, + ObTableLoadTableCtx *table_ctx, const int error_code) { int ret = OB_SUCCESS; - ObTableLoadTableCtx *table_ctx = nullptr; - ObTableLoadKey key(MTL_ID(), table_id); - if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) { - LOG_WARN("fail to get table ctx", KR(ret), K(key), K(table_id)); - } else { + if (OB_NOT_NULL(table_ctx)) { table::ObTableLoadTransId trans_id; trans_id.segment_id_ = task_id; trans_id.trans_gid_ = 1; @@ -134,10 +135,7 @@ int ObTableDirectInsertService::close_task(const uint64_t table_id, LOG_WARN("fail to abandon direct load trans", KR(tmp_ret), K(trans_id)); } } - } - if (OB_NOT_NULL(table_ctx)) { ObTableLoadService::put_ctx(table_ctx); - table_ctx = nullptr; } return ret; } diff --git a/src/sql/engine/cmd/ob_table_direct_insert_service.h b/src/sql/engine/cmd/ob_table_direct_insert_service.h index ef0788e997..78854cb1cf 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_service.h +++ b/src/sql/engine/cmd/ob_table_direct_insert_service.h @@ -8,6 +8,10 @@ namespace oceanbase { +namespace observer +{ +class ObTableLoadTableCtx; +} namespace sql { class ObExecContext; @@ -24,10 +28,13 @@ public: static int commit_direct_insert(ObExecContext &ctx, ObPhysicalPlan &plan); static int finish_direct_insert(ObExecContext &ctx, ObPhysicalPlan &plan); // each insert-task is processed in a single thread and is wrapped by a table load trans - static int open_task(const uint64_t table_id, const int64_t task_id); + static int open_task(const uint64_t table_id, + const int64_t task_id, + observer::ObTableLoadTableCtx *&table_ctx); static int close_task(const uint64_t table_id, - const int64_t task_id, - const int error_code = OB_SUCCESS); + const int64_t task_id, + observer::ObTableLoadTableCtx *table_ctx, + const int error_code = OB_SUCCESS); }; } // namespace sql } // namespace oceanbase \ No newline at end of file diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp index ecbd2f8762..a0fd252048 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp +++ b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp @@ -52,7 +52,7 @@ int ObPxMultiPartInsertOp::inner_open() const ObPhysicalPlan *plan = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan(); if (ObTableDirectInsertService::is_direct_insert(*plan)) { int64_t task_id = ctx_.get_px_task_id() + 1; - if (OB_FAIL(ObTableDirectInsertService::open_task(plan->get_append_table_id(), task_id))) { + if (OB_FAIL(ObTableDirectInsertService::open_task(plan->get_append_table_id(), task_id, table_ctx_))) { LOG_WARN("failed to open table direct insert task", KR(ret), K(plan->get_append_table_id()), K(task_id)); } else { @@ -110,6 +110,7 @@ int ObPxMultiPartInsertOp::inner_close() int error_code = (static_cast(input_))->get_error_code(); if (OB_TMP_FAIL(ObTableDirectInsertService::close_task(plan->get_append_table_id(), task_id, + table_ctx_, error_code))) { LOG_WARN("failed to close table direct insert task", KR(tmp_ret), K(plan->get_append_table_id()), K(task_id), K(error_code)); diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.h b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.h index 873dfd6137..eac99d0ba8 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.h +++ b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.h @@ -19,6 +19,10 @@ namespace oceanbase { +namespace observer +{ +class ObTableLoadTableCtx; +} namespace sql { class ObPxMultiPartInsertOpInput : public ObPxMultiPartModifyOpInput @@ -81,7 +85,8 @@ public: ObOpInput *input) : ObTableModifyOp(exec_ctx, spec, input), data_driver_(&ObOperator::get_eval_ctx(), exec_ctx.get_allocator(), op_monitor_info_), - ins_rtdef_() + ins_rtdef_(), + table_ctx_(nullptr) { } @@ -101,6 +106,7 @@ private: protected: ObPDMLOpDataDriver data_driver_; ObInsRtDef ins_rtdef_; + observer::ObTableLoadTableCtx *table_ctx_; DISALLOW_COPY_AND_ASSIGN(ObPxMultiPartInsertOp); };