[FEAT MERGE] Big Txn Execution Optimization
This commit is contained in:
@ -18,6 +18,7 @@
|
||||
#include "storage/ls/ob_ls_tablet_service.h"
|
||||
#include "storage/tablet/ob_tablet_iterator.h"
|
||||
#include "storage/tx/ob_tx_data_functor.h"
|
||||
#include "storage/tx/ob_tx_data_define.h"
|
||||
#include "storage/tx_storage/ob_tenant_freezer.h"
|
||||
#include "storage/tx_table/ob_tx_ctx_table.h"
|
||||
#include "storage/tx_table/ob_tx_table_define.h"
|
||||
@ -36,30 +37,25 @@ namespace storage
|
||||
int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
STATIC_ASSERT(sizeof(ObTxData) <= TX_DATA_SLICE_SIZE, "Size of ObTxData Overflow.");
|
||||
STATIC_ASSERT(sizeof(ObUndoAction) == UNDO_ACTION_SZIE, "Size of ObUndoAction Overflow.");
|
||||
STATIC_ASSERT(sizeof(ObUndoStatusNode) <= TX_DATA_SLICE_SIZE, "Size of ObUndoStatusNode Overflow");
|
||||
|
||||
STATIC_ASSERT(sizeof(ObTxData) <= TX_DATA_SIZE, "Size of ObTxData Overflow.");
|
||||
STATIC_ASSERT(sizeof(TxDataHashNode) <= TX_DATA_HASH_NODE_SIZE,
|
||||
"Size of TxDataHashNode Overflow.");
|
||||
STATIC_ASSERT(sizeof(ObTxDataSortListNode) <= TX_DATA_SORT_LIST_NODE_SIZE,
|
||||
"Size of ObTxDataSortListNode Overflow.");
|
||||
STATIC_ASSERT(TX_DATA_SIZE + TX_DATA_HASH_NODE_SIZE + TX_DATA_SORT_LIST_NODE_SIZE
|
||||
<= TX_DATA_SLICE_SIZE,
|
||||
"Size of ObTxDataSortListNode Overflow.");
|
||||
|
||||
mem_attr_.label_ = "TX_DATA_TABLE";
|
||||
mem_attr_.tenant_id_ = MTL_ID();
|
||||
mem_attr_.ctx_id_ = ObCtxIds::DEFAULT_CTX_ID;
|
||||
ObMemAttr mem_attr;
|
||||
mem_attr.label_ = "TX_DATA_TABLE";
|
||||
mem_attr.tenant_id_ = MTL_ID();
|
||||
mem_attr.ctx_id_ = ObCtxIds::DEFAULT_CTX_ID;
|
||||
ObMemtableMgrHandle memtable_mgr_handle;
|
||||
if (OB_ISNULL(ls) || OB_ISNULL(tx_ctx_table)) {
|
||||
ret = OB_ERR_NULL_VALUE;
|
||||
STORAGE_LOG(WARN, "ls tablet service or tx ctx table is nullptr", KR(ret));
|
||||
} else if (OB_FAIL(slice_allocator_.init(TX_DATA_SLICE_SIZE, OB_MALLOC_NORMAL_BLOCK_SIZE,
|
||||
common::default_blk_alloc, mem_attr_))) {
|
||||
common::default_blk_alloc, mem_attr))) {
|
||||
STORAGE_LOG(ERROR, "slice_allocator_ init fail");
|
||||
} else if (FALSE_IT(ls_tablet_svr_ = ls->get_tablet_svr())) {
|
||||
} else if (OB_FAIL(ls_tablet_svr_->get_tx_data_memtable_mgr(memtable_mgr_handle))) {
|
||||
STORAGE_LOG(WARN, "get tx data memtable mgr fail.", KR(ret), K(tablet_id_));
|
||||
} else if (FALSE_IT(arena_allocator_.set_attr(mem_attr_))) {
|
||||
} else if (FALSE_IT(arena_allocator_.set_attr(mem_attr))) {
|
||||
} else if (OB_FAIL(init_tx_data_read_schema_())) {
|
||||
STORAGE_LOG(WARN, "init tx data read ctx failed.", KR(ret), K(tablet_id_));
|
||||
} else {
|
||||
@ -72,6 +68,7 @@ int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table)
|
||||
tablet_id_ = LS_TX_DATA_TABLET;
|
||||
|
||||
is_inited_ = true;
|
||||
FLOG_INFO("tx data table init success", K(sizeof(ObTxData)), K(sizeof(ObTxDataLinkNode)), KPC(this));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -225,7 +222,7 @@ int ObTxDataTable::clean_memtables_cache_()
|
||||
|
||||
void ObTxDataTable::destroy() { reset(); }
|
||||
|
||||
int ObTxDataTable::alloc_tx_data(ObTxData *&tx_data)
|
||||
int ObTxDataTable::alloc_tx_data(ObTxDataGuard &tx_data_guard)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
void *slice_ptr = nullptr;
|
||||
@ -234,25 +231,34 @@ int ObTxDataTable::alloc_tx_data(ObTxData *&tx_data)
|
||||
STORAGE_LOG(WARN, "allocate memory from slice_allocator fail.", KR(ret), KP(this),
|
||||
K(tablet_id_));
|
||||
} else {
|
||||
tx_data = ObTxData::get_tx_data_by_hash_node(reinterpret_cast<TxDataHashNode *>(slice_ptr));
|
||||
// construct ObTxData()
|
||||
new (tx_data) ObTxData();
|
||||
ObTxData *tx_data = new (slice_ptr) ObTxData();
|
||||
tx_data->slice_allocator_ = &slice_allocator_;
|
||||
tx_data_guard.init(tx_data);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxDataTable::deep_copy_tx_data(ObTxData *in_tx_data, ObTxData *&out_tx_data)
|
||||
int ObTxDataTable::deep_copy_tx_data(const ObTxDataGuard &in_tx_data_guard, ObTxDataGuard &out_tx_data_guard)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
void *slice_ptr = nullptr;
|
||||
const ObTxData *in_tx_data = in_tx_data_guard.tx_data();
|
||||
ObTxData *out_tx_data = nullptr;
|
||||
if (OB_ISNULL(slice_ptr = slice_allocator_.alloc())) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
STORAGE_LOG(WARN, "allocate memory from slice_allocator fail.", KR(ret), KP(this),
|
||||
K(tablet_id_));
|
||||
} else if (OB_ISNULL(in_tx_data)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "invalid nullptr of tx data", K(in_tx_data_guard), KPC(this));
|
||||
} else {
|
||||
out_tx_data = ObTxData::get_tx_data_by_hash_node(reinterpret_cast<TxDataHashNode *>(slice_ptr));
|
||||
new (out_tx_data) ObTxData(*in_tx_data);
|
||||
out_tx_data = new (slice_ptr) ObTxData();
|
||||
*out_tx_data = *in_tx_data;
|
||||
out_tx_data->slice_allocator_ = &slice_allocator_;
|
||||
out_tx_data->undo_status_list_.head_ = nullptr;
|
||||
out_tx_data->ref_cnt_ = 0;
|
||||
out_tx_data_guard.init(out_tx_data);
|
||||
|
||||
if (OB_FAIL(deep_copy_undo_status_list_(in_tx_data->undo_status_list_,
|
||||
out_tx_data->undo_status_list_))) {
|
||||
STORAGE_LOG(WARN, "deep copy undo status list failed.");
|
||||
@ -263,7 +269,7 @@ int ObTxDataTable::deep_copy_tx_data(ObTxData *in_tx_data, ObTxData *&out_tx_dat
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxDataTable::deep_copy_undo_status_list_(ObUndoStatusList &in_list,
|
||||
int ObTxDataTable::deep_copy_undo_status_list_(const ObUndoStatusList &in_list,
|
||||
ObUndoStatusList &out_list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -302,16 +308,6 @@ int ObTxDataTable::alloc_undo_status_node(ObUndoStatusNode *&undo_status_node)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTxDataTable::free_tx_data(ObTxData *tx_data)
|
||||
{
|
||||
if (OB_NOT_NULL(tx_data)) {
|
||||
free_undo_status_list_(tx_data->undo_status_list_.head_);
|
||||
// The memory of tx data belongs to a slice of memory allocated by slice allocator. And the
|
||||
// start of the slice memory is hash node.
|
||||
slice_allocator_.free(ObTxData::get_hash_node_by_tx_data(tx_data));
|
||||
}
|
||||
}
|
||||
|
||||
int ObTxDataTable::free_undo_status_node(ObUndoStatusNode *&undo_status_node)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -319,7 +315,7 @@ int ObTxDataTable::free_undo_status_node(ObUndoStatusNode *&undo_status_node)
|
||||
ret = OB_ERR_NULL_VALUE;
|
||||
STORAGE_LOG(WARN, "trying to free nullptr", KR(ret), K(tablet_id_));
|
||||
} else {
|
||||
slice_allocator_.free(reinterpret_cast<void *>(undo_status_node));
|
||||
slice_allocator_.free(undo_status_node);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -360,8 +356,8 @@ int ObTxDataTable::insert(ObTxData *&tx_data)
|
||||
} else if (OB_FAIL(insert_(tx_data, write_guard))) {
|
||||
STORAGE_LOG(WARN, "insert tx data failed.", KR(ret), KPC(tx_data), KP(this), K(tablet_id_));
|
||||
} else {
|
||||
// STORAGE_LOG(DEBUG, "insert tx data succeed.", KPC(tx_data));
|
||||
// successfully insert
|
||||
// TODO : @gengli do not dec ref and set nullptr after insert
|
||||
}
|
||||
if (tg.get_diff() > 100000) {
|
||||
STORAGE_LOG(INFO, "ObTxDataTable insert cost too much time", K(tx_id), K(tg));
|
||||
@ -377,6 +373,7 @@ int ObTxDataTable::insert_(ObTxData *&tx_data, ObTxDataMemtableWriteGuard &write
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObTimeGuard tg("tx_data_table::insert_", 100 * 1000);
|
||||
bool inserted = false;
|
||||
ObTxDataMemtable *tx_data_memtable = nullptr;
|
||||
ObTableHandleV2 (&memtable_handles)[MAX_TX_DATA_MEMTABLE_CNT] = write_guard.handles_;
|
||||
|
||||
@ -390,39 +387,42 @@ int ObTxDataTable::insert_(ObTxData *&tx_data, ObTxDataMemtableWriteGuard &write
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "tx data memtable is nullptr.", KR(ret), KP(this), K(tablet_id_),
|
||||
K(memtable_handles[i]));
|
||||
} else if (OB_UNLIKELY(ObTxDataMemtable::State::RELEASED == tx_data_memtable->get_state())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR,
|
||||
"trying to insert a tx data into a tx data memtable in frozen/dumped state.",
|
||||
KR(ret), KP(this), K(tablet_id_), KP(tx_data_memtable), KPC(tx_data_memtable));
|
||||
} else if (FALSE_IT(tg.click())) {
|
||||
// do nothing
|
||||
} else if (tx_data_memtable->get_start_scn() < tx_data->end_scn_
|
||||
&& tx_data_memtable->get_end_scn() >= tx_data->end_scn_) {
|
||||
tg.click();
|
||||
ret = insert_into_memtable_(tx_data_memtable, tx_data);
|
||||
if (OB_FAIL(tx_data_memtable->insert(tx_data))) {
|
||||
STORAGE_LOG(WARN,
|
||||
"insert tx data into tx data memtable failed",
|
||||
KR(ret),
|
||||
"ls_id", get_ls_id(),
|
||||
KPC(tx_data),
|
||||
KPC(tx_data_memtable));
|
||||
} else {
|
||||
inserted = true;
|
||||
}
|
||||
} else {
|
||||
// should not insert into this memtable
|
||||
STORAGE_LOG(INFO, "skip this tx data memtable", KPC(tx_data), KPC(tx_data_memtable));
|
||||
}
|
||||
}
|
||||
tg.click();
|
||||
|
||||
// If this tx data can not be inserted into all memtables, check if it should be filtered.
|
||||
// We use the start scn of the first memtable as the filtering time stamp
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(tx_data) && OB_NOT_NULL(tx_data_memtable)) {
|
||||
// We use the start log ts of the first memtable as the filtering time stamp
|
||||
if (OB_SUCC(ret) && !inserted && OB_NOT_NULL(tx_data_memtable)) {
|
||||
SCN clog_checkpoint_scn = tx_data_memtable->get_key().get_start_scn();
|
||||
if (tx_data->end_scn_ <= clog_checkpoint_scn) {
|
||||
// Filter this tx data. The part trans ctx need to handle this error code because the memory
|
||||
// of tx data need to be freed.
|
||||
STORAGE_LOG(INFO, "This tx data is filtered.", K(clog_checkpoint_scn), KPC(tx_data));
|
||||
free_tx_data(tx_data);
|
||||
tx_data = nullptr;
|
||||
tg.click();
|
||||
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "cannot find the correct tx data memtable to insert", KR(ret),
|
||||
KPC(tx_data), K(memtable_handles));
|
||||
KPC(tx_data), K(clog_checkpoint_scn), K(memtable_handles));
|
||||
}
|
||||
}
|
||||
if (tg.get_diff() > 100000) {
|
||||
@ -432,69 +432,6 @@ int ObTxDataTable::insert_(ObTxData *&tx_data, ObTxDataMemtableWriteGuard &write
|
||||
return ret;
|
||||
}
|
||||
|
||||
// A tx data with lesser end_scn may be inserted after a tx data with greater end_scn due to
|
||||
// the out-of-order callback of log. This function will retain the newer tx data and delete the
|
||||
// older one.
|
||||
int ObTxDataTable::insert_into_memtable_(ObTxDataMemtable *tx_data_memtable, ObTxData *&tx_data)
|
||||
{
|
||||
common::ObTimeGuard tg("tx_data_table::insert_into_memtable", 100 * 1000);
|
||||
int ret = OB_SUCCESS;
|
||||
bool need_insert = true;
|
||||
|
||||
if (OB_UNLIKELY(tx_data_memtable->contain_tx_data(tx_data->tx_id_))) {
|
||||
tg.click();
|
||||
// check and insert
|
||||
ObTxData *existed_tx_data = nullptr;
|
||||
if (OB_FAIL(tx_data_memtable->get_tx_data(tx_data->tx_id_, existed_tx_data))) {
|
||||
STORAGE_LOG(WARN, "get tx data from tx data memtable failed.", KR(ret), KPC(tx_data),
|
||||
KPC(tx_data_memtable));
|
||||
} else if (OB_ISNULL(existed_tx_data)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "existed tx data is unexpected nullptr", KR(ret), KPC(tx_data),
|
||||
KPC(tx_data_memtable));
|
||||
} else if (existed_tx_data->end_scn_ < tx_data->end_scn_) {
|
||||
tg.click();
|
||||
if (OB_FAIL(tx_data_memtable->remove(tx_data->tx_id_))) {
|
||||
STORAGE_LOG(ERROR, "remove tx data from tx data memtable failed.", KR(ret), KPC(tx_data),
|
||||
KPC(tx_data_memtable));
|
||||
} else {
|
||||
existed_tx_data->is_in_tx_data_table_ = false;
|
||||
|
||||
// successfully remove
|
||||
}
|
||||
} else {
|
||||
// The end_scn of tx data in memtable is greater than or equal to the end_scn of current
|
||||
// tx data, which means the tx data waiting to insert is an older one. So we set need_insert =
|
||||
// false to skip inserting this tx data
|
||||
need_insert = false;
|
||||
tx_data_memtable->revert_tx_data(existed_tx_data);
|
||||
}
|
||||
}
|
||||
tg.click();
|
||||
|
||||
// insert or free according to the need_insert flag
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_LIKELY(need_insert)) {
|
||||
tg.click();
|
||||
if (OB_FAIL(tx_data_memtable->insert(tx_data))) {
|
||||
STORAGE_LOG(WARN, "insert tx data into tx data memtable failed.", KR(ret),
|
||||
KPC(tx_data_memtable), KPC(tx_data));
|
||||
} else {
|
||||
tx_data = nullptr;
|
||||
}
|
||||
tg.click();
|
||||
} else {
|
||||
free_tx_data(tx_data);
|
||||
tx_data = nullptr;
|
||||
}
|
||||
}
|
||||
if (tg.get_diff() > 100000) {
|
||||
STORAGE_LOG(INFO, "ObTxDataTable insert_info_memtable cost too much time", K(tg));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxDataTable::check_with_tx_data(const ObTransID tx_id, ObITxDataCheckFunctor &fn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -519,52 +456,100 @@ int ObTxDataTable::check_tx_data_in_memtable_(const ObTransID tx_id, ObITxDataCh
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(update_memtables_cache())) {
|
||||
STORAGE_LOG(WARN, "get all memtables fail.", KR(ret), K(tx_id));
|
||||
} else {
|
||||
// look for the tx data
|
||||
// construct memtable_handle before tx_data_guard to destruct it latter
|
||||
ObTableHandleV2 src_memtable_handle;
|
||||
ObTxDataGuard tx_data_guard;
|
||||
bool find = false;
|
||||
|
||||
if (OB_FAIL(get_tx_data_in_memtables_cache_(tx_id, src_memtable_handle, tx_data_guard, find))) {
|
||||
STORAGE_LOG(INFO, "get tx data in memtables cache failed.", KR(ret), K(tx_id));
|
||||
} else if (find) {
|
||||
// do function if find tx data in memtable
|
||||
if (OB_FAIL(fn(tx_data_guard.tx_data()))) {
|
||||
STORAGE_LOG(WARN, "do data check function fail.", KR(ret), KP(this), K(tablet_id_), K(tx_data_guard.tx_data()));
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(check_tx_data_with_cache_once_(tx_id, fn))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
if (OB_FAIL(update_memtables_cache())) {
|
||||
STORAGE_LOG(WARN, "update memtables cache failed", KR(ret));
|
||||
} else {
|
||||
// do check_tx_data_with_cache_once_ again
|
||||
}
|
||||
} else if (OB_TRANS_CTX_NOT_EXIST == ret) {
|
||||
// need check tx data in sstable
|
||||
} else {
|
||||
// do data check function success
|
||||
STORAGE_LOG(WARN, "check tx data with cache failed", KR(ret));
|
||||
}
|
||||
} else {
|
||||
ret = OB_TRANS_CTX_NOT_EXIST;
|
||||
// check tx data with cache succeed
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxDataTable::check_tx_data_with_cache_once_(const transaction::ObTransID tx_id, ObITxDataCheckFunctor &fn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
ObTxDataMemtable *tx_data_memtable = nullptr;
|
||||
ObTxDataGuard tx_data_guard;
|
||||
bool find = false;
|
||||
|
||||
TCRLockGuard guard(memtables_cache_.lock_);
|
||||
ObTableHdlArray &memtable_handles = memtables_cache_.memtable_handles_;
|
||||
|
||||
for (int i = memtable_handles.count() - 1; OB_SUCC(ret) && !find && i >= 0; i--) {
|
||||
tx_data_memtable = nullptr;
|
||||
if (OB_FAIL(memtable_handles.at(i).get_tx_data_memtable(tx_data_memtable))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "get tx data memtable from table handles fail.", KR(ret), K(tx_id), K(memtable_handles.at(i)));
|
||||
} else if (OB_ISNULL(tx_data_memtable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "tx data memtable is nullptr.", KR(ret), K(tx_id));
|
||||
} else {
|
||||
int tmp_ret = tx_data_memtable->get_tx_data(tx_id, tx_data_guard);
|
||||
if (OB_SUCCESS == tmp_ret) {
|
||||
find = true;
|
||||
} else if (OB_ENTRY_NOT_EXIST == tmp_ret) {
|
||||
// search tx_id in next memtable
|
||||
} else {
|
||||
STORAGE_LOG(WARN,
|
||||
"get_tx_data fail.",
|
||||
KR(tmp_ret),
|
||||
KP(this),
|
||||
K(tablet_id_),
|
||||
KP(tx_data_memtable),
|
||||
KPC(tx_data_memtable));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (find) {
|
||||
ret = fn(*tx_data_guard.tx_data());
|
||||
} else {
|
||||
int64_t memtable_head = -1;
|
||||
int64_t memtable_tail = -1;
|
||||
if (OB_FAIL(get_memtable_mgr_()->get_memtable_range(memtable_head, memtable_tail))) {
|
||||
STORAGE_LOG(WARN, "get memtable range failed", KR(ret));
|
||||
} else if (memtable_head != memtables_cache_.memtable_head_ || memtable_tail != memtables_cache_.memtable_tail_) {
|
||||
ret = OB_EAGAIN;
|
||||
} else {
|
||||
ret = OB_TRANS_CTX_NOT_EXIST;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxDataTable::update_memtables_cache()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool need_update = true;
|
||||
|
||||
// lock for updating memtables cache
|
||||
TCWLockGuard guard(memtables_cache_.lock_);
|
||||
if (OB_FAIL(check_need_update_memtables_cache_(need_update))) {
|
||||
STORAGE_LOG(WARN, "check if memtable handles need update failed.", KR(ret));
|
||||
} else if (need_update) {
|
||||
bool make_sure_need_update = true;
|
||||
// lock for updating memtables cache
|
||||
TCWLockGuard guard(memtables_cache_.lock_);
|
||||
if (OB_FAIL(check_need_update_memtables_cache_(make_sure_need_update))) {
|
||||
STORAGE_LOG(WARN, "check if memtable handles need update failed.", KR(ret));
|
||||
} else if (!make_sure_need_update) {
|
||||
// do not need update cache, skip update
|
||||
} else if (FALSE_IT(memtables_cache_.reuse())) {
|
||||
} else if (OB_FAIL(get_memtable_mgr_()->get_all_memtables_with_range(memtables_cache_.memtable_handles_,
|
||||
memtables_cache_.memtable_head_,
|
||||
memtables_cache_.memtable_tail_))) {
|
||||
STORAGE_LOG(WARN, "get all memtables with range failed.", KR(ret), KPC(this), KPC(get_memtable_mgr_()));
|
||||
}
|
||||
} else if (!need_update) {
|
||||
// do not need update cache, skip update
|
||||
} else if (FALSE_IT(memtables_cache_.reuse())) {
|
||||
} else if (OB_FAIL(get_memtable_mgr_()->get_all_memtables_with_range(memtables_cache_.memtable_handles_,
|
||||
memtables_cache_.memtable_head_,
|
||||
memtables_cache_.memtable_tail_))) {
|
||||
STORAGE_LOG(WARN, "get all memtables with range failed.", KR(ret), KPC(this), KPC(get_memtable_mgr_()));
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -575,11 +560,7 @@ int ObTxDataTable::check_need_update_memtables_cache_(bool &need_update)
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t memtable_head = -1;
|
||||
int64_t memtable_tail = -1;
|
||||
if (!is_started_) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
need_update = false;
|
||||
STORAGE_LOG(WARN, "tx data memtable has stopped", KR(ret), KPC(this), K(need_update));
|
||||
} else if (OB_FAIL(get_memtable_mgr_()->get_memtable_range(memtable_head, memtable_tail))) {
|
||||
if (OB_FAIL(get_memtable_mgr_()->get_memtable_range(memtable_head, memtable_tail))) {
|
||||
STORAGE_LOG(WARN, "get memtable range failed.", KR(ret));
|
||||
} else if (memtables_cache_.memtable_head_ == memtable_head && memtables_cache_.memtable_tail_ == memtable_tail) {
|
||||
// cache already up to date, skip update
|
||||
@ -615,6 +596,8 @@ int ObTxDataTable::get_tx_data_in_memtables_cache_(const ObTransID tx_id,
|
||||
} else if (OB_ISNULL(tx_data_memtable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "tx data memtable is nullptr.", KR(ret), K(tx_id));
|
||||
} else if (ObTxDataMemtable::State::RELEASED == tx_data_memtable->get_state()) {
|
||||
// skip get tx data in this tx data memtable
|
||||
} else {
|
||||
int tmp_ret = tx_data_memtable->get_tx_data(tx_id, tx_data_guard);
|
||||
if (OB_SUCCESS == tmp_ret) {
|
||||
@ -655,9 +638,9 @@ int ObTxDataTable::check_tx_data_in_sstable_(const ObTransID tx_id, ObITxDataChe
|
||||
STORAGE_LOG(WARN, "check tx data in sstable failed.", KR(ret), KP(this), K(tablet_id_));
|
||||
}
|
||||
|
||||
// free undo status list if exist
|
||||
if (OB_NOT_NULL(tx_data.undo_status_list_.head_)) {
|
||||
free_undo_status_list_(tx_data.undo_status_list_.head_);
|
||||
tx_data.undo_status_list_.head_ = nullptr;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -690,7 +673,6 @@ int ObTxDataTable::get_tx_data_in_sstable_(const transaction::ObTransID tx_id, O
|
||||
// get tx data from sstable succeed.
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -831,11 +813,11 @@ int ObTxDataTable::get_upper_trans_version_before_given_scn(const SCN sstable_en
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (skip_calc) {
|
||||
} else if (0 == calc_upper_trans_version_cache_.commit_scns_.array_.count()) {
|
||||
} else if (0 == calc_upper_trans_version_cache_.commit_versions_.array_.count()) {
|
||||
STORAGE_LOG(ERROR, "Unexpected empty array.", K(calc_upper_trans_version_cache_));
|
||||
} else {
|
||||
TCRLockGuard lock_guard(calc_upper_trans_version_cache_.lock_);
|
||||
if (!calc_upper_trans_version_cache_.commit_scns_.is_valid()) {
|
||||
if (!calc_upper_trans_version_cache_.commit_versions_.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "invalid cache for upper trans version calculation", KR(ret));
|
||||
} else if (OB_FAIL(calc_upper_trans_scn_(sstable_end_scn, upper_trans_version))) {
|
||||
@ -1156,7 +1138,7 @@ int ObTxDataTable::update_calc_upper_trans_version_cache_(ObITable *table)
|
||||
LOG_WARN("invalid tablet handle", KR(ret), K(tablet_handle), K(tablet_id_));
|
||||
} else {
|
||||
ObCommitVersionsGetter getter(iter_param, table);
|
||||
if (OB_FAIL(getter.get_next_row(calc_upper_trans_version_cache_.commit_scns_))) {
|
||||
if (OB_FAIL(getter.get_next_row(calc_upper_trans_version_cache_.commit_versions_))) {
|
||||
STORAGE_LOG(WARN, "update calc_upper_trans_trans_version_cache failed.", KR(ret), KPC(table));
|
||||
} else {
|
||||
calc_upper_trans_version_cache_.is_inited_ = true;
|
||||
@ -1171,7 +1153,7 @@ int ObTxDataTable::calc_upper_trans_scn_(const SCN sstable_end_scn, SCN &upper_t
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
const auto &array = calc_upper_trans_version_cache_.commit_scns_.array_;
|
||||
const auto &array = calc_upper_trans_version_cache_.commit_versions_.array_;
|
||||
int l = 0;
|
||||
int r = array.count() - 1;
|
||||
|
||||
@ -1203,10 +1185,13 @@ int ObTxDataTable::calc_upper_trans_scn_(const SCN sstable_end_scn, SCN &upper_t
|
||||
"ls_id", get_ls_id(),
|
||||
"array_count", array.count(),
|
||||
"chose_idx", l);
|
||||
for (int i = 0; i < array.count(); i++) {
|
||||
STORAGE_LOG(INFO, "commit versions array : ", K(i), K(array.at(i)));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxDataTable::supplement_undo_actions_if_exist(ObTxData *&tx_data)
|
||||
int ObTxDataTable::supplement_undo_actions_if_exist(ObTxData *tx_data)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTxData tx_data_from_sstable;
|
||||
@ -1228,6 +1213,12 @@ int ObTxDataTable::supplement_undo_actions_if_exist(ObTxData *&tx_data)
|
||||
} else {
|
||||
// assign and reset to avoid deep copy
|
||||
tx_data->undo_status_list_ = tx_data_from_sstable.undo_status_list_;
|
||||
tx_data_from_sstable.undo_status_list_.reset();
|
||||
}
|
||||
|
||||
if (OB_NOT_NULL(tx_data_from_sstable.undo_status_list_.head_)) {
|
||||
STORAGE_LOG(WARN, "supplement undo actions failed", KR(ret), KPC(tx_data), K(get_ls_id()));
|
||||
free_undo_status_list_(tx_data_from_sstable.undo_status_list_.head_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1292,13 +1283,14 @@ int ObTxDataTable::dump_tx_data_in_memtable_2_text_(const ObTransID tx_id, FILE
|
||||
} else {
|
||||
ObTableHandleV2 src_memtable_handle;
|
||||
ObTxDataGuard tx_data_guard;
|
||||
tx_data_guard.reset();
|
||||
bool find = false;
|
||||
|
||||
if (OB_FAIL(get_tx_data_in_memtables_cache_(tx_id, src_memtable_handle, tx_data_guard, find))) {
|
||||
STORAGE_LOG(INFO, "get tx data in memtables cache failed.", KR(ret), K(tx_id));
|
||||
} else if (find) {
|
||||
fprintf(fd, "********** Tx Data MemTable ***********\n\n");
|
||||
tx_data_guard.tx_data().dump_2_text(fd);
|
||||
tx_data_guard.tx_data()->dump_2_text(fd);
|
||||
fprintf(fd, "\n********** Tx Data MemTable ***********\n");
|
||||
} else {
|
||||
ret = OB_TRANS_CTX_NOT_EXIST;
|
||||
@ -1325,17 +1317,6 @@ int ObTxDataTable::dump_tx_data_in_sstable_2_text_(const ObTransID tx_id, FILE *
|
||||
|
||||
share::ObLSID ObTxDataTable::get_ls_id() { return ls_->get_ls_id(); }
|
||||
|
||||
bool CleanTxDataSSTableCacheFunctor::operator()(const transaction::ObTransID &key,
|
||||
ObTxData *tx_data)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t *latest_used_ts = ObTxData::get_latest_used_ts_by_tx_data(tx_data);
|
||||
if ((*latest_used_ts < clean_ts_) && (OB_FAIL(sstable_cache_.del(key)))) {
|
||||
STORAGE_LOG(WARN, "delete tx data from tx data sstable cache failed.", KR(ret), KPC(tx_data));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
|
||||
} // namespace oceanbase
|
||||
|
||||
Reference in New Issue
Block a user