From ef9887b33b260adabe86e1f16433baed595218f9 Mon Sep 17 00:00:00 2001 From: yongshige <598633031@qq.com> Date: Mon, 14 Aug 2023 07:12:41 +0000 Subject: [PATCH] fix compact bug --- .../ob_table_load_error_row_handler.cpp | 93 +++++++++++++++++++ .../ob_table_load_error_row_handler.h | 4 + .../engine/cmd/ob_load_data_direct_impl.cpp | 6 +- src/sql/engine/cmd/ob_load_data_direct_impl.h | 1 + .../direct_load/ob_direct_load_compare.cpp | 15 ++-- .../direct_load/ob_direct_load_compare.h | 10 ++- .../ob_direct_load_dml_row_handler.h | 6 ++ .../direct_load/ob_direct_load_mem_dump.cpp | 5 +- ...irect_load_multiple_sstable_scan_merge.cpp | 14 +-- ..._direct_load_multiple_sstable_scan_merge.h | 1 + .../ob_direct_load_sstable_scan_merge.cpp | 14 +-- .../ob_direct_load_sstable_scan_merge.h | 1 + 12 files changed, 144 insertions(+), 26 deletions(-) diff --git a/src/observer/table_load/ob_table_load_error_row_handler.cpp b/src/observer/table_load/ob_table_load_error_row_handler.cpp index b0d06a64a..5edc74aec 100644 --- a/src/observer/table_load/ob_table_load_error_row_handler.cpp +++ b/src/observer/table_load/ob_table_load_error_row_handler.cpp @@ -97,6 +97,99 @@ int ObTableLoadErrorRowHandler::handle_update_row(const ObDatumRow &row) return ret; } +int ObTableLoadErrorRowHandler::handle_update_row( + common::ObArray<const ObDirectLoadExternalRow *> &rows, const ObDirectLoadExternalRow *&row) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (OB_UNLIKELY(rows.count() < 2)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret)); + } else { + int64_t duplicate_row_count = rows.count() - 1; /* one row survives, the rest are duplicates */ + std::sort(rows.begin(), rows.end(), + [](const ObDirectLoadExternalRow *lhs, const ObDirectLoadExternalRow *rhs) { + return lhs->seq_no_ < rhs->seq_no_; + }); + if (ObLoadDupActionType::LOAD_STOP_ON_DUP == dup_action_) { + if (0 == max_error_row_count_) { + ret = OB_ERR_PRIMARY_KEY_DUPLICATE; + } else { + ObMutexGuard guard(mutex_); /* hold the lock before touching error_row_count_ */ + error_row_count_ 
+= duplicate_row_count; + if (error_row_count_ >= max_error_row_count_) { + ret = OB_ERR_TOO_MANY_ROWS; + LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count), + K_(error_row_count)); + } else { + row = rows.at(0); /* duplicates tolerated: hand back the smallest-seq_no row so the merge loop stops on it */ + } + } + ATOMIC_AAF(&job_stat_->detected_error_rows_, duplicate_row_count); + } else if (ObLoadDupActionType::LOAD_REPLACE == dup_action_) { + ATOMIC_AAF(&result_info_->rows_affected_, 2 * duplicate_row_count); + ATOMIC_AAF(&result_info_->deleted_, duplicate_row_count); + row = rows.at(duplicate_row_count); /* REPLACE keeps the row with the largest seq_no */ + } else if (ObLoadDupActionType::LOAD_IGNORE == dup_action_) { + ATOMIC_AAF(&result_info_->skipped_, duplicate_row_count); + row = rows.at(0); /* IGNORE keeps the row with the smallest seq_no */ + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected dup action", KR(ret), K_(dup_action)); + } + } + return ret; +} + +int ObTableLoadErrorRowHandler::handle_update_row( + common::ObArray<const ObDirectLoadMultipleDatumRow *> &rows, + const ObDirectLoadMultipleDatumRow *&row) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (OB_UNLIKELY(rows.count() < 2)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret)); + } else { + int64_t duplicate_row_count = rows.count() - 1; /* one row survives, the rest are duplicates */ + std::sort(rows.begin(), rows.end(), + [](const ObDirectLoadMultipleDatumRow *lhs, const ObDirectLoadMultipleDatumRow *rhs) { + return lhs->seq_no_ < rhs->seq_no_; + }); + if (ObLoadDupActionType::LOAD_STOP_ON_DUP == dup_action_) { + if (0 == max_error_row_count_) { + ret = OB_ERR_PRIMARY_KEY_DUPLICATE; + } else { + ObMutexGuard guard(mutex_); /* hold the lock before touching error_row_count_ */ + error_row_count_ += duplicate_row_count; + if (error_row_count_ >= max_error_row_count_) { + ret = OB_ERR_TOO_MANY_ROWS; + LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count), + K_(error_row_count)); + } else { + row = rows.at(0); /* duplicates tolerated: hand back the smallest-seq_no row so the merge loop stops on it */ + } + } + ATOMIC_AAF(&job_stat_->detected_error_rows_, duplicate_row_count); + } else if (ObLoadDupActionType::LOAD_REPLACE == dup_action_) { + ATOMIC_AAF(&result_info_->rows_affected_, 2 * duplicate_row_count); + 
ATOMIC_AAF(&result_info_->deleted_, duplicate_row_count); + row = rows.at(duplicate_row_count); /* REPLACE keeps the row with the largest seq_no */ + } else if (ObLoadDupActionType::LOAD_IGNORE == dup_action_) { + ATOMIC_AAF(&result_info_->skipped_, duplicate_row_count); + row = rows.at(0); /* IGNORE keeps the row with the smallest seq_no */ + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected dup action", KR(ret), K_(dup_action)); + } + } + return ret; +} + int ObTableLoadErrorRowHandler::handle_update_row(const ObDatumRow &old_row, const ObDatumRow &new_row, const ObDatumRow *&result_row) diff --git a/src/observer/table_load/ob_table_load_error_row_handler.h b/src/observer/table_load/ob_table_load_error_row_handler.h index 10507a8a7..e6468a825 100644 --- a/src/observer/table_load/ob_table_load_error_row_handler.h +++ b/src/observer/table_load/ob_table_load_error_row_handler.h @@ -30,6 +30,10 @@ public: sql::ObLoadDataStat *job_stat); int handle_insert_row(const blocksstable::ObDatumRow &row) override; int handle_update_row(const blocksstable::ObDatumRow &row) override; + int handle_update_row(common::ObArray<const ObDirectLoadExternalRow *> &rows, + const ObDirectLoadExternalRow *&row) override; + int handle_update_row(common::ObArray<const ObDirectLoadMultipleDatumRow *> &rows, + const ObDirectLoadMultipleDatumRow *&row) override; int handle_update_row(const blocksstable::ObDatumRow &old_row, const blocksstable::ObDatumRow &new_row, const blocksstable::ObDatumRow *&result_row) override; diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 5f4316664..17719e2dc 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -1440,7 +1440,8 @@ int ObLoadDataDirectImpl::LargeFileLoadTaskProcessor::process() ObLoadDataDirectImpl::LargeFileLoadExecutor::LargeFileLoadExecutor() : next_worker_idx_(0), - next_chunk_id_(0) + next_chunk_id_(0), + total_line_no_(0) { } @@ -1519,12 +1520,13 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::get_next_task_handle(TaskHandle handle->worker_idx_ = get_worker_idx(); 
handle->session_id_ = handle->worker_idx_ + 1; handle->data_desc_ = data_desc_; - handle->start_line_no_ = total_line_count_ + 1; + handle->start_line_no_ = total_line_no_; handle->result_.created_ts_ = ObTimeUtil::current_time(); handle->sequence_no_.chunk_id_ = chunk_id; handle->sequence_no_.chunk_seq_no_ = 0; handle->data_buffer_.swap(expr_buffer_); handle->data_buffer_.is_end_file_ = data_reader_.is_end_file(); + total_line_no_ += current_line_count; } return ret; } diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.h b/src/sql/engine/cmd/ob_load_data_direct_impl.h index a4bd54b98..12090afc9 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.h +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.h @@ -434,6 +434,7 @@ private: DataReader data_reader_; int64_t next_worker_idx_; int64_t next_chunk_id_; + int64_t total_line_no_; DISALLOW_COPY_AND_ASSIGN(LargeFileLoadExecutor); }; diff --git a/src/storage/direct_load/ob_direct_load_compare.cpp b/src/storage/direct_load/ob_direct_load_compare.cpp index 03639ccec..b8ced5fe3 100644 --- a/src/storage/direct_load/ob_direct_load_compare.cpp +++ b/src/storage/direct_load/ob_direct_load_compare.cpp @@ -276,7 +276,8 @@ bool ObDirectLoadDatumArrayCompare::operator()(const ObDirectLoadConstDatumArray */ int ObDirectLoadExternalRowCompare::init(const ObStorageDatumUtils &datum_utils, - sql::ObLoadDupActionType dup_action) + sql::ObLoadDupActionType dup_action, + bool ignore_seq_no) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -287,6 +288,7 @@ int ObDirectLoadExternalRowCompare::init(const ObStorageDatumUtils &datum_utils, LOG_WARN("fail to init datum array compare", KR(ret)); } else { dup_action_ = dup_action; + ignore_seq_no_ = ignore_seq_no; is_inited_ = true; } } @@ -329,7 +331,7 @@ int ObDirectLoadExternalRowCompare::compare(const ObDirectLoadExternalRow *lhs, &rhs->rowkey_datum_array_, cmp_ret))) { LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret)); } else { - if (cmp_ret == 0) { + if (cmp_ret 
== 0 && !ignore_seq_no_) { if (lhs->seq_no_ == rhs->seq_no_) { cmp_ret = 0; } else if (lhs->seq_no_ > rhs->seq_no_) { @@ -355,7 +357,8 @@ int ObDirectLoadExternalRowCompare::compare(const ObDirectLoadExternalRow *lhs, */ int ObDirectLoadExternalMultiPartitionRowCompare::init(const ObStorageDatumUtils &datum_utils, - sql::ObLoadDupActionType dup_action) + sql::ObLoadDupActionType dup_action, + bool ignore_seq_no) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -366,12 +369,14 @@ int ObDirectLoadExternalMultiPartitionRowCompare::init(const ObStorageDatumUtils LOG_WARN("fail to init datum array compare", KR(ret)); } else { dup_action_ = dup_action; + ignore_seq_no_ = ignore_seq_no; is_inited_ = true; } } return ret; } + bool ObDirectLoadExternalMultiPartitionRowCompare::operator()( const ObDirectLoadExternalMultiPartitionRow *lhs, const ObDirectLoadExternalMultiPartitionRow *rhs) @@ -436,7 +441,7 @@ int ObDirectLoadExternalMultiPartitionRowCompare::compare( &rhs->external_row_.rowkey_datum_array_, cmp_ret))) { LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret)); - } else if (cmp_ret == 0) { + } else if (cmp_ret == 0 && !ignore_seq_no_) { if (lhs->external_row_.seq_no_ == rhs->external_row_.seq_no_) { cmp_ret = 0; } else if (lhs->external_row_.seq_no_ > rhs->external_row_.seq_no_) { @@ -475,7 +480,7 @@ int ObDirectLoadExternalMultiPartitionRowCompare::compare( } else if (OB_FAIL(datum_array_compare_.compare(&lhs->rowkey_datum_array_, &rhs->rowkey_datum_array_, cmp_ret))) { LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret)); - } else if (cmp_ret == 0) { + } else if (cmp_ret == 0 && !ignore_seq_no_) { if (lhs->seq_no_ == rhs->seq_no_) { cmp_ret = 0; } else if (lhs->seq_no_ > rhs->seq_no_) { diff --git a/src/storage/direct_load/ob_direct_load_compare.h b/src/storage/direct_load/ob_direct_load_compare.h index 819c3c9c5..917b8f7be 100644 --- a/src/storage/direct_load/ob_direct_load_compare.h +++ 
b/src/storage/direct_load/ob_direct_load_compare.h @@ -85,9 +85,9 @@ public: class ObDirectLoadExternalRowCompare { public: - ObDirectLoadExternalRowCompare() : result_code_(common::OB_SUCCESS), is_inited_(false) {} + ObDirectLoadExternalRowCompare() : result_code_(common::OB_SUCCESS), ignore_seq_no_(false), is_inited_(false) {} int init(const blocksstable::ObStorageDatumUtils &datum_utils, - sql::ObLoadDupActionType dup_action); + sql::ObLoadDupActionType dup_action, bool ignore_seq_no = false); int compare(const ObDirectLoadExternalRow *lhs, const ObDirectLoadExternalRow *rhs, int &cmp_ret); bool operator()(const ObDirectLoadExternalRow *lhs, const ObDirectLoadExternalRow *rhs); int get_error_code() const { return result_code_; } @@ -96,6 +96,7 @@ public: ObDirectLoadDatumArrayCompare datum_array_compare_; sql::ObLoadDupActionType dup_action_; int result_code_; + bool ignore_seq_no_; bool is_inited_; }; @@ -103,11 +104,11 @@ class ObDirectLoadExternalMultiPartitionRowCompare { public: ObDirectLoadExternalMultiPartitionRowCompare() - : result_code_(common::OB_SUCCESS), is_inited_(false) + : result_code_(common::OB_SUCCESS), ignore_seq_no_(false), is_inited_(false) { } int init(const blocksstable::ObStorageDatumUtils &datum_utils, - sql::ObLoadDupActionType dup_action); + sql::ObLoadDupActionType dup_action, bool ignore_seq_no = false); bool operator()(const ObDirectLoadExternalMultiPartitionRow *lhs, const ObDirectLoadExternalMultiPartitionRow *rhs); bool operator()(const ObDirectLoadConstExternalMultiPartitionRow *lhs, @@ -122,6 +123,7 @@ public: ObDirectLoadDatumArrayCompare datum_array_compare_; sql::ObLoadDupActionType dup_action_; int result_code_; + bool ignore_seq_no_; bool is_inited_; }; diff --git a/src/storage/direct_load/ob_direct_load_dml_row_handler.h b/src/storage/direct_load/ob_direct_load_dml_row_handler.h index 6de19f1ea..a21341e9b 100644 --- a/src/storage/direct_load/ob_direct_load_dml_row_handler.h +++ 
b/src/storage/direct_load/ob_direct_load_dml_row_handler.h @@ -5,6 +5,8 @@ #pragma once #include "storage/blocksstable/ob_datum_row.h" +#include "storage/direct_load/ob_direct_load_external_row.h" +#include "storage/direct_load/ob_direct_load_multiple_datum_row.h" namespace oceanbase { @@ -20,6 +22,10 @@ public: virtual int handle_insert_row(const blocksstable::ObDatumRow &row) = 0; // handle rows with the same primary key in the imported data virtual int handle_update_row(const blocksstable::ObDatumRow &row) = 0; + virtual int handle_update_row(common::ObArray<const ObDirectLoadExternalRow *> &rows, + const ObDirectLoadExternalRow *&row) = 0; + virtual int handle_update_row(common::ObArray<const ObDirectLoadMultipleDatumRow *> &rows, + const ObDirectLoadMultipleDatumRow *&row) = 0; // handle rows with the same primary key between the imported data and the original data virtual int handle_update_row(const blocksstable::ObDatumRow &old_row, const blocksstable::ObDatumRow &new_row, diff --git a/src/storage/direct_load/ob_direct_load_mem_dump.cpp b/src/storage/direct_load/ob_direct_load_mem_dump.cpp index ead5a0c40..6b049aca2 100644 --- a/src/storage/direct_load/ob_direct_load_mem_dump.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_dump.cpp @@ -204,6 +204,7 @@ int ObDirectLoadMemDump::dump_tables() ObArray> chunk_iters; //用于暂存iters ObDirectLoadExternalMerger merger; CompareType compare; + CompareType compare1; /* ordering that ignores seq_no */ const RowType *external_row = nullptr; ObDatumRow datum_row; @@ -217,10 +218,12 @@ int ObDirectLoadMemDump::dump_tables() LOG_WARN("fail to allocate memory", KR(ret)); } else if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_), mem_ctx_->dup_action_))) { LOG_WARN("fail to init compare", KR(ret)); + } else if (OB_FAIL(compare1.init(*(mem_ctx_->datum_utils_), mem_ctx_->dup_action_, true))) { + LOG_WARN("fail to init compare1", KR(ret)); } for (int64_t i = 0; OB_SUCC(ret) && i < context_ptr_->mem_chunk_array_.count(); i++) { ChunkType *chunk = context_ptr_->mem_chunk_array_[i]; - auto iter = 
chunk->scan(range_.start_, range_.end_, compare); + auto iter = chunk->scan(range_.start_, range_.end_, compare1); if (OB_FAIL(chunk_iters.push_back(iter))) { LOG_WARN("fail to push iter", KR(ret)); } diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp index 80af0f713..832f7a5b9 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp @@ -208,17 +208,17 @@ int ObDirectLoadMultipleSSTableScanMerge::inner_get_next_row( } else { const LoserTreeItem *top_item = nullptr; datum_row = nullptr; + rows_.reuse(); while (OB_SUCC(ret) && !rows_merger_->empty() && nullptr == datum_row) { if (OB_FAIL(rows_merger_->top(top_item))) { LOG_WARN("fail to get top item", KR(ret)); + } else if (OB_FAIL(rows_.push_back(top_item->row_))) { + LOG_WARN("fail to push back", KR(ret)); } else if (OB_LIKELY(rows_merger_->is_unique_champion())) { - datum_row = top_item->row_; - } else { - // handle same rowkey row - if (OB_FAIL(top_item->row_->to_datums(datum_row_.storage_datums_, datum_row_.count_))) { - LOG_WARN("fail to transfer external row to datums", KR(ret)); - } else if (OB_FAIL(dml_row_handler_->handle_update_row(datum_row_))) { - LOG_WARN("fail to handle update row", KR(ret), K(datum_row_)); + if (OB_LIKELY(rows_.count() == 1)) { + datum_row = rows_.at(0); + } else if (OB_FAIL(dml_row_handler_->handle_update_row(rows_, datum_row))) { + LOG_WARN("fail to handle update row", KR(ret), K(rows_)); } } if (OB_SUCC(ret)) { diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h index 25c5b1425..3f7d1009e 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h @@ -72,6 +72,7 @@ private: 
ScanMergeLoserTree *loser_tree_; common::ObRowsMerger *rows_merger_; blocksstable::ObDatumRow datum_row_; + common::ObArray<const ObDirectLoadMultipleDatumRow *> rows_; bool is_inited_; }; diff --git a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp index 32344244b..89fe8d13d 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp @@ -213,17 +213,17 @@ int ObDirectLoadSSTableScanMerge::inner_get_next_row(const ObDirectLoadExternalR } else { const LoserTreeItem *top_item = nullptr; external_row = nullptr; + rows_.reuse(); while (OB_SUCC(ret) && !rows_merger_->empty() && nullptr == external_row) { if (OB_FAIL(rows_merger_->top(top_item))) { LOG_WARN("fail to get top item", KR(ret)); + } else if (OB_FAIL(rows_.push_back(top_item->external_row_))) { + LOG_WARN("fail to push back", KR(ret)); } else if (OB_LIKELY(rows_merger_->is_unique_champion())) { - external_row = top_item->external_row_; - } else { - // handle same rowkey row - if (OB_FAIL(top_item->external_row_->to_datums(datum_row_.storage_datums_, datum_row_.count_))) { - LOG_WARN("fail to transfer external row to datums", KR(ret)); - } else if (OB_FAIL(dml_row_handler_->handle_update_row(datum_row_))) { - LOG_WARN("fail to handle update row", KR(ret), K(datum_row_)); + if (OB_LIKELY(rows_.count() == 1)) { + external_row = rows_.at(0); + } else if (OB_FAIL(dml_row_handler_->handle_update_row(rows_, external_row))) { + LOG_WARN("fail to handle update row", KR(ret), K(rows_)); } } if (OB_SUCC(ret)) { diff --git a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h index 06972aa6a..31fb177b2 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h +++ b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h @@ -76,6 +76,7 @@ private: ScanMergeLoserTree *loser_tree_; common::ObRowsMerger *rows_merger_; 
blocksstable::ObDatumRow datum_row_; + common::ObArray<const ObDirectLoadExternalRow *> rows_; bool is_inited_; };