diff --git a/src/observer/table_load/ob_table_load_begin_processor.cpp b/src/observer/table_load/ob_table_load_begin_processor.cpp index 75c6f8c1c2..a53245da39 100644 --- a/src/observer/table_load/ob_table_load_begin_processor.cpp +++ b/src/observer/table_load/ob_table_load_begin_processor.cpp @@ -112,7 +112,7 @@ int ObTableLoadBeginP::process() param.px_mode_ = false; param.online_opt_stat_gather_ = false; param.data_type_ = static_cast(arg_.config_.flag_.data_type_); - param.dup_action_ = ObLoadDupActionType::LOAD_STOP_ON_DUP; + param.dup_action_ = static_cast(arg_.config_.flag_.dup_action_); if (OB_FAIL(param.normalize())) { LOG_WARN("fail to normalize param", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_error_row_handler.cpp b/src/observer/table_load/ob_table_load_error_row_handler.cpp index eb98d58427..a16c4c71df 100644 --- a/src/observer/table_load/ob_table_load_error_row_handler.cpp +++ b/src/observer/table_load/ob_table_load_error_row_handler.cpp @@ -5,49 +5,23 @@ #define USING_LOG_PREFIX SERVER #include "observer/table_load/ob_table_load_error_row_handler.h" -#include "observer/table_load/ob_table_load_schema.h" -#include "observer/table_load/ob_table_load_stat.h" +#include "observer/table_load/ob_table_load_store_ctx.h" #include "observer/table_load/ob_table_load_table_ctx.h" -#include "observer/table_load/ob_table_load_utils.h" -#include "sql/engine/cmd/ob_load_data_utils.h" -#include "share/rc/ob_tenant_base.h" namespace oceanbase { -using namespace common; -using namespace common::hash; -using namespace table; -using namespace lib; -using namespace share; -using namespace share::schema; -using namespace blocksstable; -using namespace sql; namespace observer { -ObTableLoadErrorRowHandler::PartitionRowkey::~PartitionRowkey() -{ - if (OB_NOT_NULL(last_rowkey_.get_datum_ptr())) { - allocator_.free((void *)(last_rowkey_.get_datum_ptr())); - } -} - -ObTableLoadErrorRowHandler::PartitionRowkeyMap::~PartitionRowkeyMap() -{ - auto release_map_entry = [this](HashMapPair &entry) { - ObTableLoadErrorRowHandler::PartitionRowkey *part_rowkey = entry.second; - part_rowkey->~PartitionRowkey(); - allocator_.free((void *)part_rowkey); - return 0; - }; - map_.foreach_refactored(release_map_entry); -} +using namespace blocksstable; +using namespace common; +using namespace sql; ObTableLoadErrorRowHandler::ObTableLoadErrorRowHandler() - : capacity_(0), - session_cnt_(0), - safe_allocator_(row_allocator_), - error_row_cnt_(0), - repeated_row_cnt_(0), + : dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE), + max_error_row_count_(0), + result_info_(nullptr), + job_stat_(nullptr), + error_row_count_(0), is_inited_(false) { } @@ -56,249 +30,162 @@ ObTableLoadErrorRowHandler::~ObTableLoadErrorRowHandler() { } -int ObTableLoadErrorRowHandler::init(ObTableLoadTableCtx *const ctx) +int ObTableLoadErrorRowHandler::init(ObTableLoadStoreCtx *store_ctx) { int ret = OB_SUCCESS; if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObTableLoadErrorRowHandler init twice", KR(ret), KP(this)); } else { - param_ = ctx->param_; - job_stat_ = ctx->job_stat_; - datum_utils_ = &(ctx->schema_.datum_utils_); - col_descs_ = &(ctx->schema_.column_descs_); - capacity_ = ctx->param_.max_error_row_count_; - rowkey_column_num_ = ctx->schema_.rowkey_column_count_; - session_cnt_ = ctx->param_.session_count_; - if (OB_FAIL(session_maps_.prepare_allocate(session_cnt_))) { - LOG_WARN("failed to pre allocate session maps", K(ret), K(session_cnt_)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && (i < session_maps_.count()); ++i) { - if (OB_FAIL(session_maps_.at(i).map_.create(1024, "TLD_err_chk_map", "TLD_err_chk_map", - ctx->param_.tenant_id_))) { - LOG_WARN("fail to create map", KR(ret), K(ctx->param_.tenant_id_)); - } else { - ObArenaAllocator &allocator = session_maps_.at(i).allocator_; - allocator.set_label("TLD_err_chk"); - allocator.set_tenant_id(MTL_ID()); - } - } - } - } - if (OB_SUCC(ret)) { + dup_action_ = store_ctx->ctx_->param_.dup_action_; + max_error_row_count_ = store_ctx->ctx_->param_.max_error_row_count_; + result_info_ = &store_ctx->result_info_; + job_stat_ = store_ctx->ctx_->job_stat_; is_inited_ = true; } return ret; } -int ObTableLoadErrorRowHandler::inner_append_error_row(const ObNewRow &row, - ObIArray &error_row_array) +int ObTableLoadErrorRowHandler::handle_insert_row(const blocksstable::ObDatumRow &row) +{ + UNUSED(row); + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else { + ATOMIC_INC(&result_info_->rows_affected_); + } + return ret; +} + +int ObTableLoadErrorRowHandler::handle_update_row(const ObDatumRow &row) +{ + UNUSED(row); + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else { + if (ObLoadDupActionType::LOAD_STOP_ON_DUP == dup_action_) { + if (0 == max_error_row_count_) { + ret = OB_ERR_PRIMARY_KEY_DUPLICATE; + } else { + ObMutexGuard guard(mutex_); + if (error_row_count_ >= max_error_row_count_) { + ret = OB_ERR_TOO_MANY_ROWS; + LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count), + K_(error_row_count)); + } else { + ++error_row_count_; + } + } + ATOMIC_INC(&job_stat_->detected_error_rows_); + } else if (ObLoadDupActionType::LOAD_REPLACE == dup_action_) { + ATOMIC_AAF(&result_info_->rows_affected_, 2); + ATOMIC_INC(&result_info_->deleted_); + } else if (ObLoadDupActionType::LOAD_IGNORE == dup_action_) { + ATOMIC_INC(&result_info_->skipped_); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected dup action", KR(ret), K_(dup_action)); + } + } + return ret; +} + +int ObTableLoadErrorRowHandler::handle_update_row(const ObDatumRow &old_row, + const ObDatumRow &new_row, + const ObDatumRow *&result_row) { int ret = OB_SUCCESS; - ObNewRow new_row; - ObMutexGuard guard(append_row_mutex_); - if (error_row_cnt_ >= capacity_) { - ret = OB_ERR_TOO_MANY_ROWS; - LOG_WARN("error row count reaches its maximum value", K(ret), K(capacity_), K(error_row_cnt_)); - } else if (OB_FAIL(ObTableLoadUtils::deep_copy(row, new_row, safe_allocator_))) { - LOG_WARN("failed to deep copy new row", K(ret), K(row)); - } else if (OB_FAIL(error_row_array.push_back(new_row))) { - LOG_WARN("failed to push back error row", K(ret), K(new_row)); + result_row = nullptr; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); } else { - error_row_cnt_++; + if (ObLoadDupActionType::LOAD_STOP_ON_DUP == dup_action_) { + if (0 == max_error_row_count_) { + ret = OB_ERR_PRIMARY_KEY_DUPLICATE; + } else { + ObMutexGuard guard(mutex_); + if (error_row_count_ >= max_error_row_count_) { + ret = OB_ERR_TOO_MANY_ROWS; + LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count), + K_(error_row_count)); + } else { + ++error_row_count_; + } + } + if (OB_SUCC(ret)) { + result_row = &old_row; + } + ATOMIC_INC(&job_stat_->detected_error_rows_); + } else if (ObLoadDupActionType::LOAD_IGNORE == dup_action_) { + result_row = &old_row; + ATOMIC_INC(&result_info_->skipped_); + } else if (ObLoadDupActionType::LOAD_REPLACE == dup_action_) { + result_row = &new_row; + ATOMIC_INC(&result_info_->deleted_); + ATOMIC_AAF(&result_info_->rows_affected_, 2); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected dup action", KR(ret), K_(dup_action)); + } + } + return ret; +} + +int ObTableLoadErrorRowHandler::handle_error_row(int error_code, const ObNewRow &row) +{ + UNUSED(row); + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (max_error_row_count_ == 0) { + ret = error_code; + } else { + ObMutexGuard guard(mutex_); + if (error_row_count_ >= max_error_row_count_) { + ret = OB_ERR_TOO_MANY_ROWS; + LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count), + K_(error_row_count)); + } else { + ++error_row_count_; + } ATOMIC_INC(&job_stat_->detected_error_rows_); } return ret; } -int ObTableLoadErrorRowHandler::inner_append_repeated_row(const ObNewRow &row, - ObIArray &repeated_row_array) -{ - int ret = OB_SUCCESS; - ObNewRow new_row; - ObMutexGuard guard(append_row_mutex_); - if (repeated_row_cnt_ >= DEFAULT_REPEATED_ERROR_ROW_COUNT) { - ret = OB_ERR_TOO_MANY_ROWS; - LOG_WARN("repeated row count reaches its maximum value", K(ret), K(capacity_), - K(repeated_row_cnt_)); - } else if (OB_FAIL(ObTableLoadUtils::deep_copy(row, new_row, safe_allocator_))) { - LOG_WARN("failed to deep copy new row", K(ret), K(row)); - } else if (OB_FAIL(repeated_row_array.push_back(new_row))) { - LOG_WARN("failed to push back error row", K(ret), K(new_row)); - } else { - repeated_row_cnt_ ++; - } - return ret; -} - -int ObTableLoadErrorRowHandler::append_error_row(const ObNewRow &row) -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret)); - } else if (OB_FAIL(inner_append_error_row(row, error_row_array_))) { - LOG_WARN("failed to append row to str error row array", K(ret), K(row)); - } - return ret; -} - -int ObTableLoadErrorRowHandler::append_error_row(const ObDatumRow &row) +int ObTableLoadErrorRowHandler::handle_error_row(int error_code, const ObDatumRow &row) { + UNUSED(row); int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); + } else if (max_error_row_count_ == 0) { + ret = error_code; } else { - ObNewRow new_row; - ObObjBufArray obj_buf; - ObArenaAllocator allocator; - if (OB_FAIL(obj_buf.init(&allocator))) { - LOG_WARN("fail to init obj buf", KR(ret)); - } else if (OB_FAIL(obj_buf.reserve(row.count_))) { - LOG_WARN("Failed to reserve buf for obj buf", K(ret), K(row.count_)); + ObMutexGuard guard(mutex_); + if (error_row_count_ >= max_error_row_count_) { + ret = OB_ERR_TOO_MANY_ROWS; + LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count), + K_(error_row_count)); } else { - new_row.cells_ = obj_buf.get_data(); - new_row.count_ = row.count_; - for (int64_t i = 0; OB_SUCC(ret) && i < (*col_descs_).count(); i++) { - if (OB_FAIL(row.storage_datums_[i].to_obj_enhance(new_row.cells_[i], (*col_descs_).at(i).col_type_))) { - LOG_WARN("Failed to transform datum to obj", K(ret), K(i), K(row.storage_datums_[i])); - } - } - if (OB_SUCC(ret)) { - if (OB_FAIL(inner_append_error_row(new_row, error_new_row_array_))) { - LOG_WARN("failed to append row to error row array", K(ret), K(new_row)); - } - } + ++error_row_count_; } + ATOMIC_INC(&job_stat_->detected_error_rows_); } return ret; } -int ObTableLoadErrorRowHandler::append_repeated_row(const common::ObNewRow &row) +uint64_t ObTableLoadErrorRowHandler::get_error_row_count() const { - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret)); - } else { - if (param_.dup_action_ == ObLoadDupActionType::LOAD_STOP_ON_DUP) { - if (OB_FAIL(inner_append_error_row(row, error_row_array_))) { - LOG_WARN("failed to append row to error row array", K(ret), K(row)); - } - } else { - if (OB_FAIL(inner_append_repeated_row(row, repeated_row_array_))) { - LOG_WARN("failed to append row to error row array", K(ret), K(row)); - } - } - } - return ret; -} - -int ObTableLoadErrorRowHandler::append_repeated_row(const blocksstable::ObDatumRow &row) -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret)); - } else { - ObNewRow new_row; - ObObjBufArray obj_buf; - ObArenaAllocator allocator; - if (OB_FAIL(obj_buf.init(&allocator))) { - LOG_WARN("fail to init obj buf", KR(ret)); - } else if (OB_FAIL(obj_buf.reserve(row.count_))) { - LOG_WARN("Failed to reserve buf for obj buf", K(ret), K(row.count_)); - } else { - new_row.cells_ = obj_buf.get_data(); - new_row.count_ = row.count_; - for (int64_t i = 0; OB_SUCC(ret) && i < (*col_descs_).count(); i++) { - if (OB_FAIL(row.storage_datums_[i].to_obj_enhance(new_row.cells_[i], (*col_descs_).at(i).col_type_))) { - LOG_WARN("Failed to transform datum to obj", K(ret), K(i), K(row.storage_datums_[i])); - } - } - if (OB_SUCC(ret)) { - if (param_.dup_action_ == ObLoadDupActionType::LOAD_STOP_ON_DUP) { - if (OB_FAIL(inner_append_error_row(new_row, error_new_row_array_))) { - LOG_WARN("failed to append row to error row array", K(ret), K(new_row)); - } - } else { - if (OB_FAIL(inner_append_repeated_row(new_row, repeated_new_row_array_))) { - LOG_WARN("failed to append row to error row array", K(ret), K(new_row)); - } - } - } - } - } - return ret; -} - -// TODO: convert each obj to string -int ObTableLoadErrorRowHandler::get_all_error_rows(ObTableLoadArray &obj_array) -{ - int ret = OB_NOT_IMPLEMENT; - return ret; -} - -int ObTableLoadErrorRowHandler::check_rowkey_order(int32_t session_id, const ObTabletID &tablet_id, - const ObDatumRow &datum_row) -{ - OB_TABLE_LOAD_STATISTICS_TIME_COST(check_rowkey_order_time_us); - int ret = OB_SUCCESS; - int cmp_ret = 1; - ObTableLoadErrorRowHandler::PartitionRowkey *last_part_rowkey = nullptr; - ObDatumRowkey rowkey; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret)); - } else if ((session_id < 1) || (session_id > session_cnt_)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid session id", K(session_id), K(session_cnt_)); - } else if (OB_FAIL(rowkey.assign(datum_row.storage_datums_, rowkey_column_num_))) { - LOG_WARN("failed to assign to rowkey", K(ret), KPC(datum_row.storage_datums_), - K(rowkey_column_num_)); - } else { - ObTableLoadErrorRowHandler::PartitionRowkeyMap &partition_map = - session_maps_.at(session_id - 1); - if (OB_FAIL(partition_map.map_.get_refactored(tablet_id, last_part_rowkey))) { - if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { - LOG_WARN("fail to get refactored", KR(ret), K(tablet_id)); - } else { - // allocate a new last_part_rowkey for the new partition - if (OB_ISNULL(last_part_rowkey = OB_NEWx(ObTableLoadErrorRowHandler::PartitionRowkey, - (&(partition_map.allocator_))))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to new ObDatumRowkey", KR(ret)); - } else if (OB_FAIL(partition_map.map_.set_refactored(tablet_id, last_part_rowkey))) { - LOG_WARN("fail to add last rowkey to map", KR(ret), K(session_id), K(tablet_id)); - } - } - } - if (OB_SUCC(ret)) { - ObDatumRowkey &last_rowkey = last_part_rowkey->last_rowkey_; - if (OB_FAIL(rowkey.compare(last_rowkey, *datum_utils_, cmp_ret))) { - LOG_WARN("fail to compare rowkey to last rowkey", KR(ret), K(rowkey), K(last_rowkey)); - } else if (cmp_ret > 0) { - // free last rowkey - last_part_rowkey->allocator_.reuse(); - // overwrite last rowkey - // TODO: deep copy one row each batch instead of each row - if (OB_FAIL( - ObTableLoadUtils::deep_copy(rowkey, last_rowkey, last_part_rowkey->allocator_))) { - LOG_WARN("failed to deep copy rowkey to last rowkey", K(ret), K(rowkey), K(last_rowkey)); - } - } else if (cmp_ret == 0) { - ret = OB_ERR_PRIMARY_KEY_DUPLICATE; - LOG_WARN("rowkey == last rowkey", K(ret), K(cmp_ret), K(session_id), K(tablet_id), - K(last_rowkey), K(rowkey)); - } else { - ret = OB_ROWKEY_ORDER_ERROR; - LOG_WARN("rowkey < last rowkey", K(ret), K(cmp_ret), K(session_id), K(tablet_id), - K(last_rowkey), K(rowkey)); - } - } - } - return ret; + ObMutexGuard guard(mutex_); + return error_row_count_; } } // namespace observer diff --git a/src/observer/table_load/ob_table_load_error_row_handler.h b/src/observer/table_load/ob_table_load_error_row_handler.h index bf506af3cd..df4fadd8b6 100644 --- a/src/observer/table_load/ob_table_load_error_row_handler.h +++ b/src/observer/table_load/ob_table_load_error_row_handler.h @@ -5,91 +5,42 @@ #pragma once #include "common/row/ob_row.h" -#include "observer/table_load/ob_table_load_struct.h" -#include "share/table/ob_table_load_array.h" -#include "share/table/ob_table_load_define.h" -#include "share/table/ob_table_load_row.h" #include "sql/engine/cmd/ob_load_data_utils.h" #include "storage/blocksstable/ob_datum_row.h" -#include "storage/blocksstable/ob_datum_rowkey.h" -#include "sql/resolver/cmd/ob_load_data_stmt.h" -#include "share/rc/ob_tenant_base.h" +#include "storage/direct_load/ob_direct_load_dml_row_handler.h" namespace oceanbase { +namespace table +{ +class ObTableLoadResultInfo; +} // namespace table namespace observer { +class ObTableLoadStoreCtx; -class ObTableLoadTableCtx; - -class ObTableLoadErrorRowHandler +class ObTableLoadErrorRowHandler : public ObDirectLoadDMLRowHandler { public: - static const int64_t DEFAULT_REPEATED_ERROR_ROW_COUNT = 100; ObTableLoadErrorRowHandler(); - ~ObTableLoadErrorRowHandler(); - int init(ObTableLoadTableCtx *const ctx); - int append_error_row(const common::ObNewRow &row); - int append_error_row(const blocksstable::ObDatumRow &row); - int append_repeated_row(const common::ObNewRow &row); - int append_repeated_row(const blocksstable::ObDatumRow &row); - int get_all_error_rows(table::ObTableLoadArray &obj_array); - uint64_t get_error_row_cnt() const { return error_row_cnt_; } - int check_rowkey_order(int32_t session_id, const common::ObTabletID &tablet_id, - const blocksstable::ObDatumRow &datum_row); - sql::ObLoadDupActionType get_action() const {return param_.dup_action_;} - uint64_t get_capacity() const { return capacity_; } - TO_STRING_KV(K_(capacity), K_(error_row_cnt), K_(repeated_row_cnt), K_(session_cnt), K_(is_inited)); + virtual ~ObTableLoadErrorRowHandler(); + int init(ObTableLoadStoreCtx *store_ctx); + int handle_insert_row(const blocksstable::ObDatumRow &row) override; + int handle_update_row(const blocksstable::ObDatumRow &row) override; + int handle_update_row(const blocksstable::ObDatumRow &old_row, + const blocksstable::ObDatumRow &new_row, + const blocksstable::ObDatumRow *&result_row) override; + int handle_error_row(int error_code, const common::ObNewRow &row); + int handle_error_row(int error_code, const blocksstable::ObDatumRow &row); + uint64_t get_error_row_count() const; + TO_STRING_KV(K_(dup_action), K_(max_error_row_count), K_(error_row_count)); private: - int inner_append_error_row(const common::ObNewRow &row, - common::ObIArray &error_row_array); - int inner_append_repeated_row(const common::ObNewRow &row, - common::ObIArray &repeated_row_array); - class PartitionRowkey - { - public: - PartitionRowkey() : allocator_("TLD_err_chk") - { - allocator_.set_tenant_id(MTL_ID()); - last_rowkey_.set_min_rowkey(); - } - ~PartitionRowkey(); - TO_STRING_KV(K(last_rowkey_)); - public: - common::ObArenaAllocator allocator_; - blocksstable::ObDatumRowkey last_rowkey_; - }; - class PartitionRowkeyMap - { - public: - PartitionRowkeyMap() : map_() {} - ~PartitionRowkeyMap(); - PartitionRowkeyMap(const PartitionRowkeyMap &other) {} - PartitionRowkeyMap &operator=(const PartitionRowkeyMap &other) { return *this; } - TO_STRING_KV(K(map_.size())); - // all partitions are written sequentially within one session - // thus they can share the same allocator - common::ObArenaAllocator allocator_; - common::hash::ObHashMap map_; - }; -private: - observer::ObTableLoadParam param_; + sql::ObLoadDupActionType dup_action_; + uint64_t max_error_row_count_; + table::ObTableLoadResultInfo *result_info_; sql::ObLoadDataStat *job_stat_; - const oceanbase::blocksstable::ObStorageDatumUtils *datum_utils_; - const common::ObIArray *col_descs_; - uint64_t capacity_; // maximum allowed error row count - int64_t rowkey_column_num_; - int32_t session_cnt_; - common::ObArray session_maps_; - common::ObArenaAllocator row_allocator_; //just for safe allocator - common::ObSafeArenaAllocator safe_allocator_; //该分配器是线程安全的 - mutable lib::ObMutex append_row_mutex_; - uint64_t error_row_cnt_; - uint64_t repeated_row_cnt_; - common::ObArray error_row_array_; - common::ObArray error_new_row_array_; - common::ObArray repeated_row_array_; - common::ObArray repeated_new_row_array_; + mutable lib::ObMutex mutex_; + uint64_t error_row_count_; bool is_inited_; }; diff --git a/src/observer/table_load/ob_table_load_mem_compactor.cpp b/src/observer/table_load/ob_table_load_mem_compactor.cpp index c70aa0443b..f4185dc08b 100644 --- a/src/observer/table_load/ob_table_load_mem_compactor.cpp +++ b/src/observer/table_load/ob_table_load_mem_compactor.cpp @@ -5,6 +5,7 @@ #define USING_LOG_PREFIX SERVER #include "observer/table_load/ob_table_load_mem_compactor.h" +#include "observer/table_load/ob_table_load_error_row_handler.h" #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_stat.h" #include "observer/table_load/ob_table_load_store_ctx.h" @@ -273,9 +274,8 @@ int ObTableLoadMemCompactor::inner_init() mem_ctx_.need_sort_ = param_->need_sort_; mem_ctx_.mem_load_task_count_ = param_->session_count_; mem_ctx_.column_count_ = param_->column_count_; - mem_ctx_.error_row_handler_ = store_ctx_->error_row_handler_; + mem_ctx_.dml_row_handler_ = store_ctx_->error_row_handler_; mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_; - mem_ctx_.result_info_ = &(store_ctx_->result_info_); } if (OB_SUCC(ret)) { if (OB_FAIL(mem_ctx_.init())) { diff --git a/src/observer/table_load/ob_table_load_merger.cpp b/src/observer/table_load/ob_table_load_merger.cpp index b3f5c842ee..290c027c5c 100644 --- a/src/observer/table_load/ob_table_load_merger.cpp +++ b/src/observer/table_load/ob_table_load_merger.cpp @@ -5,6 +5,7 @@ #define USING_LOG_PREFIX SERVER #include "observer/table_load/ob_table_load_merger.h" +#include "observer/table_load/ob_table_load_error_row_handler.h" #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_stat.h" #include "observer/table_load/ob_table_load_store_ctx.h" @@ -198,8 +199,7 @@ int ObTableLoadMerger::build_merge_ctx() merge_param.is_fast_heap_table_ = store_ctx_->is_fast_heap_table_; merge_param.online_opt_stat_gather_ = param_.online_opt_stat_gather_; merge_param.insert_table_ctx_ = store_ctx_->insert_table_ctx_; - merge_param.error_row_handler_ = store_ctx_->error_row_handler_; - merge_param.result_info_ = &(store_ctx_->result_info_); + merge_param.dml_row_handler_ = store_ctx_->error_row_handler_; if (OB_FAIL(merge_ctx_.init(merge_param, store_ctx_->ls_partition_ids_, store_ctx_->target_ls_partition_ids_))) { LOG_WARN("fail to init merge ctx", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp index e1632c1b36..c01c5b22e2 100644 --- a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp @@ -5,6 +5,7 @@ #define USING_LOG_PREFIX SERVER #include "observer/table_load/ob_table_load_multiple_heap_table_compactor.h" +#include "observer/table_load/ob_table_load_error_row_handler.h" #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_stat.h" #include "observer/table_load/ob_table_load_store_ctx.h" @@ -316,9 +317,8 @@ int ObTableLoadMultipleHeapTableCompactor::inner_init() mem_ctx_.need_sort_ = param_->need_sort_; mem_ctx_.mem_load_task_count_ = param_->session_count_; mem_ctx_.column_count_ = param_->column_count_; - mem_ctx_.error_row_handler_ = store_ctx_->error_row_handler_; + mem_ctx_.dml_row_handler_ = store_ctx_->error_row_handler_; mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_; - mem_ctx_.result_info_ = &(store_ctx_->result_info_); if (OB_SUCC(ret)) { if (OB_FAIL(mem_ctx_.init())) { diff --git a/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp b/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp index 2162f2ca31..4cbca67acb 100644 --- a/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp +++ b/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp @@ -5,6 +5,7 @@ #define USING_LOG_PREFIX SERVER #include "observer/table_load/ob_table_load_parallel_merge_ctx.h" +#include "observer/table_load/ob_table_load_error_row_handler.h" #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_stat.h" #include "observer/table_load/ob_table_load_store_ctx.h" @@ -355,8 +356,7 @@ public: ObDirectLoadMultipleSSTableScanMergeParam scan_merge_param; scan_merge_param.table_data_desc_ = parallel_merge_ctx_->store_ctx_->table_data_desc_; scan_merge_param.datum_utils_ = &(ctx_->schema_.datum_utils_); - scan_merge_param.error_row_handler_ = parallel_merge_ctx_->store_ctx_->error_row_handler_; - scan_merge_param.result_info_ = &(parallel_merge_ctx_->store_ctx_->result_info_); + scan_merge_param.dml_row_handler_ = parallel_merge_ctx_->store_ctx_->error_row_handler_; for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ctx_->merge_sstable_count_; ++i) { ObDirectLoadMultipleSSTable *sstable = tablet_ctx_->sstables_.at(i); if (OB_FAIL(sstable_array_.push_back(sstable))) { diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index 73185a0b15..48962eb44f 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -201,10 +201,8 @@ int ObTableLoadStoreCtx::init( OB_NEWx(ObTableLoadErrorRowHandler, (&allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadErrorRowHandler", KR(ret)); - } else if (OB_FAIL(error_row_handler_->init(ctx_))) { - LOG_WARN("fail to init error row handler", KR(ret), K(ctx_->param_.tenant_id_), - K(ctx_->param_.table_id_), K(ctx_->param_.max_error_row_count_), - K(ctx_->param_.session_count_)); + } else if (OB_FAIL(error_row_handler_->init(this))) { + LOG_WARN("fail to init error row handler", KR(ret)); } // init session_ctx_array_ else if (OB_FAIL(init_session_ctx_array())) { diff --git a/src/observer/table_load/ob_table_load_trans_store.cpp b/src/observer/table_load/ob_table_load_trans_store.cpp index 7bd7311d2e..7e94e18af7 100644 --- a/src/observer/table_load/ob_table_load_trans_store.cpp +++ b/src/observer/table_load/ob_table_load_trans_store.cpp @@ -208,7 +208,7 @@ int ObTableLoadTransStoreWriter::init_session_ctx_array() param.online_opt_stat_gather_ = trans_ctx_->ctx_->param_.online_opt_stat_gather_; param.insert_table_ctx_ = trans_ctx_->ctx_->store_ctx_->insert_table_ctx_; param.fast_heap_table_ctx_ = trans_ctx_->ctx_->store_ctx_->fast_heap_table_ctx_; - param.result_info_ = &(trans_ctx_->ctx_->store_ctx_->result_info_); + param.dml_row_handler_ = trans_ctx_->ctx_->store_ctx_->error_row_handler_; for (int64_t i = 0; OB_SUCC(ret) && i < session_count; ++i) { SessionContext *session_ctx = session_ctx_array_ + i; if (param_.px_mode_) { @@ -413,11 +413,10 @@ int ObTableLoadTransStoreWriter::cast_row(ObArenaAllocator &cast_allocator, } } if (OB_FAIL(ret)) { - int tmp_ret = OB_SUCCESS; ObTableLoadErrorRowHandler *error_row_handler = trans_ctx_->ctx_->store_ctx_->error_row_handler_; - if (OB_TMP_FAIL(error_row_handler->append_error_row(row))) { - LOG_WARN("failed to append error row", K(ret), K(row)); + if (OB_FAIL(error_row_handler->handle_error_row(ret, row))) { + LOG_WARN("failed to handle error row", K(ret), K(row)); } else { ret = OB_EAGAIN; } @@ -462,36 +461,23 @@ int ObTableLoadTransStoreWriter::handle_identity_column(const ObColumnSchemaV2 * } int ObTableLoadTransStoreWriter::write_row_to_table_store(ObDirectLoadTableStore &table_store, - const ObTabletID &tablet_id, - const ObDatumRow &datum_row) + const ObTabletID &tablet_id, + const ObDatumRow &datum_row) { int ret = OB_SUCCESS; if (OB_FAIL(table_store.append_row(tablet_id, datum_row))) { LOG_WARN("fail to append row", KR(ret), K(datum_row)); } if (OB_FAIL(ret)) { - ObTableLoadErrorRowHandler *error_row_handler = trans_ctx_->ctx_->store_ctx_->error_row_handler_; + ObTableLoadErrorRowHandler *error_row_handler = + trans_ctx_->ctx_->store_ctx_->error_row_handler_; if (OB_LIKELY(OB_ERR_PRIMARY_KEY_DUPLICATE == ret)) { - int tmp_ret = OB_SUCCESS; - if (trans_ctx_->ctx_->param_.dup_action_ == ObLoadDupActionType::LOAD_REPLACE) { - ATOMIC_AAF(&trans_ctx_->ctx_->store_ctx_->result_info_.rows_affected_, 2); - ATOMIC_INC(&trans_ctx_->ctx_->store_ctx_->result_info_.deleted_); - } else if (trans_ctx_->ctx_->param_.dup_action_ == ObLoadDupActionType::LOAD_IGNORE) { - ATOMIC_INC(&trans_ctx_->ctx_->store_ctx_->result_info_.skipped_); - } else if (trans_ctx_->ctx_->param_.dup_action_ == ObLoadDupActionType::LOAD_STOP_ON_DUP) { - if (OB_TMP_FAIL(error_row_handler->append_error_row(datum_row))) { - LOG_WARN("failed to append repeated row", K(ret), K(tablet_id), K(datum_row)); - } - } - if (OB_LIKELY(OB_SUCCESS == tmp_ret)) { - ret = OB_SUCCESS; + if (OB_FAIL(error_row_handler->handle_update_row(datum_row))) { + LOG_WARN("fail to handle update row", KR(ret), K(datum_row)); } } else if (OB_LIKELY(OB_ROWKEY_ORDER_ERROR == ret)) { - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(error_row_handler->append_error_row(datum_row))) { - LOG_WARN("failed to append error row", K(ret), K(tablet_id), K(datum_row)); - } else { - ret = OB_SUCCESS; + if (OB_FAIL(error_row_handler->handle_error_row(ret, datum_row))) { + LOG_WARN("fail to handle error row", KR(ret), K(tablet_id), K(datum_row)); } } } diff --git a/src/share/table/ob_table_load_define.h b/src/share/table/ob_table_load_define.h index 90b1f3aa31..cf3024bde2 100644 --- a/src/share/table/ob_table_load_define.h +++ b/src/share/table/ob_table_load_define.h @@ -27,20 +27,22 @@ struct ObTableLoadFlag public: static const uint64_t BIT_IS_NEED_SORT = 1; static const uint64_t BIT_DATA_TYPE = 2; - static const uint64_t BIT_RESERVED = 61; + static const uint64_t BIT_DUP_ACTION_TYPE = 2; + static const uint64_t BIT_RESERVED = 59; union { uint64_t flag_; struct { - uint64_t is_need_sort_ : BIT_IS_NEED_SORT; - uint64_t data_type_ : BIT_DATA_TYPE; - uint64_t reserved_ : BIT_RESERVED; + uint64_t is_need_sort_ : BIT_IS_NEED_SORT; + uint64_t data_type_ : BIT_DATA_TYPE; + uint64_t dup_action_ : BIT_DUP_ACTION_TYPE; + uint64_t reserved_ : BIT_RESERVED; }; }; ObTableLoadFlag() : flag_(0) {} void reset() { flag_ = 0; } - TO_STRING_KV(K_(is_need_sort), K_(data_type)); + TO_STRING_KV(K_(is_need_sort), K_(data_type), K_(dup_action)); }; struct ObTableLoadConfig final diff --git a/src/storage/direct_load/ob_direct_load_data_fuse.cpp b/src/storage/direct_load/ob_direct_load_data_fuse.cpp index b03761b3d2..f24def1908 100644 --- a/src/storage/direct_load/ob_direct_load_data_fuse.cpp +++ b/src/storage/direct_load/ob_direct_load_data_fuse.cpp @@ -12,7 +12,6 @@ namespace storage { using namespace common; using namespace blocksstable; -using namespace observer; using namespace share; using namespace sql; @@ -23,8 +22,7 @@ using namespace sql; ObDirectLoadDataFuseParam::ObDirectLoadDataFuseParam() : store_column_count_(0), datum_utils_(nullptr), - error_row_handler_(nullptr), - result_info_(nullptr) + dml_row_handler_(nullptr) { } @@ -35,7 +33,7 @@ ObDirectLoadDataFuseParam::~ObDirectLoadDataFuseParam() bool ObDirectLoadDataFuseParam::is_valid() const { return tablet_id_.is_valid() && store_column_count_ > 0 && table_data_desc_.is_valid() && - nullptr != datum_utils_ && nullptr != error_row_handler_ && nullptr != result_info_; + nullptr != datum_utils_ && nullptr != dml_row_handler_; } /** @@ -223,50 +221,35 @@ int ObDirectLoadDataFuse::inner_get_next_row(const ObDatumRow *&datum_row) if (OB_FAIL(rows_merger_.pop())) { LOG_WARN("fail to pop item", KR(ret)); } else if (item->iter_idx_ == LOAD_IDX) { - ATOMIC_INC(¶m_.result_info_->rows_affected_); + if (OB_FAIL(param_.dml_row_handler_->handle_insert_row(*datum_row))) { + LOG_WARN("fail to handle insert row", KR(ret), KPC(datum_row)); + } } } } else { const Item *item = nullptr; + const ObDatumRow *old_row = nullptr; + const ObDatumRow *new_row = nullptr; while (OB_SUCC(ret) && !rows_merger_.empty()) { if (OB_FAIL(rows_merger_.top(item))) { LOG_WARN("fail to rebuild", KR(ret)); } else { - if (ObLoadDupActionType::LOAD_STOP_ON_DUP == param_.dup_action_) { - if (item->iter_idx_ == ORIGIN_IDX) { - datum_row = item->datum_row_; - } else { - ObTableLoadErrorRowHandler *error_row_handler = param_.error_row_handler_; - if (OB_FAIL(error_row_handler->append_error_row(*item->datum_row_))) { - if ((OB_ERR_TOO_MANY_ROWS == ret) - && (0 == param_.error_row_handler_->get_capacity())){ - ret = OB_ERR_PRIMARY_KEY_DUPLICATE; - } - LOG_WARN("fail to append row to error row handler", KR(ret), KPC(item->datum_row_)); - } - } - } else if (ObLoadDupActionType::LOAD_IGNORE == param_.dup_action_) { - if (item->iter_idx_ == ORIGIN_IDX) { - datum_row = item->datum_row_; - } else { - ATOMIC_INC(¶m_.result_info_->skipped_); - } - } else if (ObLoadDupActionType::LOAD_REPLACE == param_.dup_action_) { - if (item->iter_idx_ == ORIGIN_IDX) { - ATOMIC_INC(¶m_.result_info_->deleted_); - ATOMIC_AAF(¶m_.result_info_->rows_affected_, 2); - } else { - datum_row = item->datum_row_; - } + if (item->iter_idx_ == ORIGIN_IDX) { + old_row = item->datum_row_; + } else { + new_row = item->datum_row_; } - if (OB_SUCC(ret)) { - consumers_[consumer_cnt_++] = item->iter_idx_; - if (OB_FAIL(rows_merger_.pop())) { - LOG_WARN("fail to pop item", KR(ret)); - } + consumers_[consumer_cnt_++] = item->iter_idx_; + if (OB_FAIL(rows_merger_.pop())) { + LOG_WARN("fail to pop item", KR(ret)); } } } + if (OB_SUCC(ret)) { + if (OB_FAIL(param_.dml_row_handler_->handle_update_row(*old_row, *new_row, datum_row))) { + LOG_WARN("fail to handle update row", KR(ret), KPC(old_row), KPC(new_row)); + } + } } return ret; } @@ -329,8 +312,7 @@ int ObDirectLoadSSTableDataFuse::init(const ObDirectLoadDataFuseParam ¶m, scan_merge_param.tablet_id_ = param.tablet_id_; scan_merge_param.table_data_desc_ = param.table_data_desc_; scan_merge_param.datum_utils_ = param.datum_utils_; - scan_merge_param.error_row_handler_ = param.error_row_handler_; - scan_merge_param.result_info_ = param.result_info_; + scan_merge_param.dml_row_handler_ = param.dml_row_handler_; if (OB_FAIL(scan_merge_.init(scan_merge_param, sstable_array, range))) { LOG_WARN("fail to init scan merge", KR(ret)); } @@ -401,8 +383,7 @@ int ObDirectLoadMultipleSSTableDataFuse::init( ObDirectLoadMultipleSSTableScanMergeParam scan_merge_param; scan_merge_param.table_data_desc_ = param.table_data_desc_; scan_merge_param.datum_utils_ = param.datum_utils_; - scan_merge_param.error_row_handler_ = param.error_row_handler_; - scan_merge_param.result_info_ = param.result_info_; + scan_merge_param.dml_row_handler_ = param.dml_row_handler_; if (OB_FAIL(scan_merge_.init(scan_merge_param, sstable_array, range_))) { LOG_WARN("fail to init scan merge", KR(ret)); } diff --git a/src/storage/direct_load/ob_direct_load_data_fuse.h b/src/storage/direct_load/ob_direct_load_data_fuse.h index b35533a9f9..4fdba798e1 100644 --- a/src/storage/direct_load/ob_direct_load_data_fuse.h +++ b/src/storage/direct_load/ob_direct_load_data_fuse.h @@ -4,9 +4,9 @@ #pragma once -#include "observer/table_load/ob_table_load_error_row_handler.h" #include "share/table/ob_table_load_define.h" #include "sql/resolver/cmd/ob_load_data_stmt.h" +#include "storage/direct_load/ob_direct_load_dml_row_handler.h" #include "storage/direct_load/ob_direct_load_multiple_datum_range.h" #include "storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h" #include "storage/direct_load/ob_direct_load_origin_table.h" @@ -26,15 +26,13 @@ public: ~ObDirectLoadDataFuseParam(); bool is_valid() const; TO_STRING_KV(K_(tablet_id), K_(store_column_count), K_(table_data_desc), KP_(datum_utils), - KP_(error_row_handler), KP_(result_info)); + KP_(dml_row_handler)); public: common::ObTabletID tablet_id_; int64_t store_column_count_; ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; - observer::ObTableLoadErrorRowHandler *error_row_handler_; - sql::ObLoadDupActionType dup_action_; - table::ObTableLoadResultInfo *result_info_; + ObDirectLoadDMLRowHandler *dml_row_handler_; }; class ObDirectLoadDataFuse diff --git a/src/storage/direct_load/ob_direct_load_dml_row_handler.h b/src/storage/direct_load/ob_direct_load_dml_row_handler.h new file mode 100644 index 0000000000..6de19f1ead --- /dev/null +++ b/src/storage/direct_load/ob_direct_load_dml_row_handler.h @@ -0,0 +1,31 @@ +// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved. +// Author: +// suzhi.yt <> + +#pragma once + +#include "storage/blocksstable/ob_datum_row.h" + +namespace oceanbase +{ +namespace storage +{ + +class ObDirectLoadDMLRowHandler +{ +public: + ObDirectLoadDMLRowHandler() = default; + virtual ~ObDirectLoadDMLRowHandler() = default; + // handle rows direct insert into sstable + virtual int handle_insert_row(const blocksstable::ObDatumRow &row) = 0; + // handle rows with the same primary key in the imported data + virtual int handle_update_row(const blocksstable::ObDatumRow &row) = 0; + // handle rows with the same primary key between the imported data and the original data + virtual int handle_update_row(const blocksstable::ObDatumRow &old_row, + const blocksstable::ObDatumRow &new_row, + const blocksstable::ObDatumRow *&result_row) = 0; + DECLARE_PURE_VIRTUAL_TO_STRING; +}; + +} // namespace storage +} // namespace oceanbase diff --git a/src/storage/direct_load/ob_direct_load_easy_queue.h b/src/storage/direct_load/ob_direct_load_easy_queue.h index bf0ce53704..44ba859601 100644 --- a/src/storage/direct_load/ob_direct_load_easy_queue.h +++ b/src/storage/direct_load/ob_direct_load_easy_queue.h @@ -60,5 +60,5 @@ private: }; -} // namespace observer +} // namespace storage } // namespace oceanbase diff --git a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp index d44112b017..9201fe7e2e 100644 --- a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp +++ b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp @@ -14,7 +14,6 @@ namespace storage { using namespace common; using namespace blocksstable; -using namespace observer; /** * ObDirectLoadExternalMultiPartitionTableBuildParam diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp index a226ec85e4..461353c5a4 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp @@ -9,6 +9,7 @@ #include "share/stat/ob_stat_define.h" #include "share/table/ob_table_load_define.h" #include "storage/ddl/ob_direct_insert_sstable_ctx.h" +#include "storage/direct_load/ob_direct_load_dml_row_handler.h" #include "storage/direct_load/ob_direct_load_fast_heap_table.h" #include "storage/direct_load/ob_direct_load_insert_table_ctx.h" @@ -30,7 +31,7 @@ ObDirectLoadFastHeapTableBuildParam::ObDirectLoadFastHeapTableBuildParam() col_descs_(nullptr), insert_table_ctx_(nullptr), fast_heap_table_ctx_(nullptr), - result_info_(nullptr), + dml_row_handler_(nullptr), online_opt_stat_gather_(false) { } @@ -43,7 +44,7 @@ bool ObDirectLoadFastHeapTableBuildParam::is_valid() const { return tablet_id_.is_valid() && snapshot_version_ > 0 && table_data_desc_.is_valid() && nullptr != col_descs_ && nullptr != insert_table_ctx_ && nullptr != fast_heap_table_ctx_ && - nullptr != result_info_ && nullptr != datum_utils_; + nullptr != dml_row_handler_ && nullptr != datum_utils_; } /** @@ -225,7 +226,11 @@ int ObDirectLoadFastHeapTableBuilder::append_row(const ObTabletID &tablet_id, LOG_WARN("fail to collect", KR(ret)); } else { ++row_count_; - ATOMIC_INC(¶m_.result_info_->rows_affected_); + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(param_.dml_row_handler_->handle_insert_row(datum_row_))) { + LOG_WARN("fail to handle insert row", KR(ret), K_(datum_row)); } } } diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h index b486eb8ffa..81efc3e713 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h @@ -24,6 +24,7 @@ namespace storage { class ObDirectLoadInsertTableContext; class ObSSTableInsertSliceWriter; +class ObDirectLoadDMLRowHandler; struct ObDirectLoadFastHeapTableBuildParam { @@ -32,7 +33,7 @@ public: ~ObDirectLoadFastHeapTableBuildParam(); bool is_valid() const; TO_STRING_KV(K_(tablet_id), K_(snapshot_version), K_(table_data_desc), KP_(insert_table_ctx), - KP_(fast_heap_table_ctx), KP_(result_info), K_(online_opt_stat_gather)); + KP_(fast_heap_table_ctx), KP_(dml_row_handler), K_(online_opt_stat_gather)); public: common::ObTabletID tablet_id_; int64_t snapshot_version_; @@ -41,7 +42,7 @@ public: const common::ObIArray *col_descs_; ObDirectLoadInsertTableContext *insert_table_ctx_; ObDirectLoadFastHeapTableContext *fast_heap_table_ctx_; - table::ObTableLoadResultInfo *result_info_; + ObDirectLoadDMLRowHandler *dml_row_handler_; bool online_opt_stat_gather_; }; diff --git a/src/storage/direct_load/ob_direct_load_mem_context.h b/src/storage/direct_load/ob_direct_load_mem_context.h index a3e69cfe21..b04c2fea90 100644 --- a/src/storage/direct_load/ob_direct_load_mem_context.h +++ b/src/storage/direct_load/ob_direct_load_mem_context.h @@ -5,12 +5,12 @@ #ifndef OB_DIRECT_LOAD_MEM_CONTEXT_H_ #define OB_DIRECT_LOAD_MEM_CONTEXT_H_ -#include "storage/direct_load/ob_direct_load_easy_queue.h" -#include "storage/direct_load/ob_direct_load_mem_define.h" -#include "storage/direct_load/ob_direct_load_i_table.h" -#include "storage/direct_load/ob_direct_load_table_data_desc.h" -#include "observer/table_load/ob_table_load_error_row_handler.h" #include "share/table/ob_table_load_define.h" +#include "storage/direct_load/ob_direct_load_easy_queue.h" +#include "storage/direct_load/ob_direct_load_dml_row_handler.h" +#include "storage/direct_load/ob_direct_load_i_table.h" +#include "storage/direct_load/ob_direct_load_mem_define.h" +#include "storage/direct_load/ob_direct_load_table_data_desc.h" namespace oceanbase { @@ -49,8 +49,8 @@ public: need_sort_(false), mem_load_task_count_(0), column_count_(0), + dml_row_handler_(nullptr), file_mgr_(nullptr), - result_info_(nullptr), fly_mem_chunk_count_(0), finish_compact_count_(0), mem_dump_task_count_(0), @@ -76,9 +76,8 @@ public: bool need_sort_; // false: sstable, true: external_table int32_t mem_load_task_count_; int32_t column_count_; - observer::ObTableLoadErrorRowHandler *error_row_handler_; + ObDirectLoadDMLRowHandler *dml_row_handler_; ObDirectLoadTmpFileManager *file_mgr_; - table::ObTableLoadResultInfo *result_info_; ObDirectLoadEasyQueue mem_chunk_queue_; int64_t fly_mem_chunk_count_; diff --git a/src/storage/direct_load/ob_direct_load_mem_dump.cpp b/src/storage/direct_load/ob_direct_load_mem_dump.cpp index ceedb549e8..8a046e42d4 100644 --- a/src/storage/direct_load/ob_direct_load_mem_dump.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_dump.cpp @@ -5,9 +5,6 @@ #define USING_LOG_PREFIX STORAGE #include "storage/direct_load/ob_direct_load_mem_dump.h" -#include "observer/table_load/ob_table_load_mem_compactor.h" -#include "observer/table_load/ob_table_load_store_ctx.h" -#include "observer/table_load/ob_table_load_table_ctx.h" #include "storage/direct_load/ob_direct_load_external_table.h" #include "storage/direct_load/ob_direct_load_external_table_builder.h" #include "storage/direct_load/ob_direct_load_external_table_compactor.h" @@ -20,7 +17,6 @@ namespace storage { using namespace common; using namespace blocksstable; -using namespace observer; using namespace table; using namespace sql; @@ -268,22 +264,11 @@ int ObDirectLoadMemDump::dump_tables() LOG_WARN("fail to transfer dataum row", KR(ret)); } else if (OB_FAIL(table_builder->append_row(external_row->tablet_id_, datum_row))) { if (OB_LIKELY(OB_ERR_PRIMARY_KEY_DUPLICATE == ret)) { - int tmp_ret = OB_SUCCESS; - if (mem_ctx_->error_row_handler_->get_action() == ObLoadDupActionType::LOAD_REPLACE) { - ATOMIC_AAF(&mem_ctx_->result_info_->rows_affected_, 2); - ATOMIC_INC(&mem_ctx_->result_info_->deleted_); - } else if (mem_ctx_->error_row_handler_->get_action() == ObLoadDupActionType::LOAD_IGNORE) { - ATOMIC_INC(&mem_ctx_->result_info_->skipped_); - } else if (mem_ctx_->error_row_handler_->get_action() == ObLoadDupActionType::LOAD_STOP_ON_DUP) { - if (OB_TMP_FAIL(mem_ctx_->error_row_handler_->append_error_row(datum_row))) { - LOG_WARN("failed to append row to error row handler", K(tmp_ret), K(datum_row)); - } - } - if (OB_LIKELY(OB_SUCCESS == tmp_ret)) { - ret = OB_SUCCESS; + if (OB_FAIL(mem_ctx_->dml_row_handler_->handle_update_row(datum_row))) { + LOG_WARN("fail to handle update row", KR(ret), K(datum_row)); } } else { - LOG_WARN("fail to append row", K(ret), K(datum_row)); + LOG_WARN("fail to append row", KR(ret), K(datum_row)); } } } diff --git a/src/storage/direct_load/ob_direct_load_mem_sample.cpp b/src/storage/direct_load/ob_direct_load_mem_sample.cpp index af5d0433da..ce45f378b0 100644 --- a/src/storage/direct_load/ob_direct_load_mem_sample.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_sample.cpp @@ -4,16 +4,11 @@ #define USING_LOG_PREFIX STORAGE -#include "observer/table_load/ob_table_load_partition_location.h" #include "observer/table_load/ob_table_load_stat.h" #include "storage/direct_load/ob_direct_load_mem_sample.h" -#include "observer/table_load/ob_table_load_store_ctx.h" -#include "observer/table_load/ob_table_load_table_ctx.h" #include "observer/table_load/ob_table_load_task.h" -#include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_task_scheduler.h" #include "share/table/ob_table_load_handle.h" -#include "observer/table_load/ob_table_load_mem_compactor.h" namespace oceanbase { diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp index daa176c64c..24e91687f8 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp @@ -46,8 +46,7 @@ ObDirectLoadMergeParam::ObDirectLoadMergeParam() is_fast_heap_table_(false), online_opt_stat_gather_(false), insert_table_ctx_(nullptr), - error_row_handler_(nullptr), - result_info_(nullptr) + dml_row_handler_(nullptr) { } @@ -59,7 +58,7 @@ bool ObDirectLoadMergeParam::is_valid() const { return OB_INVALID_ID != table_id_ && 0 < rowkey_column_num_ && 0 < store_column_count_ && snapshot_version_ > 0 && table_data_desc_.is_valid() && nullptr != datum_utils_ && - nullptr != col_descs_ && nullptr != insert_table_ctx_ && nullptr != error_row_handler_; + nullptr != col_descs_ && nullptr != insert_table_ctx_ && nullptr != dml_row_handler_; } /** diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.h b/src/storage/direct_load/ob_direct_load_merge_ctx.h index cd6e7f1cb4..64e7d4cc76 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.h +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.h @@ -18,11 +18,6 @@ namespace oceanbase { -namespace observer -{ -class ObTableLoadErrorRowHandler; -class ObTableLoadSchema; -} // namespace observer namespace common { class ObOptOSGColumnStat; class ObOptTableStat; @@ -37,6 +32,7 @@ class ObDirectLoadSSTable; class ObDirectLoadMultipleSSTable; class ObDirectLoadMultipleHeapTable; class ObDirectLoadMultipleMergeRangeSplitter; +class ObDirectLoadDMLRowHandler; struct ObDirectLoadMergeParam { @@ -47,7 +43,7 @@ public: TO_STRING_KV(K_(table_id), K_(target_table_id), K_(rowkey_column_num), K_(store_column_count), K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), K_(is_heap_table), K_(is_fast_heap_table), K_(online_opt_stat_gather), KP_(insert_table_ctx), - KP_(error_row_handler), KP_(result_info)); + KP_(dml_row_handler)); public: uint64_t table_id_; uint64_t target_table_id_; @@ -61,8 +57,7 @@ public: bool is_fast_heap_table_; bool online_opt_stat_gather_; ObDirectLoadInsertTableContext *insert_table_ctx_; - observer::ObTableLoadErrorRowHandler *error_row_handler_; - table::ObTableLoadResultInfo *result_info_; + ObDirectLoadDMLRowHandler *dml_row_handler_; }; class ObDirectLoadMergeCtx diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp index d910b51ff8..05a6c30469 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp @@ -5,7 +5,7 @@ #define USING_LOG_PREFIX STORAGE #include "storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h" -#include "observer/table_load/ob_table_load_error_row_handler.h" +#include "storage/direct_load/ob_direct_load_dml_row_handler.h" #include "storage/blocksstable/ob_datum_range.h" #include "storage/direct_load/ob_direct_load_multiple_datum_range.h" #include "storage/direct_load/ob_direct_load_multiple_sstable_scanner.h" @@ -16,7 +16,6 @@ namespace storage { using namespace common; using namespace blocksstable; -using namespace observer; using namespace sql; /** @@ -24,7 +23,7 @@ using namespace sql; */ ObDirectLoadMultipleSSTableScanMergeParam::ObDirectLoadMultipleSSTableScanMergeParam() - : datum_utils_(nullptr), error_row_handler_(nullptr), result_info_(nullptr) + : datum_utils_(nullptr), dml_row_handler_(nullptr) { } @@ -32,8 +31,7 @@ ObDirectLoadMultipleSSTableScanMergeParam::~ObDirectLoadMultipleSSTableScanMerge bool ObDirectLoadMultipleSSTableScanMergeParam::is_valid() const { - return table_data_desc_.is_valid() && nullptr != datum_utils_ && nullptr != error_row_handler_ && - nullptr != result_info_; + return table_data_desc_.is_valid() && nullptr != datum_utils_ && nullptr != dml_row_handler_; } /** @@ -42,7 +40,7 @@ bool ObDirectLoadMultipleSSTableScanMergeParam::is_valid() const ObDirectLoadMultipleSSTableScanMerge::ObDirectLoadMultipleSSTableScanMerge() : datum_utils_(nullptr), - error_row_handler_(nullptr), + dml_row_handler_(nullptr), range_(nullptr), consumers_(nullptr), consumer_cnt_(0), @@ -59,7 +57,7 @@ void ObDirectLoadMultipleSSTableScanMerge::reset() { table_data_desc_.reset(); datum_utils_ = nullptr; - error_row_handler_ = nullptr; + dml_row_handler_ = nullptr; range_ = nullptr; for (int64_t i = 0; i < scanners_.count(); ++i) { scanners_[i]->~ObDirectLoadMultipleSSTableScanner(); @@ -131,8 +129,7 @@ int ObDirectLoadMultipleSSTableScanMerge::init( if (OB_SUCC(ret)) { table_data_desc_ = param.table_data_desc_; datum_utils_ = param.datum_utils_; - error_row_handler_ = param.error_row_handler_; - result_info_ = param.result_info_; + dml_row_handler_ = param.dml_row_handler_; range_ = ⦥ is_inited_ = true; } @@ -215,20 +212,11 @@ int ObDirectLoadMultipleSSTableScanMerge::inner_get_next_row( } else if (OB_LIKELY(rows_merger_->is_unique_champion())) { datum_row = top_item->row_; } else { - // record same rowkey row - if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_REPLACE) { - ATOMIC_AAF(&result_info_->rows_affected_, 2); - ATOMIC_INC(&result_info_->deleted_); - } else if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_IGNORE) { - ATOMIC_INC(&result_info_->skipped_); - } else if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_STOP_ON_DUP) { - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(top_item->row_->to_datums(datum_row_.storage_datums_, - datum_row_.count_))) { - LOG_WARN("fail to transfer external row to datums", KR(tmp_ret)); - } else if (OB_TMP_FAIL(error_row_handler_->append_error_row(datum_row_))) { - LOG_WARN("fail to append row to error row handler", KR(tmp_ret), K(datum_row_)); - } + // handle same rowkey row + if (OB_FAIL(top_item->row_->to_datums(datum_row_.storage_datums_, datum_row_.count_))) { + LOG_WARN("fail to transfer external row to datums", KR(ret)); + } else if (OB_FAIL(dml_row_handler_->handle_update_row(datum_row_))) { + LOG_WARN("fail to handle update row", KR(ret), K(datum_row_)); } } if (OB_SUCC(ret)) { diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h index 97f3307cd9..25c5b14251 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h @@ -19,12 +19,9 @@ namespace blocksstable { class ObStorageDatumUtils; } // namespace blocksstable -namespace observer -{ -class ObTableLoadErrorRowHandler; -} // namespace observer namespace storage { +class ObDirectLoadDMLRowHandler; struct ObDirectLoadMultipleSSTableScanMergeParam { @@ -32,12 +29,11 @@ public: ObDirectLoadMultipleSSTableScanMergeParam(); ~ObDirectLoadMultipleSSTableScanMergeParam(); bool is_valid() const; - TO_STRING_KV(K_(table_data_desc), KP_(datum_utils), KP_(error_row_handler), KP_(result_info)); + TO_STRING_KV(K_(table_data_desc), KP_(datum_utils), KP_(dml_row_handler)); public: ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; - observer::ObTableLoadErrorRowHandler *error_row_handler_; - table::ObTableLoadResultInfo *result_info_; + ObDirectLoadDMLRowHandler *dml_row_handler_; }; class ObDirectLoadMultipleSSTableScanMerge : public ObIStoreRowIterator @@ -66,8 +62,7 @@ private: common::ObArenaAllocator allocator_; ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; - observer::ObTableLoadErrorRowHandler *error_row_handler_; - table::ObTableLoadResultInfo *result_info_; + ObDirectLoadDMLRowHandler *dml_row_handler_; const ObDirectLoadMultipleDatumRange *range_; common::ObSEArray scanners_; int64_t *consumers_; diff --git a/src/storage/direct_load/ob_direct_load_origin_table.cpp b/src/storage/direct_load/ob_direct_load_origin_table.cpp index 3718965f58..8010b9a449 100644 --- a/src/storage/direct_load/ob_direct_load_origin_table.cpp +++ b/src/storage/direct_load/ob_direct_load_origin_table.cpp @@ -16,7 +16,6 @@ namespace storage { using namespace common; using namespace blocksstable; -using namespace observer; using namespace share; using namespace share::schema; diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp index 07a08f747b..1dcdb05298 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp @@ -21,7 +21,6 @@ namespace storage { using namespace common; using namespace blocksstable; -using namespace observer; using namespace share; /** @@ -230,9 +229,7 @@ int ObDirectLoadPartitionRangeMergeTask::RowIterator::init( data_fuse_param.store_column_count_ = merge_param.store_column_count_; data_fuse_param.table_data_desc_ = merge_param.table_data_desc_; data_fuse_param.datum_utils_ = merge_param.datum_utils_; - data_fuse_param.error_row_handler_ = merge_param.error_row_handler_; - data_fuse_param.dup_action_ = merge_param.error_row_handler_->get_action(); - data_fuse_param.result_info_ = merge_param.result_info_; + data_fuse_param.dml_row_handler_ = merge_param.dml_row_handler_; if (OB_FAIL(data_fuse_.init(data_fuse_param, origin_table, sstable_array, range))) { LOG_WARN("fail to init data fuse", KR(ret)); } @@ -386,9 +383,7 @@ int ObDirectLoadPartitionRangeMultipleMergeTask::RowIterator::init( data_fuse_param.store_column_count_ = merge_param.store_column_count_; data_fuse_param.table_data_desc_ = merge_param.table_data_desc_; data_fuse_param.datum_utils_ = merge_param.datum_utils_; - data_fuse_param.error_row_handler_ = merge_param.error_row_handler_; - data_fuse_param.dup_action_ = merge_param.error_row_handler_->get_action(); - data_fuse_param.result_info_ = merge_param.result_info_; + data_fuse_param.dml_row_handler_ = merge_param.dml_row_handler_; if (OB_FAIL(data_fuse_.init(data_fuse_param, origin_table, sstable_array, range))) { LOG_WARN("fail to init data fuse", KR(ret)); } @@ -516,7 +511,7 @@ int ObDirectLoadPartitionRangeMultipleMergeTask::construct_row_iter( ObDirectLoadPartitionHeapTableMergeTask::RowIterator::RowIterator() : deserialize_datums_(nullptr), deserialize_datum_cnt_(0), - result_info_(nullptr), + dml_row_handler_(nullptr), is_inited_(false) { } @@ -562,7 +557,7 @@ int ObDirectLoadPartitionHeapTableMergeTask::RowIterator::init( ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); deserialize_datum_cnt_ = merge_param.store_column_count_ - merge_param.rowkey_column_num_; pk_interval_ = pk_interval; - result_info_ = merge_param.result_info_; + dml_row_handler_ = merge_param.dml_row_handler_; is_inited_ = true; } } @@ -592,7 +587,11 @@ int ObDirectLoadPartitionHeapTableMergeTask::RowIterator::get_next_row( // fill hide pk datum_row_.storage_datums_[0].set_int(pk_seq); result_row = &datum_row_; - ATOMIC_INC(&result_info_->rows_affected_); + } + if (OB_SUCC(ret)) { + if (OB_FAIL(dml_row_handler_->handle_insert_row(*result_row))) { + LOG_WARN("fail to handle insert row", KR(ret), KPC(result_row)); + } } } return ret; @@ -671,7 +670,7 @@ int ObDirectLoadPartitionHeapTableMergeTask::construct_row_iter( ObDirectLoadPartitionHeapTableMultipleMergeTask::RowIterator::RowIterator() : deserialize_datums_(nullptr), deserialize_datum_cnt_(0), - result_info_(nullptr), + dml_row_handler_(nullptr), is_inited_(false) { } @@ -715,7 +714,7 @@ int ObDirectLoadPartitionHeapTableMultipleMergeTask::RowIterator::init( ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); deserialize_datum_cnt_ = merge_param.store_column_count_ - merge_param.rowkey_column_num_; pk_interval_ = pk_interval; - result_info_ = merge_param.result_info_; + dml_row_handler_ = merge_param.dml_row_handler_; is_inited_ = true; } } @@ -746,7 +745,11 @@ int ObDirectLoadPartitionHeapTableMultipleMergeTask::RowIterator::get_next_row( // fill hide pk datum_row_.storage_datums_[0].set_int(pk_seq); result_row = &datum_row_; - ATOMIC_INC(&result_info_->rows_affected_); + } + if (OB_SUCC(ret)) { + if (OB_FAIL(dml_row_handler_->handle_insert_row(*result_row))) { + LOG_WARN("fail to handle insert row", KR(ret), KPC(result_row)); + } } } return ret; @@ -829,7 +832,7 @@ ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::RowIterat pos_(0), deserialize_datums_(nullptr), deserialize_datum_cnt_(0), - result_info_(nullptr), + dml_row_handler_(nullptr), is_inited_(false) { } @@ -882,7 +885,7 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::init( table_data_desc_ = merge_param.table_data_desc_; heap_table_array_ = heap_table_array; pk_interval_ = pk_interval; - result_info_ = merge_param.result_info_; + dml_row_handler_ = merge_param.dml_row_handler_; is_inited_ = true; } } @@ -899,6 +902,7 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::get_n LOG_WARN("ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator not init", KR(ret), KP(this)); } else { + // get row from origin table if (pos_ == 0) { const ObDatumRow *datum_row = nullptr; if (OB_FAIL(origin_iter_->get_next_row(datum_row))) { @@ -931,6 +935,7 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::get_n result_row = &datum_row_; } } + // get row from load data while (OB_SUCC(ret) && result_row == nullptr) { const ObDirectLoadMultipleExternalRow *external_row = nullptr; uint64_t pk_seq = OB_INVALID_ID; @@ -954,7 +959,11 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::get_n // fill hide pk datum_row_.storage_datums_[0].set_int(pk_seq); result_row = &datum_row_; - ATOMIC_INC(&result_info_->rows_affected_); + } + if (OB_SUCC(ret) && nullptr != result_row) { + if (OB_FAIL(dml_row_handler_->handle_insert_row(*result_row))) { + LOG_WARN("fail to handle insert row", KR(ret), KPC(result_row)); + } } } } diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.h b/src/storage/direct_load/ob_direct_load_partition_merge_task.h index a3633ab280..3228c55809 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.h +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.h @@ -162,7 +162,7 @@ private: blocksstable::ObStorageDatum *deserialize_datums_; int64_t deserialize_datum_cnt_; share::ObTabletCacheInterval pk_interval_; - table::ObTableLoadResultInfo *result_info_; + ObDirectLoadDMLRowHandler *dml_row_handler_; bool is_inited_; }; private: @@ -199,7 +199,7 @@ private: blocksstable::ObStorageDatum *deserialize_datums_; int64_t deserialize_datum_cnt_; share::ObTabletCacheInterval pk_interval_; - table::ObTableLoadResultInfo *result_info_; + ObDirectLoadDMLRowHandler *dml_row_handler_; bool is_inited_; }; private: @@ -249,7 +249,7 @@ private: blocksstable::ObStorageDatum *deserialize_datums_; int64_t deserialize_datum_cnt_; share::ObTabletCacheInterval pk_interval_; - table::ObTableLoadResultInfo *result_info_; + ObDirectLoadDMLRowHandler *dml_row_handler_; bool is_inited_; }; private: diff --git a/src/storage/direct_load/ob_direct_load_sstable_builder.cpp b/src/storage/direct_load/ob_direct_load_sstable_builder.cpp index 7db6b80933..1883dab93d 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_builder.cpp @@ -13,7 +13,6 @@ namespace storage { using namespace common; using namespace blocksstable; -using namespace observer; /** * ObDirectLoadIndexBlock diff --git a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp index 36f0098152..345d124313 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp @@ -5,8 +5,8 @@ #define USING_LOG_PREFIX STORAGE #include "storage/direct_load/ob_direct_load_sstable_scan_merge.h" -#include "observer/table_load/ob_table_load_error_row_handler.h" #include "storage/blocksstable/ob_datum_range.h" +#include "storage/direct_load/ob_direct_load_dml_row_handler.h" #include "storage/direct_load/ob_direct_load_sstable_scanner.h" namespace oceanbase @@ -15,7 +15,6 @@ namespace storage { using namespace common; using namespace blocksstable; -using namespace observer; using namespace sql; /** @@ -23,7 +22,7 @@ using namespace sql; */ ObDirectLoadSSTableScanMergeParam::ObDirectLoadSSTableScanMergeParam() - : datum_utils_(nullptr), error_row_handler_(nullptr), result_info_(nullptr) + : datum_utils_(nullptr), dml_row_handler_(nullptr) { } @@ -34,7 +33,7 @@ ObDirectLoadSSTableScanMergeParam::~ObDirectLoadSSTableScanMergeParam() bool ObDirectLoadSSTableScanMergeParam::is_valid() const { return tablet_id_.is_valid() && table_data_desc_.is_valid() && nullptr != datum_utils_ && - nullptr != error_row_handler_ && nullptr != result_info_; + nullptr != dml_row_handler_; } /** @@ -43,7 +42,7 @@ bool ObDirectLoadSSTableScanMergeParam::is_valid() const ObDirectLoadSSTableScanMerge::ObDirectLoadSSTableScanMerge() : datum_utils_(nullptr), - error_row_handler_(nullptr), + dml_row_handler_(nullptr), range_(nullptr), consumers_(nullptr), consumer_cnt_(0), @@ -64,7 +63,7 @@ void ObDirectLoadSSTableScanMerge::reset() tablet_id_.reset(); table_data_desc_.reset(); datum_utils_ = nullptr; - error_row_handler_ = nullptr; + dml_row_handler_ = nullptr; range_ = nullptr; for (int64_t i = 0; i < scanners_.count(); ++i) { scanners_[i]->~ObDirectLoadSSTableScanner(); @@ -136,8 +135,7 @@ int ObDirectLoadSSTableScanMerge::init(const ObDirectLoadSSTableScanMergeParam & tablet_id_ = param.tablet_id_; table_data_desc_ = param.table_data_desc_; datum_utils_ = param.datum_utils_; - error_row_handler_ = param.error_row_handler_; - result_info_ = param.result_info_; + dml_row_handler_ = param.dml_row_handler_; range_ = ⦥ is_inited_ = true; } @@ -219,20 +217,11 @@ int ObDirectLoadSSTableScanMerge::inner_get_next_row(const ObDirectLoadExternalR } else if (OB_LIKELY(rows_merger_->is_unique_champion())) { external_row = top_item->external_row_; } else { - // record same rowkey row - if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_REPLACE) { - ATOMIC_AAF(&result_info_->rows_affected_, 2); - ATOMIC_INC(&result_info_->deleted_); - } else if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_IGNORE) { - ATOMIC_INC(&result_info_->skipped_); - } else if (error_row_handler_->get_action() == ObLoadDupActionType::LOAD_STOP_ON_DUP) { - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(top_item->external_row_->to_datums(datum_row_.storage_datums_, - datum_row_.count_))) { - LOG_WARN("fail to transfer external row to datums", KR(tmp_ret)); - } else if (OB_TMP_FAIL(error_row_handler_->append_error_row(datum_row_))) { - LOG_WARN("fail to append row to error row handler", KR(tmp_ret), K(datum_row_)); - } + // handle same rowkey row + if (OB_FAIL(top_item->external_row_->to_datums(datum_row_.storage_datums_, datum_row_.count_))) { + LOG_WARN("fail to transfer external row to datums", KR(ret)); + } else if (OB_FAIL(dml_row_handler_->handle_update_row(datum_row_))) { + LOG_WARN("fail to handle update row", KR(ret), K(datum_row_)); } } if (OB_SUCC(ret)) { diff --git a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h index f4e2bd993a..06972aa6a4 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h +++ b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h @@ -20,26 +20,23 @@ namespace blocksstable class ObDatumRange; class ObStorageDatumUtils; } // namespace blocksstable -namespace observer -{ -class ObTableLoadErrorRowHandler; -} // namespace observer namespace storage { class ObDirectLoadExternalRow; +class ObDirectLoadDMLRowHandler; + struct ObDirectLoadSSTableScanMergeParam { public: ObDirectLoadSSTableScanMergeParam(); ~ObDirectLoadSSTableScanMergeParam(); bool is_valid() const; - TO_STRING_KV(K_(tablet_id), K_(table_data_desc), KP_(datum_utils), KP_(error_row_handler), KP_(result_info)); + TO_STRING_KV(K_(tablet_id), K_(table_data_desc), KP_(datum_utils), KP_(dml_row_handler)); public: common::ObTabletID tablet_id_; ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; - observer::ObTableLoadErrorRowHandler *error_row_handler_; - table::ObTableLoadResultInfo *result_info_; + ObDirectLoadDMLRowHandler *dml_row_handler_; }; class ObDirectLoadSSTableScanMerge : public ObIStoreRowIterator @@ -69,8 +66,7 @@ private: common::ObTabletID tablet_id_; ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; - observer::ObTableLoadErrorRowHandler *error_row_handler_; - table::ObTableLoadResultInfo *result_info_; + ObDirectLoadDMLRowHandler *dml_row_handler_; const blocksstable::ObDatumRange *range_; common::ObSEArray scanners_; int64_t *consumers_; diff --git a/src/storage/direct_load/ob_direct_load_sstable_scanner.cpp b/src/storage/direct_load/ob_direct_load_sstable_scanner.cpp index 3761118626..9fefc14f39 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_scanner.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_scanner.cpp @@ -13,7 +13,6 @@ namespace storage { using namespace common; using namespace blocksstable; -using namespace observer; /** * ObDirectLoadSSTableScanner diff --git a/src/storage/direct_load/ob_direct_load_table_store.cpp b/src/storage/direct_load/ob_direct_load_table_store.cpp index 093e4b6977..8cbf5a792f 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.cpp +++ b/src/storage/direct_load/ob_direct_load_table_store.cpp @@ -32,9 +32,9 @@ ObDirectLoadTableStoreParam::ObDirectLoadTableStoreParam() is_fast_heap_table_(false), insert_table_ctx_(nullptr), fast_heap_table_ctx_(nullptr), + dml_row_handler_(nullptr), extra_buf_(nullptr), - extra_buf_size_(0), - result_info_(nullptr) + extra_buf_size_(0) { } @@ -48,7 +48,7 @@ bool ObDirectLoadTableStoreParam::is_valid() const nullptr != file_mgr_ && (!is_fast_heap_table_ || (nullptr != insert_table_ctx_ && nullptr != fast_heap_table_ctx_)) && - nullptr != result_info_; + nullptr != dml_row_handler_; } /** @@ -105,7 +105,7 @@ int ObDirectLoadTableStoreBucket::init(const ObDirectLoadTableStoreParam ¶m, fast_heap_table_build_param.col_descs_ = param.col_descs_; fast_heap_table_build_param.insert_table_ctx_ = param.insert_table_ctx_; fast_heap_table_build_param.fast_heap_table_ctx_ = param.fast_heap_table_ctx_; - fast_heap_table_build_param.result_info_ = param.result_info_; + fast_heap_table_build_param.dml_row_handler_ = param.dml_row_handler_; fast_heap_table_build_param.online_opt_stat_gather_ = param.online_opt_stat_gather_; ObDirectLoadFastHeapTableBuilder *fast_heap_table_builder = nullptr; if (OB_ISNULL(fast_heap_table_builder = diff --git a/src/storage/direct_load/ob_direct_load_table_store.h b/src/storage/direct_load/ob_direct_load_table_store.h index f756d8230b..4452509d27 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.h +++ b/src/storage/direct_load/ob_direct_load_table_store.h @@ -19,6 +19,7 @@ class ObDirectLoadTmpFileManager; class ObDirectLoadTableBuilderAllocator; class ObDirectLoadInsertTableContext; class ObDirectLoadFastHeapTableContext; +class ObDirectLoadDMLRowHandler; struct ObDirectLoadTableStoreParam { @@ -28,7 +29,7 @@ public: bool is_valid() const; TO_STRING_KV(K_(snapshot_version), K_(table_data_desc), KP_(datum_utils), KP_(col_descs), KP_(file_mgr), K_(is_multiple_mode), K_(is_fast_heap_table), KP_(insert_table_ctx), - KP_(fast_heap_table_ctx), KP_(extra_buf), K_(extra_buf_size), KP_(result_info)); + KP_(fast_heap_table_ctx), KP_(dml_row_handler), KP_(extra_buf), K_(extra_buf_size)); public: int64_t snapshot_version_; ObDirectLoadTableDataDesc table_data_desc_; @@ -40,9 +41,9 @@ public: bool online_opt_stat_gather_; ObDirectLoadInsertTableContext *insert_table_ctx_; ObDirectLoadFastHeapTableContext *fast_heap_table_ctx_; + ObDirectLoadDMLRowHandler *dml_row_handler_; char *extra_buf_; int64_t extra_buf_size_; - table::ObTableLoadResultInfo *result_info_; }; class ObDirectLoadTableStoreBucket