[FIX] the logic refreshing tx data memtable cache is incorrect
This commit is contained in:
@ -60,6 +60,39 @@ using namespace palf;
|
||||
namespace storage
|
||||
{
|
||||
|
||||
int ObTxDataTable::check_tx_data_with_cache_once_(const transaction::ObTransID tx_id, ObITxDataCheckFunctor &fn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
ObTxDataGuard tx_data_guard;
|
||||
bool find = false;
|
||||
|
||||
if (OB_FAIL(get_tx_data_from_cache_(tx_id, tx_data_guard, find))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
STORAGE_LOG(WARN, "get tx data from cache failed", KR(ret));
|
||||
}
|
||||
} else {
|
||||
if (find) {
|
||||
/**************** rewrite logic *******************/
|
||||
|
||||
ret = fn(*tx_data_guard.tx_data());
|
||||
|
||||
/**************** rewrite logic *******************/
|
||||
} else {
|
||||
int64_t memtable_head = -1;
|
||||
int64_t memtable_tail = -1;
|
||||
if (OB_FAIL(get_memtable_mgr_()->get_memtable_range(memtable_head, memtable_tail))) {
|
||||
STORAGE_LOG(WARN, "get memtable range failed", KR(ret));
|
||||
} else if (memtable_head != memtables_cache_.memtable_head_ || memtable_tail != memtables_cache_.memtable_tail_) {
|
||||
ret = OB_EAGAIN;
|
||||
} else {
|
||||
ret = OB_TRANS_CTX_NOT_EXIST;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int clear_tx_data(ObTxDataTable *tx_data_table)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -503,10 +503,21 @@ int ObTxDataTable::check_tx_data_with_cache_once_(const transaction::ObTransID t
|
||||
bool find = false;
|
||||
|
||||
if (OB_FAIL(get_tx_data_from_cache_(tx_id, tx_data_guard, find))) {
|
||||
STORAGE_LOG(WARN, "get memtable range failed", KR(ret));
|
||||
if (OB_EAGAIN != ret) {
|
||||
STORAGE_LOG(WARN, "get tx data from cache failed", KR(ret));
|
||||
}
|
||||
} else {
|
||||
if (find) {
|
||||
ret = fn(*tx_data_guard.tx_data());
|
||||
if (ObTxData::RUNNING == tx_data_guard.tx_data()->state_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR,
|
||||
"read a running state tx data from tx data table",
|
||||
KR(ret),
|
||||
K(tx_data_guard),
|
||||
KPC(tx_data_guard.tx_data()));
|
||||
} else {
|
||||
ret = fn(*tx_data_guard.tx_data());
|
||||
}
|
||||
} else {
|
||||
int64_t memtable_head = -1;
|
||||
int64_t memtable_tail = -1;
|
||||
@ -528,8 +539,9 @@ int ObTxDataTable::get_tx_data_from_cache_(const transaction::ObTransID tx_id, O
|
||||
|
||||
TCRLockGuard guard(memtables_cache_.lock_);
|
||||
ObTableHdlArray &memtable_handles = memtables_cache_.memtable_handles_;
|
||||
int memtable_handle_tail = memtable_handles.count() - 1;
|
||||
|
||||
for (int i = memtable_handles.count() - 1; OB_SUCC(ret) && !find && i >= 0; i--) {
|
||||
for (int i = memtable_handle_tail ; OB_SUCC(ret) && !find && i >= 0; i--) {
|
||||
ObTxDataMemtable *tx_data_memtable = nullptr;
|
||||
if (OB_FAIL(memtable_handles.at(i).get_tx_data_memtable(tx_data_memtable))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -537,6 +549,9 @@ int ObTxDataTable::get_tx_data_from_cache_(const transaction::ObTransID tx_id, O
|
||||
} else if (OB_ISNULL(tx_data_memtable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "tx data memtable is nullptr.", KR(ret), K(tx_id));
|
||||
} else if (i == memtable_handle_tail && ObTxDataMemtable::State::ACTIVE != tx_data_memtable->get_state()) {
|
||||
// the latest memtable is not active, update memtable handles cache
|
||||
ret = OB_EAGAIN;
|
||||
} else {
|
||||
int tmp_ret = tx_data_memtable->get_tx_data(tx_id, tx_data_guard);
|
||||
if (OB_SUCCESS == tmp_ret) {
|
||||
|
Reference in New Issue
Block a user