fix compact bug

This commit is contained in:
yongshige 2023-08-14 07:12:41 +00:00 committed by ob-robot
parent be7a0ce760
commit ef9887b33b
12 changed files with 140 additions and 26 deletions

View File

@ -97,6 +97,95 @@ int ObTableLoadErrorRowHandler::handle_update_row(const ObDatumRow &row)
return ret;
}
int ObTableLoadErrorRowHandler::handle_update_row(
common::ObArray<const ObDirectLoadExternalRow *> &rows, const ObDirectLoadExternalRow *&row)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_UNLIKELY(rows.count() < 2)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret));
} else {
int64_t duplicate_row_count = rows.count() - 1;
std::sort(rows.begin(), rows.end(),
[](const ObDirectLoadExternalRow *lhs, const ObDirectLoadExternalRow *rhs) {
return lhs->seq_no_ < rhs->seq_no_;
});
if (ObLoadDupActionType::LOAD_STOP_ON_DUP == dup_action_) {
if (0 == max_error_row_count_) {
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
} else {
ObMutexGuard guard(mutex_);
error_row_count_ += duplicate_row_count;
if (error_row_count_ >= max_error_row_count_) {
ret = OB_ERR_TOO_MANY_ROWS;
LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count),
K_(error_row_count));
}
}
ATOMIC_AAF(&job_stat_->detected_error_rows_, duplicate_row_count);
} else if (ObLoadDupActionType::LOAD_REPLACE == dup_action_) {
ATOMIC_AAF(&result_info_->rows_affected_, 2 * duplicate_row_count);
ATOMIC_AAF(&result_info_->deleted_, duplicate_row_count);
row = rows.at(duplicate_row_count);
} else if (ObLoadDupActionType::LOAD_IGNORE == dup_action_) {
ATOMIC_AAF(&result_info_->skipped_, duplicate_row_count);
row = rows.at(0);
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dup action", KR(ret), K_(dup_action));
}
}
return ret;
}
int ObTableLoadErrorRowHandler::handle_update_row(
common::ObArray<const ObDirectLoadMultipleDatumRow *> &rows,
const ObDirectLoadMultipleDatumRow *&row)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_UNLIKELY(rows.count() < 2)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret));
} else {
int64_t duplicate_row_count = rows.count() - 1;
std::sort(rows.begin(), rows.end(),
[](const ObDirectLoadMultipleDatumRow *lhs, const ObDirectLoadMultipleDatumRow *rhs) {
return lhs->seq_no_ < rhs->seq_no_;
});
if (ObLoadDupActionType::LOAD_STOP_ON_DUP == dup_action_) {
if (0 == max_error_row_count_) {
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
} else {
error_row_count_ += duplicate_row_count;
ObMutexGuard guard(mutex_);
if (error_row_count_ >= max_error_row_count_) {
ret = OB_ERR_TOO_MANY_ROWS;
LOG_WARN("error row count reaches its maximum value", KR(ret), K_(max_error_row_count),
K_(error_row_count));
}
}
ATOMIC_AAF(&job_stat_->detected_error_rows_, duplicate_row_count);
} else if (ObLoadDupActionType::LOAD_REPLACE == dup_action_) {
ATOMIC_AAF(&result_info_->rows_affected_, 2 * duplicate_row_count);
ATOMIC_AAF(&result_info_->deleted_, duplicate_row_count);
row = rows.at(duplicate_row_count);
} else if (ObLoadDupActionType::LOAD_IGNORE == dup_action_) {
ATOMIC_AAF(&result_info_->skipped_, duplicate_row_count);
row = rows.at(0);
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dup action", KR(ret), K_(dup_action));
}
}
return ret;
}
int ObTableLoadErrorRowHandler::handle_update_row(const ObDatumRow &old_row,
const ObDatumRow &new_row,
const ObDatumRow *&result_row)

View File

@ -30,6 +30,10 @@ public:
sql::ObLoadDataStat *job_stat);
int handle_insert_row(const blocksstable::ObDatumRow &row) override;
int handle_update_row(const blocksstable::ObDatumRow &row) override;
int handle_update_row(common::ObArray<const ObDirectLoadExternalRow *> &rows,
const ObDirectLoadExternalRow *&row) override;
int handle_update_row(common::ObArray<const ObDirectLoadMultipleDatumRow *> &rows,
const ObDirectLoadMultipleDatumRow *&row) override;
int handle_update_row(const blocksstable::ObDatumRow &old_row,
const blocksstable::ObDatumRow &new_row,
const blocksstable::ObDatumRow *&result_row) override;

