[master] deduplicate when merge txExecResult

This commit is contained in:
chinaxing 2023-05-17 11:41:38 +00:00 committed by ob-robot
parent 9da5b3061d
commit 23ea96d0d2
2 changed files with 24 additions and 14 deletions

View File

@ -698,8 +698,9 @@ int ObTxDesc::get_inc_exec_info(ObTxExecResult &exec_info)
}
exec_info_reap_ts_ += 1;
}
(void) exec_info.merge_cflict_txs(cflict_txs_);
cflict_txs_.reset();
if (OB_SUCC(ret) && OB_SUCC(exec_info.merge_cflict_txs(cflict_txs_))) {
cflict_txs_.reset();
}
DETECT_LOG(TRACE, "merge conflict txs to exec result", K(cflict_txs_), K(exec_info));
return ret;
}
@ -1155,19 +1156,31 @@ int ObTxExecResult::add_touched_ls(const ObIArray<share::ObLSID> &ls_list)
return ret;
}
template<typename T>
static int append_dedup(ObIArray<T> &a, const ObIArray<T> &b)
{
int ret = OB_SUCCESS;
ARRAY_FOREACH(b, i) {
if (!is_contain(a, b.at(i))) { ret = a.push_back(b.at(i)); }
}
return ret;
}
int ObTxExecResult::merge_result(const ObTxExecResult &r)
{
int ret = OB_SUCCESS;
TRANS_LOG(TRACE, "txExecResult.merge with.start", K(r), KPC(this), K(lbt()));
incomplete_ |= r.incomplete_;
if (OB_FAIL(append(parts_, r.parts_))) {
if (OB_FAIL(append_dedup(parts_, r.parts_))) {
incomplete_ = true;
TRANS_LOG(WARN, "merge fail, set incomplete", K(ret), KPC(this));
} else if (OB_FAIL(append(touched_ls_list_, r.touched_ls_list_))) {
} else if (OB_FAIL(append_dedup(touched_ls_list_, r.touched_ls_list_))) {
incomplete_ = true;
TRANS_LOG(WARN, "merge touched_ls_list fail, set incomplete", K(ret), KPC(this));
}
merge_cflict_txs(r.cflict_txs_);
if (OB_SUCC(ret)) {
ret = merge_cflict_txs(r.cflict_txs_);
}
if (incomplete_) {
TRANS_LOG(TRACE, "tx result incomplete:", KP(this));
}
@ -1176,16 +1189,13 @@ int ObTxExecResult::merge_result(const ObTxExecResult &r)
return ret;
}
void ObTxExecResult::merge_cflict_txs(const common::ObIArray<transaction::ObTransIDAndAddr> &txs)
int ObTxExecResult::merge_cflict_txs(const common::ObIArray<transaction::ObTransIDAndAddr> &txs)
{
int ret = OB_SUCCESS;
for (int64_t idx = 0; idx < txs.count() && OB_SUCC(ret); ++idx) {
if (!is_contain(cflict_txs_, txs.at(idx))) {
if (OB_FAIL(cflict_txs_.push_back(txs.at(idx)))) {
DETECT_LOG(WARN, "push fail", KR(ret), KPC(this), K(txs));
}
}
if (OB_FAIL(append_dedup(cflict_txs_, txs))) {
DETECT_LOG(WARN, "append fail", KR(ret), KPC(this), K(txs));
}
return ret;
}
int ObTxExecResult::assign(const ObTxExecResult &r)

View File

@ -314,8 +314,8 @@ public:
TRANS_LOG(TRACE, "tx result incomplete:", KP(this));
incomplete_ = true;
}
void merge_cflict_txs(const common::ObIArray<ObTransIDAndAddr> &txs);
inline bool is_incomplete() const { return incomplete_; }
int merge_cflict_txs(const common::ObIArray<ObTransIDAndAddr> &txs);
bool is_incomplete() const { return incomplete_; }
int add_touched_ls(const share::ObLSID ls);
int add_touched_ls(const ObIArray<share::ObLSID> &ls_list);
const share::ObLSArray &get_touched_ls() const { return touched_ls_list_; }