fix use error row handler

This commit is contained in:
yongshige
2023-08-08 10:18:34 +00:00
committed by ob-robot
parent b047318590
commit 8325c17a84
6 changed files with 32 additions and 9 deletions

View File

@ -6,6 +6,7 @@
#include "observer/table_load/ob_table_load_coordinator_ctx.h"
#include "observer/table_load/ob_table_load_coordinator_trans.h"
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_task_scheduler.h"
#include "share/ob_autoincrement_service.h"
@ -28,6 +29,7 @@ ObTableLoadCoordinatorCtx::ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx)
allocator_("TLD_CoordCtx"),
task_scheduler_(nullptr),
exec_ctx_(nullptr),
error_row_handler_(nullptr),
sequence_schema_(&allocator_),
last_trans_gid_(1024),
next_session_id_(0),
@ -135,6 +137,14 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray<int64_t> &idx_array,
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
}
// init error_row_handler_
else if (OB_ISNULL(error_row_handler_ =
OB_NEWx(ObTableLoadErrorRowHandler, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadErrorRowHandler", KR(ret));
} else if (OB_FAIL(error_row_handler_->init(ctx_->param_, result_info_, ctx_->job_stat_))) {
LOG_WARN("fail to init error row handler", KR(ret));
}
// init session_ctx_array_
else if (OB_FAIL(init_session_ctx_array())) {
LOG_WARN("fail to init session ctx array", KR(ret));
@ -175,6 +185,11 @@ void ObTableLoadCoordinatorCtx::destroy()
allocator_.free(task_scheduler_);
task_scheduler_ = nullptr;
}
if (nullptr != error_row_handler_) {
error_row_handler_->~ObTableLoadErrorRowHandler();
allocator_.free(error_row_handler_);
error_row_handler_ = nullptr;
}
for (TransMap::const_iterator iter = trans_map_.begin(); iter != trans_map_.end(); ++iter) {
ObTableLoadCoordinatorTrans *trans = iter->second;
abort_unless(0 == trans->get_ref_count());

View File

@ -28,6 +28,7 @@ class ObTableLoadTableCtx;
class ObTableLoadTransCtx;
class ObTableLoadCoordinatorTrans;
class ObITableLoadTaskScheduler;
class ObTableLoadErrorRowHandler;
class ObTableLoadCoordinatorCtx
{
@ -119,6 +120,7 @@ public:
common::ObArray<int64_t> idx_array_;
ObTableLoadExecCtx *exec_ctx_;
table::ObTableLoadResultInfo result_info_;
ObTableLoadErrorRowHandler *error_row_handler_;
share::schema::ObSequenceSchema sequence_schema_;
struct SessionContext
{

View File

@ -5,8 +5,9 @@
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_error_row_handler.h"
#include "observer/table_load/ob_table_load_store_ctx.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_struct.h"
#include "share/table/ob_table_load_define.h"
namespace oceanbase
{
@ -30,17 +31,19 @@ ObTableLoadErrorRowHandler::~ObTableLoadErrorRowHandler()
{
}
int ObTableLoadErrorRowHandler::init(ObTableLoadStoreCtx *store_ctx)
int ObTableLoadErrorRowHandler::init(const ObTableLoadParam &param,
table::ObTableLoadResultInfo &result_info,
sql::ObLoadDataStat *job_stat)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTableLoadErrorRowHandler init twice", KR(ret), KP(this));
} else {
dup_action_ = store_ctx->ctx_->param_.dup_action_;
max_error_row_count_ = store_ctx->ctx_->param_.max_error_row_count_;
result_info_ = &store_ctx->result_info_;
job_stat_ = store_ctx->ctx_->job_stat_;
dup_action_ = param.dup_action_;
max_error_row_count_ = param.max_error_row_count_;
result_info_ = &result_info;
job_stat_ = job_stat;
is_inited_ = true;
}
return ret;

View File

@ -18,13 +18,16 @@ class ObTableLoadResultInfo;
namespace observer
{
class ObTableLoadStoreCtx;
class ObTableLoadCoordinatorCtx;
class ObTableLoadParam;
class ObTableLoadErrorRowHandler : public ObDirectLoadDMLRowHandler
{
public:
ObTableLoadErrorRowHandler();
virtual ~ObTableLoadErrorRowHandler();
int init(ObTableLoadStoreCtx *store_ctx);
int init(const ObTableLoadParam &param, table::ObTableLoadResultInfo &result_info,
sql::ObLoadDataStat *job_stat);
int handle_insert_row(const blocksstable::ObDatumRow &row) override;
int handle_update_row(const blocksstable::ObDatumRow &row) override;
int handle_update_row(const blocksstable::ObDatumRow &old_row,

View File

@ -202,7 +202,7 @@ int ObTableLoadStoreCtx::init(
OB_NEWx(ObTableLoadErrorRowHandler, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadErrorRowHandler", KR(ret));
} else if (OB_FAIL(error_row_handler_->init(this))) {
} else if (OB_FAIL(error_row_handler_->init(ctx_->param_, result_info_, ctx_->job_stat_))) {
LOG_WARN("fail to init error row handler", KR(ret));
}
// init session_ctx_array_

View File

@ -315,7 +315,7 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_
ObArray<ObNewRow> part_keys;
ObArray<int64_t> row_idxs;
ObTableLoadErrorRowHandler *error_row_handler =
trans_ctx_->ctx_->store_ctx_->error_row_handler_;
coordinator_ctx_->error_row_handler_;
partition_ids.set_block_allocator(common::ModulePageAllocator(allocator));
for (int64_t i = 0; OB_SUCC(ret) && i < obj_rows.count(); ++i) {
ObNewRow part_key;