View File

@ -1440,7 +1440,8 @@ int ObLoadDataDirectImpl::LargeFileLoadTaskProcessor::process()
ObLoadDataDirectImpl::LargeFileLoadExecutor::LargeFileLoadExecutor()
: next_worker_idx_(0),
next_chunk_id_(0)
next_chunk_id_(0),
total_line_no_(0)
{
}
@ -1519,12 +1520,13 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::get_next_task_handle(TaskHandle
handle->worker_idx_ = get_worker_idx();
handle->session_id_ = handle->worker_idx_ + 1;
handle->data_desc_ = data_desc_;
handle->start_line_no_ = total_line_count_ + 1;
handle->start_line_no_ = total_line_no_ ;
handle->result_.created_ts_ = ObTimeUtil::current_time();
handle->sequence_no_.chunk_id_ = chunk_id;
handle->sequence_no_.chunk_seq_no_ = 0;
handle->data_buffer_.swap(expr_buffer_);
handle->data_buffer_.is_end_file_ = data_reader_.is_end_file();
total_line_no_ += current_line_count;
}
return ret;
}

View File

@ -434,6 +434,7 @@ private:
DataReader data_reader_;
int64_t next_worker_idx_;
int64_t next_chunk_id_;
int64_t total_line_no_;
DISALLOW_COPY_AND_ASSIGN(LargeFileLoadExecutor);
};

View File

