fix bug multi_replace stmt hang in fetch_conflict
This commit is contained in:
committed by
wangzelin.wzl
parent
e54cab6c45
commit
a809ae9f36
@ -1748,7 +1748,7 @@ int ObPartitionStorage::fetch_conflict_rows(const ObStoreCtx& ctx, const ObDMLBa
|
|||||||
|
|
||||||
int ObPartitionStorage::multi_get_rows(const ObStoreCtx& store_ctx, const ObTableAccessParam& access_param,
|
int ObPartitionStorage::multi_get_rows(const ObStoreCtx& store_ctx, const ObTableAccessParam& access_param,
|
||||||
ObTableAccessContext& access_ctx, ObRelativeTable& relative_table, const GetRowkeyArray& rowkeys,
|
ObTableAccessContext& access_ctx, ObRelativeTable& relative_table, const GetRowkeyArray& rowkeys,
|
||||||
ObNewRowIterator*& duplicated_rows)
|
ObNewRowIterator*& duplicated_rows, int64_t data_table_rowkey_cnt)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const ObTablesHandle& tables_handle = relative_table.tables_handle_;
|
const ObTablesHandle& tables_handle = relative_table.tables_handle_;
|
||||||
@ -1795,7 +1795,7 @@ int ObPartitionStorage::multi_get_rows(const ObStoreCtx& store_ctx, const ObTabl
|
|||||||
STORAGE_LOG(ERROR, "no memory to alloc ObValueRowIterator", K(ret));
|
STORAGE_LOG(ERROR, "no memory to alloc ObValueRowIterator", K(ret));
|
||||||
} else {
|
} else {
|
||||||
duplicated_rows = dup_iter;
|
duplicated_rows = dup_iter;
|
||||||
if (OB_FAIL(dup_iter->init(true))) {
|
if (OB_FAIL(dup_iter->init(true, data_table_rowkey_cnt))) {
|
||||||
STORAGE_LOG(WARN, "failed to initialize ObValueRowIterator", K(ret));
|
STORAGE_LOG(WARN, "failed to initialize ObValueRowIterator", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1825,7 +1825,9 @@ int ObPartitionStorage::multi_get_rows(const ObStoreCtx& store_ctx, const ObTabl
|
|||||||
row->row_val_.count_ = access_param.output_exprs_->count();
|
row->row_val_.count_ = access_param.output_exprs_->count();
|
||||||
LOG_DEBUG("get conflict row", K_(row->row_val));
|
LOG_DEBUG("get conflict row", K_(row->row_val));
|
||||||
}
|
}
|
||||||
if (OB_FAIL(dup_iter->add_row(row->row_val_))) {
|
if (OB_FAIL(ret)) {
|
||||||
|
|
||||||
|
} else if (OB_FAIL(dup_iter->add_row(row->row_val_))) {
|
||||||
STORAGE_LOG(WARN, "failed to store conflict row", K(*row));
|
STORAGE_LOG(WARN, "failed to store conflict row", K(*row));
|
||||||
} else {
|
} else {
|
||||||
LOG_DEBUG("get conflict row", K_(row->row_val));
|
LOG_DEBUG("get conflict row", K_(row->row_val));
|
||||||
@ -1894,13 +1896,15 @@ int ObPartitionStorage::get_index_conflict_row(ObDMLRunningCtx& run_ctx, const O
|
|||||||
LOG_WARN("get conflict row failed", K(relative_table), K(index_rowkey), K(pk_out_descs));
|
LOG_WARN("get conflict row failed", K(relative_table), K(index_rowkey), K(pk_out_descs));
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret) && OB_UNLIKELY(need_index_back) && OB_UNLIKELY(tmp_rowkey_iter != NULL)) {
|
if (OB_SUCC(ret) && OB_UNLIKELY(need_index_back) && OB_UNLIKELY(tmp_rowkey_iter != NULL)) {
|
||||||
|
int64_t data_table_rowkey_cnt = run_ctx.relative_tables_.data_table_.get_rowkey_column_num();
|
||||||
OZ(convert_row_to_rowkey(*dup_rowkey_iter, rowkeys));
|
OZ(convert_row_to_rowkey(*dup_rowkey_iter, rowkeys));
|
||||||
OZ(multi_get_rows(run_ctx.store_ctx_,
|
OZ(multi_get_rows(run_ctx.store_ctx_,
|
||||||
table_access_param,
|
table_access_param,
|
||||||
table_access_ctx,
|
table_access_ctx,
|
||||||
run_ctx.relative_tables_.data_table_,
|
run_ctx.relative_tables_.data_table_,
|
||||||
rowkeys,
|
rowkeys,
|
||||||
duplicated_rows));
|
duplicated_rows,
|
||||||
|
data_table_rowkey_cnt));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (dup_rowkey_iter != NULL) {
|
if (dup_rowkey_iter != NULL) {
|
||||||
@ -1917,9 +1921,11 @@ int ObPartitionStorage::get_conflict_row(ObDMLRunningCtx& run_ctx, const ObTable
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObExtStoreRowkey ext_rowkey(rowkey);
|
ObExtStoreRowkey ext_rowkey(rowkey);
|
||||||
GetRowkeyArray rowkeys;
|
GetRowkeyArray rowkeys;
|
||||||
|
int64_t data_table_rowkey_cnt = run_ctx.relative_tables_.data_table_.get_rowkey_column_num();
|
||||||
OZ(rowkeys.push_back(ext_rowkey));
|
OZ(rowkeys.push_back(ext_rowkey));
|
||||||
OZ(multi_get_rows(run_ctx.store_ctx_, access_param, access_ctx, relative_table, rowkeys, duplicated_rows));
|
OZ(multi_get_rows(run_ctx.store_ctx_, access_param, access_ctx,
|
||||||
LOG_DEBUG("get conflict row", K(ret), K(rowkey), K(relative_table), K(access_param));
|
relative_table, rowkeys, duplicated_rows, data_table_rowkey_cnt));
|
||||||
|
LOG_DEBUG("get conflict row", K(ret), K(rowkey), K(relative_table), K(access_param), K(data_table_rowkey_cnt));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -556,7 +556,7 @@ private:
|
|||||||
const common::ObNewRow& row, common::ObNewRowIterator*& duplicated_rows);
|
const common::ObNewRow& row, common::ObNewRowIterator*& duplicated_rows);
|
||||||
int multi_get_rows(const ObStoreCtx& store_ctx, const ObTableAccessParam& access_param,
|
int multi_get_rows(const ObStoreCtx& store_ctx, const ObTableAccessParam& access_param,
|
||||||
ObTableAccessContext& access_ctx, ObRelativeTable& relative_table, const GetRowkeyArray& rowkeys,
|
ObTableAccessContext& access_ctx, ObRelativeTable& relative_table, const GetRowkeyArray& rowkeys,
|
||||||
common::ObNewRowIterator*& duplicated_rows);
|
common::ObNewRowIterator*& duplicated_rows, int64_t data_table_rowkey_cnt);
|
||||||
int get_conflict_rows(ObDMLRunningCtx& run_ctx, const ObInsertFlag flag,
|
int get_conflict_rows(ObDMLRunningCtx& run_ctx, const ObInsertFlag flag,
|
||||||
const common::ObIArray<uint64_t>& dup_col_ids, const common::ObNewRow& row,
|
const common::ObIArray<uint64_t>& dup_col_ids, const common::ObNewRow& row,
|
||||||
common::ObNewRowIterator*& duplicated_rows);
|
common::ObNewRowIterator*& duplicated_rows);
|
||||||
|
|||||||
@ -22,13 +22,14 @@ ObValueRowIterator::ObValueRowIterator()
|
|||||||
unique_(false),
|
unique_(false),
|
||||||
allocator_(ObModIds::OB_VALUE_ROW_ITER),
|
allocator_(ObModIds::OB_VALUE_ROW_ITER),
|
||||||
rows_(),
|
rows_(),
|
||||||
cur_idx_(0)
|
cur_idx_(0),
|
||||||
|
data_table_rowkey_cnt_(0)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
ObValueRowIterator::~ObValueRowIterator()
|
ObValueRowIterator::~ObValueRowIterator()
|
||||||
{}
|
{}
|
||||||
|
|
||||||
int ObValueRowIterator::init(bool unique)
|
int ObValueRowIterator::init(bool unique, int64_t data_table_rowkey_cnt)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_UNLIKELY(is_inited_)) {
|
if (OB_UNLIKELY(is_inited_)) {
|
||||||
@ -38,6 +39,7 @@ int ObValueRowIterator::init(bool unique)
|
|||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
unique_ = unique;
|
unique_ = unique;
|
||||||
cur_idx_ = 0;
|
cur_idx_ = 0;
|
||||||
|
data_table_rowkey_cnt_ = data_table_rowkey_cnt;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -52,6 +54,9 @@ int ObValueRowIterator::add_row(common::ObNewRow& row)
|
|||||||
} else if (OB_UNLIKELY(!is_inited_)) {
|
} else if (OB_UNLIKELY(!is_inited_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
STORAGE_LOG(WARN, "ObValueRowIterator is not initialized", K(ret));
|
STORAGE_LOG(WARN, "ObValueRowIterator is not initialized", K(ret));
|
||||||
|
} else if (data_table_rowkey_cnt_ > row.count_) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
STORAGE_LOG(WARN, "invalid row", K(ret), K(row), K(data_table_rowkey_cnt_));
|
||||||
} else {
|
} else {
|
||||||
bool exist = false;
|
bool exist = false;
|
||||||
// check whether exists
|
// check whether exists
|
||||||
@ -60,14 +65,20 @@ int ObValueRowIterator::add_row(common::ObNewRow& row)
|
|||||||
// on multiple unique index is small, so there is usually only one row in the value row iterator
|
// on multiple unique index is small, so there is usually only one row in the value row iterator
|
||||||
// so using list traversal to deduplicate unique index is more efficiently
|
// so using list traversal to deduplicate unique index is more efficiently
|
||||||
// and also saves the CPU overhead that constructs the hash map
|
// and also saves the CPU overhead that constructs the hash map
|
||||||
ObStoreRowkey rowkey(row.cells_, row.count_);
|
ObStoreRowkey rowkey(row.cells_, data_table_rowkey_cnt_);
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && !exist && i < rows_.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && !exist && i < rows_.count(); ++i) {
|
||||||
ObStoreRowkey tmp_rowkey(rows_.at(i).cells_, rows_.at(i).count_);
|
if (data_table_rowkey_cnt_ > rows_.at(i).count_) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
STORAGE_LOG(WARN, "invalid row", K(ret), K(row), K(data_table_rowkey_cnt_));
|
||||||
|
} else {
|
||||||
|
ObStoreRowkey tmp_rowkey(rows_.at(i).cells_, data_table_rowkey_cnt_);
|
||||||
|
STORAGE_LOG(DEBUG, "print rowkey info", K(rowkey), K(tmp_rowkey), K(data_table_rowkey_cnt_));
|
||||||
if (OB_UNLIKELY(tmp_rowkey == rowkey)) {
|
if (OB_UNLIKELY(tmp_rowkey == rowkey)) {
|
||||||
exist = true;
|
exist = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// store non-exist row
|
// store non-exist row
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (!exist) {
|
if (!exist) {
|
||||||
|
|||||||
@ -29,7 +29,7 @@ class ObValueRowIterator : public common::ObNewRowIterator {
|
|||||||
public:
|
public:
|
||||||
ObValueRowIterator();
|
ObValueRowIterator();
|
||||||
virtual ~ObValueRowIterator();
|
virtual ~ObValueRowIterator();
|
||||||
virtual int init(bool unique);
|
virtual int init(bool unique, int64_t data_table_rowkey_cnt);
|
||||||
virtual int get_next_row(common::ObNewRow*& row);
|
virtual int get_next_row(common::ObNewRow*& row);
|
||||||
virtual int get_next_rows(common::ObNewRow*& rows, int64_t& row_count);
|
virtual int get_next_rows(common::ObNewRow*& rows, int64_t& row_count);
|
||||||
virtual int add_row(common::ObNewRow& row);
|
virtual int add_row(common::ObNewRow& row);
|
||||||
@ -41,6 +41,7 @@ private:
|
|||||||
common::ObArenaAllocator allocator_;
|
common::ObArenaAllocator allocator_;
|
||||||
RowArray rows_;
|
RowArray rows_;
|
||||||
int64_t cur_idx_;
|
int64_t cur_idx_;
|
||||||
|
int64_t data_table_rowkey_cnt_;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObValueRowIterator);
|
DISALLOW_COPY_AND_ASSIGN(ObValueRowIterator);
|
||||||
|
|||||||
Reference in New Issue
Block a user