Add more defensive code for tx data memtable
This commit is contained in:
parent
e03f133ffc
commit
172b395e64
@ -171,6 +171,10 @@ int ObTxDataMemtable::insert(ObTxData *tx_data)
|
||||
} else if (OB_ISNULL(tx_data)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "tx data is nullptr", KR(ret));
|
||||
} else if (ObTxCommitData::COMMIT == tx_data->state_ &&
|
||||
(tx_data->commit_version_.is_max() || !tx_data->commit_version_.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "invalid tx data", KR(ret), KPC(tx_data));
|
||||
} else if (OB_ISNULL(tx_data_map_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "unexpected null value of tx_data_map_", KR(ret));
|
||||
@ -356,7 +360,7 @@ int ObTxDataMemtable::pre_process_commit_version_row_(ObTxData *fake_tx_data)
|
||||
if (OB_FAIL(fill_in_cur_commit_versions_(cur_commit_versions)/*step 1*/)) {
|
||||
STORAGE_LOG(WARN, "periodical select commit version failed.", KR(ret));
|
||||
} else if (OB_FAIL(get_past_commit_versions_(past_commit_versions)/*step 2*/)) {
|
||||
STORAGE_LOG(WARN, "get past commit versions failed.", KR(ret));
|
||||
STORAGE_LOG(WARN, "get past commit versions failed.", KR(ret), K(past_commit_versions));
|
||||
} else if (do_recycle_ && OB_FAIL(memtable_mgr_->get_tx_data_table()->get_recycle_scn(recycle_scn) /*step 3*/)) {
|
||||
STORAGE_LOG(WARN, "get recycle ts failed.", KR(ret));
|
||||
} else if (OB_FAIL(merge_cur_and_past_commit_verisons_(recycle_scn, cur_commit_versions,/*step 4*/
|
||||
@ -433,9 +437,6 @@ int ObTxDataMemtable::periodical_get_next_commit_version_(ProcessCommitVersionDa
|
||||
// avoid rollback or abort transaction influencing commit versions array
|
||||
if (ObTxData::COMMIT != tmp_tx_data->state_) {
|
||||
continue;
|
||||
} else if (!tmp_tx_data->commit_version_.is_valid() || tmp_tx_data->commit_version_.is_max()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "unexpected tx data commit version", KPC(tmp_tx_data));
|
||||
} else {
|
||||
tx_data = tmp_tx_data;
|
||||
}
|
||||
@ -453,6 +454,11 @@ int ObTxDataMemtable::periodical_get_next_commit_version_(ProcessCommitVersionDa
|
||||
cur_max_commit_version = tx_data->commit_version_;
|
||||
}
|
||||
|
||||
if (cur_max_commit_version.is_max()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "unexpected max commit version", KR(ret), KPC(tx_data));
|
||||
}
|
||||
|
||||
// If this tx data is the first tx data in sorted list or its start_log_ts is 1_s larger than
|
||||
// the pre_start_scn, we use this start_log_ts to calculate upper_trans_version
|
||||
if (SCN::min_scn() == pre_start_scn ||
|
||||
@ -508,10 +514,15 @@ int ObTxDataMemtable::get_past_commit_versions_(ObCommitVersionsArray &past_comm
|
||||
ObCommitVersionsGetter getter(iter_param, tmp_sstable);
|
||||
if (OB_FAIL(getter.get_next_row(past_commit_versions))) {
|
||||
STORAGE_LOG(WARN, "get commit versions from tx data sstable failed.", KR(ret));
|
||||
} else if (!past_commit_versions.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "invalid past commit versions array", KR(ret), K(past_commit_versions), KPC(sstable));
|
||||
} else {
|
||||
STORAGE_LOG(INFO, "finish get past commit versions", KR(ret), K(past_commit_versions), KPC(sstable));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "There is no tx data sstable yet", KR(ret), KPC(sstable));
|
||||
STORAGE_LOG(INFO, "There is no tx data sstable yet", KR(ret), K(past_commit_versions), KP(sstable));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -332,6 +332,13 @@ DEF_TO_STRING(ObCommitVersionsArray)
|
||||
{
|
||||
int64_t pos = 0;
|
||||
J_KV(KP(this), K(array_.count()));
|
||||
int64_t cnt = array_.count();
|
||||
for (int64_t i = 0; i < cnt && i < 5; i++) {
|
||||
J_KV("idx", i, "node", array_.at(i));
|
||||
}
|
||||
for (int64_t i = cnt - 5; i >= 5 && i < cnt; i++) {
|
||||
J_KV("idx", i, "node", array_.at(i));
|
||||
}
|
||||
return pos;
|
||||
}
|
||||
|
||||
|
@ -344,6 +344,9 @@ public:
|
||||
DECLARE_TO_STRING;
|
||||
};
|
||||
|
||||
ObCommitVersionsArray(): array_() {}
|
||||
~ObCommitVersionsArray() { reset(); }
|
||||
|
||||
void reset() { array_.reset(); }
|
||||
|
||||
ObCommitVersionsArray &operator=(const ObCommitVersionsArray& rhs)
|
||||
|
@ -648,7 +648,7 @@ int ObCommitVersionsGetter::get_next_row(ObCommitVersionsArray &commit_versions)
|
||||
STORAGE_LOG(ERROR, "Unexpected empty commit versions array.", KR(ret), KPC(row));
|
||||
} else if (!commit_versions.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "invalid cache", KR(ret));
|
||||
STORAGE_LOG(ERROR, "invalid cache", KR(ret), K(commit_versions), KPC(table_));
|
||||
} else {
|
||||
// get commit versions from tx data sstable done.
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user