From 8325c17a844a13e05f4a1bb92583e8158002bf6b Mon Sep 17 00:00:00 2001 From: yongshige <598633031@qq.com> Date: Tue, 8 Aug 2023 10:18:34 +0000 Subject: [PATCH] fix use error row handler --- .../table_load/ob_table_load_coordinator_ctx.cpp | 15 +++++++++++++++ .../table_load/ob_table_load_coordinator_ctx.h | 2 ++ .../ob_table_load_error_row_handler.cpp | 15 +++++++++------ .../table_load/ob_table_load_error_row_handler.h | 5 ++++- .../table_load/ob_table_load_store_ctx.cpp | 2 +- .../ob_table_load_trans_bucket_writer.cpp | 2 +- 6 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index f01d0b16f3..fd75a56c8b 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -6,6 +6,7 @@ #include "observer/table_load/ob_table_load_coordinator_ctx.h" #include "observer/table_load/ob_table_load_coordinator_trans.h" +#include "observer/table_load/ob_table_load_error_row_handler.h" #include "observer/table_load/ob_table_load_table_ctx.h" #include "observer/table_load/ob_table_load_task_scheduler.h" #include "share/ob_autoincrement_service.h" @@ -28,6 +29,7 @@ ObTableLoadCoordinatorCtx::ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx) allocator_("TLD_CoordCtx"), task_scheduler_(nullptr), exec_ctx_(nullptr), + error_row_handler_(nullptr), sequence_schema_(&allocator_), last_trans_gid_(1024), next_session_id_(0), @@ -135,6 +137,14 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray &idx_array, ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret)); } + // init error_row_handler_ + else if (OB_ISNULL(error_row_handler_ = + OB_NEWx(ObTableLoadErrorRowHandler, (&allocator_)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to new ObTableLoadErrorRowHandler", KR(ret)); + } else if (OB_FAIL(error_row_handler_->init(ctx_->param_, result_info_, ctx_->job_stat_))) { + LOG_WARN("fail to init error row handler", KR(ret)); + } // init session_ctx_array_ else if (OB_FAIL(init_session_ctx_array())) { LOG_WARN("fail to init session ctx array", KR(ret)); @@ -175,6 +185,11 @@ void ObTableLoadCoordinatorCtx::destroy() allocator_.free(task_scheduler_); task_scheduler_ = nullptr; } + if (nullptr != error_row_handler_) { + error_row_handler_->~ObTableLoadErrorRowHandler(); + allocator_.free(error_row_handler_); + error_row_handler_ = nullptr; + } for (TransMap::const_iterator iter = trans_map_.begin(); iter != trans_map_.end(); ++iter) { ObTableLoadCoordinatorTrans *trans = iter->second; abort_unless(0 == trans->get_ref_count()); diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.h b/src/observer/table_load/ob_table_load_coordinator_ctx.h index 39f95ab283..fd59402511 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.h +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.h @@ -28,6 +28,7 @@ class ObTableLoadTableCtx; class ObTableLoadTransCtx; class ObTableLoadCoordinatorTrans; class ObITableLoadTaskScheduler; +class ObTableLoadErrorRowHandler; class ObTableLoadCoordinatorCtx { @@ -119,6 +120,7 @@ public: common::ObArray idx_array_; ObTableLoadExecCtx *exec_ctx_; table::ObTableLoadResultInfo result_info_; + ObTableLoadErrorRowHandler *error_row_handler_; share::schema::ObSequenceSchema sequence_schema_; struct SessionContext { diff --git a/src/observer/table_load/ob_table_load_error_row_handler.cpp b/src/observer/table_load/ob_table_load_error_row_handler.cpp index a16c4c71df..b0d06a64ad 100644 --- a/src/observer/table_load/ob_table_load_error_row_handler.cpp +++ b/src/observer/table_load/ob_table_load_error_row_handler.cpp @@ -5,8 +5,9 @@ #define USING_LOG_PREFIX SERVER #include "observer/table_load/ob_table_load_error_row_handler.h" -#include "observer/table_load/ob_table_load_store_ctx.h" #include "observer/table_load/ob_table_load_table_ctx.h" +#include "observer/table_load/ob_table_load_struct.h" +#include "share/table/ob_table_load_define.h" namespace oceanbase { @@ -30,17 +31,19 @@ ObTableLoadErrorRowHandler::~ObTableLoadErrorRowHandler() { } -int ObTableLoadErrorRowHandler::init(ObTableLoadStoreCtx *store_ctx) +int ObTableLoadErrorRowHandler::init(const ObTableLoadParam ¶m, + table::ObTableLoadResultInfo &result_info, + sql::ObLoadDataStat *job_stat) { int ret = OB_SUCCESS; if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObTableLoadErrorRowHandler init twice", KR(ret), KP(this)); } else { - dup_action_ = store_ctx->ctx_->param_.dup_action_; - max_error_row_count_ = store_ctx->ctx_->param_.max_error_row_count_; - result_info_ = &store_ctx->result_info_; - job_stat_ = store_ctx->ctx_->job_stat_; + dup_action_ = param.dup_action_; + max_error_row_count_ = param.max_error_row_count_; + result_info_ = &result_info; + job_stat_ = job_stat; is_inited_ = true; } return ret; diff --git a/src/observer/table_load/ob_table_load_error_row_handler.h b/src/observer/table_load/ob_table_load_error_row_handler.h index df4fadd8b6..10507a8a78 100644 --- a/src/observer/table_load/ob_table_load_error_row_handler.h +++ b/src/observer/table_load/ob_table_load_error_row_handler.h @@ -18,13 +18,16 @@ class ObTableLoadResultInfo; namespace observer { class ObTableLoadStoreCtx; +class ObTableLoadCoordinatorCtx; +class ObTableLoadParam; class ObTableLoadErrorRowHandler : public ObDirectLoadDMLRowHandler { public: ObTableLoadErrorRowHandler(); virtual ~ObTableLoadErrorRowHandler(); - int init(ObTableLoadStoreCtx *store_ctx); + int init(const ObTableLoadParam ¶m, table::ObTableLoadResultInfo &result_info, + sql::ObLoadDataStat *job_stat); int handle_insert_row(const blocksstable::ObDatumRow &row) override; int handle_update_row(const blocksstable::ObDatumRow &row) override; int handle_update_row(const blocksstable::ObDatumRow &old_row, diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index 78fdc1f8bd..272a082e6c 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -202,7 +202,7 @@ int ObTableLoadStoreCtx::init( OB_NEWx(ObTableLoadErrorRowHandler, (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadErrorRowHandler", KR(ret)); - } else if (OB_FAIL(error_row_handler_->init(this))) { + } else if (OB_FAIL(error_row_handler_->init(ctx_->param_, result_info_, ctx_->job_stat_))) { LOG_WARN("fail to init error row handler", KR(ret)); } // init session_ctx_array_ diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp index 2a5648b23c..81be3aaec9 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp @@ -315,7 +315,7 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_ ObArray part_keys; ObArray row_idxs; ObTableLoadErrorRowHandler *error_row_handler = - trans_ctx_->ctx_->store_ctx_->error_row_handler_; + coordinator_ctx_->error_row_handler_; partition_ids.set_block_allocator(common::ModulePageAllocator(allocator)); for (int64_t i = 0; OB_SUCC(ret) && i < obj_rows.count(); ++i) { ObNewRow part_key;