@ -276,7 +276,8 @@ bool ObDirectLoadDatumArrayCompare::operator()(const ObDirectLoadConstDatumArray
*/
int ObDirectLoadExternalRowCompare::init(const ObStorageDatumUtils &datum_utils,
sql::ObLoadDupActionType dup_action)
sql::ObLoadDupActionType dup_action,
bool ignore_seq_no)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -287,6 +288,7 @@ int ObDirectLoadExternalRowCompare::init(const ObStorageDatumUtils &datum_utils,
LOG_WARN("fail to init datum array compare", KR(ret));
} else {
dup_action_ = dup_action;
ignore_seq_no_ = ignore_seq_no;
is_inited_ = true;
}
}
@ -329,7 +331,7 @@ int ObDirectLoadExternalRowCompare::compare(const ObDirectLoadExternalRow *lhs,
&rhs->rowkey_datum_array_, cmp_ret))) {
LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret));
} else {
if (cmp_ret == 0) {
if (cmp_ret == 0 && !ignore_seq_no_) {
if (lhs->seq_no_ == rhs->seq_no_) {
cmp_ret = 0;
} else if (lhs->seq_no_ > rhs->seq_no_) {
@ -355,7 +357,8 @@ int ObDirectLoadExternalRowCompare::compare(const ObDirectLoadExternalRow *lhs,
*/
int ObDirectLoadExternalMultiPartitionRowCompare::init(const ObStorageDatumUtils &datum_utils,
sql::ObLoadDupActionType dup_action)
sql::ObLoadDupActionType dup_action,
bool ignore_seq_no)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -366,12 +369,14 @@ int ObDirectLoadExternalMultiPartitionRowCompare::init(const ObStorageDatumUtils
LOG_WARN("fail to init datum array compare", KR(ret));
} else {
dup_action_ = dup_action;
ignore_seq_no_ = ignore_seq_no;
is_inited_ = true;
}
}
return ret;
}
bool ObDirectLoadExternalMultiPartitionRowCompare::operator()(
const ObDirectLoadExternalMultiPartitionRow *lhs,
const ObDirectLoadExternalMultiPartitionRow *rhs)
@ -436,7 +441,7 @@ int ObDirectLoadExternalMultiPartitionRowCompare::compare(
&rhs->external_row_.rowkey_datum_array_,
cmp_ret))) {
LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret));
} else if (cmp_ret == 0) {
} else if (cmp_ret == 0 && !ignore_seq_no_) {
if (lhs->external_row_.seq_no_ == rhs->external_row_.seq_no_) {
cmp_ret = 0;
} else if (lhs->external_row_.seq_no_ > rhs->external_row_.seq_no_) {
@ -475,7 +480,7 @@ int ObDirectLoadExternalMultiPartitionRowCompare::compare(
} else if (OB_FAIL(datum_array_compare_.compare(&lhs->rowkey_datum_array_,
&rhs->rowkey_datum_array_, cmp_ret))) {
LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret));
} else if (cmp_ret == 0) {
} else if (cmp_ret == 0 && !ignore_seq_no_) {
if (lhs->seq_no_ == rhs->seq_no_) {
cmp_ret = 0;
} else if (lhs->seq_no_ > rhs->seq_no_) {

View File

@ -85,9 +85,9 @@ public:
class ObDirectLoadExternalRowCompare
{
public:
ObDirectLoadExternalRowCompare() : result_code_(common::OB_SUCCESS), is_inited_(false) {}
ObDirectLoadExternalRowCompare() : result_code_(common::OB_SUCCESS), ignore_seq_no_(false), is_inited_(false) {}
int init(const blocksstable::ObStorageDatumUtils &datum_utils,
sql::ObLoadDupActionType dup_action);
sql::ObLoadDupActionType dup_action, bool ignore_seq_no = false);
int compare(const ObDirectLoadExternalRow *lhs, const ObDirectLoadExternalRow *rhs, int &cmp_ret);
bool operator()(const ObDirectLoadExternalRow *lhs, const ObDirectLoadExternalRow *rhs);
int get_error_code() const { return result_code_; }
@ -96,6 +96,7 @@ public:
ObDirectLoadDatumArrayCompare datum_array_compare_;
sql::ObLoadDupActionType dup_action_;
int result_code_;
bool ignore_seq_no_;
bool is_inited_;
};
@ -103,11 +104,11 @@ class ObDirectLoadExternalMultiPartitionRowCompare
{
public:
ObDirectLoadExternalMultiPartitionRowCompare()
: result_code_(common::OB_SUCCESS), is_inited_(false)
: result_code_(common::OB_SUCCESS), ignore_seq_no_(false), is_inited_(false)
{
}
int init(const blocksstable::ObStorageDatumUtils &datum_utils,
sql::ObLoadDupActionType dup_action);
sql::ObLoadDupActionType dup_action, bool ignore_seq_no = false);
bool operator()(const ObDirectLoadExternalMultiPartitionRow *lhs,
const ObDirectLoadExternalMultiPartitionRow *rhs);
bool operator()(const ObDirectLoadConstExternalMultiPartitionRow *lhs,
@ -122,6 +123,7 @@ public:
ObDirectLoadDatumArrayCompare datum_array_compare_;
sql::ObLoadDupActionType dup_action_;
int result_code_;
bool ignore_seq_no_;
bool is_inited_;
};

View File

@ -5,6 +5,8 @@
#pragma once
#include "storage/blocksstable/ob_datum_row.h"
#include "storage/direct_load/ob_direct_load_external_row.h"
#include "storage/direct_load/ob_direct_load_multiple_datum_row.h"
namespace oceanbase
{
@ -20,6 +22,10 @@ public:
virtual int handle_insert_row(const blocksstable::ObDatumRow &row) = 0;
// handle rows with the same primary key in the imported data
virtual int handle_update_row(const blocksstable::ObDatumRow &row) = 0;
virtual int handle_update_row(common::ObArray<const ObDirectLoadExternalRow *> &rows,
const ObDirectLoadExternalRow *&row) = 0;
virtual int handle_update_row(common::ObArray<const ObDirectLoadMultipleDatumRow *> &rows,
const ObDirectLoadMultipleDatumRow *&row) = 0;
// handle rows with the same primary key between the imported data and the original data
virtual int handle_update_row(const blocksstable::ObDatumRow &old_row,
const blocksstable::ObDatumRow &new_row,

View File

@ -204,6 +204,7 @@ int ObDirectLoadMemDump::dump_tables()
ObArray<ObDirectLoadMemChunkIter<RowType, CompareType>> chunk_iters; //用于暂存iters
ObDirectLoadExternalMerger<RowType, CompareType> merger;
CompareType compare;
CompareType compare1; //不带上seq_no的排序
const RowType *external_row = nullptr;
ObDatumRow datum_row;
@ -217,10 +218,12 @@ int ObDirectLoadMemDump::dump_tables()
LOG_WARN("fail to allocate memory", KR(ret));
} else if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_), mem_ctx_->dup_action_))) {
LOG_WARN("fail to init compare", KR(ret));
} else if (OB_FAIL(compare1.init(*(mem_ctx_->datum_utils_), mem_ctx_->dup_action_, true))) {
LOG_WARN("fail to init compare1", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < context_ptr_->mem_chunk_array_.count(); i++) {
ChunkType *chunk = context_ptr_->mem_chunk_array_[i];
auto iter = chunk->scan(range_.start_, range_.end_, compare);
auto iter = chunk->scan(range_.start_, range_.end_, compare1);
if (OB_FAIL(chunk_iters.push_back(iter))) {
LOG_WARN("fail to push iter", KR(ret));
}

View File

@ -208,17 +208,17 @@ int ObDirectLoadMultipleSSTableScanMerge::inner_get_next_row(
} else {
const LoserTreeItem *top_item = nullptr;
datum_row = nullptr;
rows_.reuse();
while (OB_SUCC(ret) && !rows_merger_->empty() && nullptr == datum_row) {
if (OB_FAIL(rows_merger_->top(top_item))) {
LOG_WARN("fail to get top item", KR(ret));
} else if (OB_FAIL(rows_.push_back(top_item->row_))) {
LOG_WARN("fail to push back", KR(ret));
} else if (OB_LIKELY(rows_merger_->is_unique_champion())) {
datum_row = top_item->row_;
} else {
// handle same rowkey row
if (OB_FAIL(top_item->row_->to_datums(datum_row_.storage_datums_, datum_row_.count_))) {
LOG_WARN("fail to transfer external row to datums", KR(ret));
} else if (OB_FAIL(dml_row_handler_->handle_update_row(datum_row_))) {
LOG_WARN("fail to handle update row", KR(ret), K(datum_row_));
if (OB_LIKELY(rows_.count() == 1)) {
datum_row = rows_.at(0);
} else if (OB_FAIL(dml_row_handler_->handle_update_row(rows_, datum_row))) {
LOG_WARN("fail to handle update row", KR(ret), K(rows_));
}
}
if (OB_SUCC(ret)) {

View File

@ -72,6 +72,7 @@ private:
ScanMergeLoserTree *loser_tree_;
common::ObRowsMerger<LoserTreeItem, LoserTreeCompare> *rows_merger_;
blocksstable::ObDatumRow datum_row_;
common::ObArray<const ObDirectLoadMultipleDatumRow *> rows_;
bool is_inited_;
};

View File

@ -213,17 +213,17 @@ int ObDirectLoadSSTableScanMerge::inner_get_next_row(const ObDirectLoadExternalR
} else {
const LoserTreeItem *top_item = nullptr;
external_row = nullptr;
rows_.reuse();
while (OB_SUCC(ret) && !rows_merger_->empty() && nullptr == external_row) {
if (OB_FAIL(rows_merger_->top(top_item))) {
LOG_WARN("fail to get top item", KR(ret));
} else if (OB_FAIL(rows_.push_back(top_item->external_row_))) {
LOG_WARN("fail to push back", KR(ret));
} else if (OB_LIKELY(rows_merger_->is_unique_champion())) {
external_row = top_item->external_row_;
} else {
// handle same rowkey row
if (OB_FAIL(top_item->external_row_->to_datums(datum_row_.storage_datums_, datum_row_.count_))) {
LOG_WARN("fail to transfer external row to datums", KR(ret));
} else if (OB_FAIL(dml_row_handler_->handle_update_row(datum_row_))) {
LOG_WARN("fail to handle update row", KR(ret), K(datum_row_));
if (OB_LIKELY(rows_.count() == 1)) {
external_row = rows_.at(0);
} else if (OB_FAIL(dml_row_handler_->handle_update_row(rows_, external_row))) {
LOG_WARN("fail to handle update row", KR(ret), K(rows_));
}
}
if (OB_SUCC(ret)) {

View File

@ -76,6 +76,7 @@ private:
ScanMergeLoserTree *loser_tree_;
common::ObRowsMerger<LoserTreeItem, LoserTreeCompare> *rows_merger_;
blocksstable::ObDatumRow datum_row_;
common::ObArray<const ObDirectLoadExternalRow *> rows_;
bool is_inited_;
};