optimize memtable garbage collection

This commit is contained in:
Handora 2023-05-16 09:46:50 +00:00 committed by ob-robot
parent 6ff52a2fd8
commit b5ac0c7bf6
21 changed files with 282 additions and 104 deletions

View File

@ -32,6 +32,25 @@
namespace oceanbase
{
using namespace share;
namespace memtable
{
int ObMemtable::get_ls_id(share::ObLSID &ls_id)
{
ls_id = share::ObLSID(1001);
return OB_SUCCESS;
}
int ObMemtable::batch_remove_unused_callback_for_uncommited_txn(
const ObLSID , const memtable::ObMemtableSet *)
{
int ret = OB_SUCCESS;
return ret;
}
}
namespace storage
{
int ObTenantCheckpointSlogHandler::read_from_ckpt(const ObMetaDiskAddr &phy_addr,

View File

@ -9585,7 +9585,7 @@ int ObPLResolver::resolve_sqlcode_or_sqlerrm(ObQualifiedName &q_name,
int ret = OB_SUCCESS;
UNUSED(unit_ast);
if (1 == q_name.access_idents_.count()
&& q_name.access_idents_.at(0).get_type() == UNKNOWN
&& q_name.access_idents_.at(0).get_type() == AccessNameType::UNKNOWN
&& (0 == q_name.access_idents_.at(0).access_name_.case_compare("SQLCODE")
|| 0 == q_name.access_idents_.at(0).access_name_.case_compare("SQLERRM"))) {
ObPLSQLCodeSQLErrmRawExpr *c_expr = NULL;

View File

@ -453,15 +453,17 @@ int ObTransCallbackMgr::remove_callbacks_for_fast_commit(bool &has_remove)
return ret;
}
int ObTransCallbackMgr::remove_callback_for_uncommited_txn(ObIMemtable *memtable, const share::SCN max_applied_scn)
int ObTransCallbackMgr::remove_callback_for_uncommited_txn(
const memtable::ObMemtableSet *memtable_set,
const share::SCN max_applied_scn)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(memtable)) {
if (OB_ISNULL(memtable_set)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "memtable is null", K(ret));
} else if (OB_FAIL(callback_list_.remove_callbacks_for_remove_memtable(memtable, max_applied_scn))) {
TRANS_LOG(WARN, "fifo remove callback fail", K(ret), K(*memtable));
} else if (OB_FAIL(callback_list_.remove_callbacks_for_remove_memtable(memtable_set, max_applied_scn))) {
TRANS_LOG(WARN, "fifo remove callback fail", K(ret), KPC(memtable_set));
}
return ret;

View File

@ -22,6 +22,7 @@
#include "storage/tx/ob_trans_define.h"
#include "storage/memtable/mvcc/ob_tx_callback_list.h"
#include "storage/tablelock/ob_table_lock_common.h"
#include "storage/memtable/ob_memtable_util.h"
namespace oceanbase
{
@ -217,7 +218,9 @@ public:
void set_for_replay(const bool for_replay);
bool is_for_replay() const { return ATOMIC_LOAD(&for_replay_); }
int remove_callbacks_for_fast_commit(bool &has_remove);
int remove_callback_for_uncommited_txn(memtable::ObIMemtable *memtable, const share::SCN max_applied_scn);
int remove_callback_for_uncommited_txn(
const memtable::ObMemtableSet *memtable_set,
const share::SCN max_applied_scn);
int get_memtable_key_arr(transaction::ObMemtableKeyArray &memtable_key_arr);
void acquire_callback_list();
void revert_callback_list();

View File

@ -192,19 +192,33 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove)
return ret;
}
int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove, const share::SCN max_applied_scn)
int ObTxCallbackList::remove_callbacks_for_remove_memtable(
const memtable::ObMemtableSet *memtable_set,
const share::SCN max_applied_scn)
{
int ret = OB_SUCCESS;
SpinLockGuard guard(latch_);
ObRemoveSyncCallbacksWCondFunctor functor(
// condition for remove
[memtable_for_remove](ObITransCallback *callback) -> bool {
if (callback->get_memtable() == memtable_for_remove) {
return true;
} else {
return false;
[memtable_set](ObITransCallback *callback) -> bool {
bool ok = false;
int ret = OB_SUCCESS;
int bool_ret = true;
while (!ok) {
if (OB_HASH_EXIST == (ret = memtable_set->exist_refactored((uint64_t)callback->get_memtable()))) {
bool_ret = true;
ok = true;
} else if (OB_HASH_NOT_EXIST == ret) {
bool_ret = false;
ok = true;
} else {
// We have no idea to handle the error
TRANS_LOG(ERROR, "hashset fetch encounter unexpected error", K(ret));
ok = false;
}
}
return bool_ret;
}, // condition for stop
[max_applied_scn](ObITransCallback *callback) -> bool {
if (callback->get_scn() > max_applied_scn) {
@ -222,7 +236,7 @@ int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable
callback_mgr_.add_release_memtable_callback_remove_cnt(functor.get_remove_cnt());
ensure_checksum_(functor.get_checksum_last_scn());
if (functor.get_remove_cnt() > 0) {
TRANS_LOG(INFO, "remove callbacks for remove memtable", KP(memtable_for_remove),
TRANS_LOG(INFO, "remove callbacks for remove memtable", KP(memtable_set),
K(functor), K(*this));
}
}

View File

@ -14,7 +14,7 @@
#define OCEANBASE_STORAGE_MEMTABLE_MVCC_OB_TX_CALLBACK_LIST
#include "storage/memtable/mvcc/ob_tx_callback_functor.h"
// #include "storage/tx/ob_trans_part_ctx.h"
#include "storage/memtable/ob_memtable_util.h"
namespace oceanbase
{
@ -58,13 +58,14 @@ public:
int remove_callbacks_for_fast_commit(bool &has_remove);
// remove_callbacks_for_remove_memtable will remove all callbacks that is
// belonged to the specified memtable. It will only remove callbacks without
// removing data by calling checkpoint_callback. So user need implement lazy
// callback for the correctness. And user need guarantee all callbacks
// belonged to the memtable must be synced before removing. What's more, it
// will calculate checksum when removing.
int remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove,
const share::SCN max_applied_scn);
// belonged to the specified memtable sets. It will only remove callbacks
// without removing data by calling checkpoint_callback. So user need to
// implement lazy callback for the correctness. And user need guarantee all
// callbacks belonged to the memtable sets must be synced before removing.
// What's more, it will calculate checksum when removing.
int remove_callbacks_for_remove_memtable(
const memtable::ObMemtableSet *memtable_set,
const share::SCN max_applied_scn);
// remove_callbacks_for_rollback_to will remove callbacks from back to front
// until callbacks smaller or equal than the seq_no. It will remove both

View File

@ -203,7 +203,8 @@ int ObMemtable::init(const ObITable::TableKey &table_key,
return ret;
}
int ObMemtable::remove_unused_callback_for_uncommited_txn_()
int ObMemtable::batch_remove_unused_callback_for_uncommited_txn(
const ObLSID ls_id, const memtable::ObMemtableSet *memtable_set)
{
int ret = OB_SUCCESS;
// NB: Do not use cache here, because the trans_service may be destroyed under
@ -212,8 +213,8 @@ int ObMemtable::remove_unused_callback_for_uncommited_txn_()
MTL_CTX()->get<transaction::ObTransService *>();
if (NULL != txs_svr
&& OB_FAIL(txs_svr->remove_callback_for_uncommited_txn(this))) {
TRANS_LOG(WARN, "remove callback for uncommited txn failed", K(ret), K(*this));
&& OB_FAIL(txs_svr->remove_callback_for_uncommited_txn(ls_id, memtable_set))) {
TRANS_LOG(WARN, "remove callback for uncommited txn failed", K(ret), KPC(memtable_set));
}
return ret;
@ -238,10 +239,6 @@ void ObMemtable::destroy()
if (OB_SUCCESS != freezer->unset_tenant_slow_freeze(tablet_id)) {
TRANS_LOG(WARN, "unset tenant slow freeze failed.", K(*this));
}
if (OB_FAIL(remove_unused_callback_for_uncommited_txn_())) {
TRANS_LOG(WARN, "failed to remove callback for uncommited txn", K(ret), K(*this));
}
}
ObITable::reset();
ObFreezeCheckpoint::reset();

View File

@ -379,6 +379,9 @@ public:
int64_t get_memtable_mgr_op_cnt() { return ATOMIC_LOAD(&memtable_mgr_op_cnt_); }
int64_t inc_memtable_mgr_op_cnt() { return ATOMIC_AAF(&memtable_mgr_op_cnt_, 1); }
int64_t dec_memtable_mgr_op_cnt() { return ATOMIC_SAF(&memtable_mgr_op_cnt_, 1); }
static int batch_remove_unused_callback_for_uncommited_txn(
const share::ObLSID ls_id,
const memtable::ObMemtableSet *memtable_set);
/* freeze */
virtual int set_frozen() override { local_allocator_.set_frozen(); return OB_SUCCESS; }
@ -507,8 +510,6 @@ private:
const storage::ObTableReadInfo &read_info,
ObMvccWriteResult &res);
int remove_unused_callback_for_uncommited_txn_();
void get_begin(ObMvccAccessCtx &ctx);
void get_end(ObMvccAccessCtx &ctx, int ret);
void scan_begin(ObMvccAccessCtx &ctx);

View File

@ -870,18 +870,21 @@ int ObMemtableCtx::remove_callbacks_for_fast_commit()
return ret;
}
int ObMemtableCtx::remove_callback_for_uncommited_txn(ObMemtable *mt, const share::SCN max_applied_scn)
int ObMemtableCtx::remove_callback_for_uncommited_txn(
const memtable::ObMemtableSet *memtable_set,
const share::SCN max_applied_scn)
{
int ret = OB_SUCCESS;
ObByteLockGuard guard(lock_);
if (OB_ISNULL(mt)) {
if (OB_ISNULL(memtable_set)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "memtable is NULL", K(mt));
TRANS_LOG(WARN, "memtable is NULL", K(memtable_set));
} else if (OB_FAIL(reuse_log_generator_())) {
TRANS_LOG(ERROR, "fail to reset log generator", K(ret));
} else if (OB_FAIL(trans_mgr_.remove_callback_for_uncommited_txn(mt, max_applied_scn))) {
TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt));
} else if (OB_FAIL(trans_mgr_.remove_callback_for_uncommited_txn(memtable_set,
max_applied_scn))) {
TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(memtable_set));
}
return ret;

View File

@ -397,7 +397,9 @@ public:
uint64_t get_tenant_id() const;
inline bool has_read_elr_data() const { return read_elr_data_; }
int remove_callbacks_for_fast_commit();
int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt, const share::SCN max_applied_scn);
int remove_callback_for_uncommited_txn(
const memtable::ObMemtableSet *memtable_set,
const share::SCN max_applied_scn);
int rollback(const int64_t seq_no, const int64_t from_seq_no);
bool is_all_redo_submitted();
bool is_for_replay() const { return trans_mgr_.is_for_replay(); }

View File

@ -16,6 +16,8 @@
#include "share/ob_define.h"
#include "lib/container/ob_iarray.h"
#include "lib/thread_local/ob_tsi_factory.h"
#include "lib/hash/ob_hashset.h"
#include "common/rowkey/ob_store_rowkey.h"
namespace oceanbase
{
@ -24,6 +26,8 @@ using namespace common;
namespace memtable
{
typedef common::hash::ObHashSet<uint64_t> ObMemtableSet;
template <typename T>
const char *strarray(const common::ObIArray<T> &array)
{
@ -65,7 +69,7 @@ public:
ObFakeStoreRowKey(const char *str, const int64_t size)
{
for(int64_t i = 0; i < OBJ_CNT; i++) {
obj_array_[i].set_char_value(str, size);
obj_array_[i].set_char_value(str, (ObString::obstr_size_t)size);
}
rowkey_.assign(obj_array_, OBJ_CNT);
}

View File

@ -156,6 +156,8 @@ int ObTenantMetaMemMgr::init()
LOG_WARN("fail to init pin set lock", K(ret));
} else if (OB_FAIL(pinned_tablet_set_.create(pin_set_bucket_num))) {
LOG_WARN("fail to create pinned tablet set", K(ret));
} else if (OB_FAIL(gc_memtable_map_.create(10, "GCMemtableMap", "GCMemtableMap", tenant_id_))) {
LOG_WARN("fail to initialize gc memtable map", K(ret));
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::TenantMetaMemMgr, tg_id_))) {
LOG_WARN("fail to create thread for t3m", K(ret));
} else {
@ -233,6 +235,20 @@ void ObTenantMetaMemMgr::destroy()
pin_set_lock_.destroy();
pinned_tablet_set_.destroy();
while (!is_all_clean && OB_SUCC(gc_tables_in_queue(is_all_clean)));
for (auto iter = gc_memtable_map_.begin();
OB_SUCC(ret) && iter != gc_memtable_map_.end(); ++iter) {
memtable::ObMemtableSet *memtable_set = iter->second;
if (OB_NOT_NULL(memtable_set)) {
if (0 != memtable_set->size()) {
LOG_ERROR("leaked memtable", KPC(memtable_set));
}
if (OB_FAIL(memtable_set->destroy())) {
LOG_ERROR("memtable set destroy failed", K(ret));
}
ob_free(memtable_set);
}
}
gc_memtable_map_.destroy();
bucket_lock_.destroy();
allocator_.reset();
for (int64_t i = 0; i <= ObITable::TableType::REMOTE_LOGICAL_MINOR_SSTABLE; i++) {
@ -320,8 +336,17 @@ int ObTenantMetaMemMgr::gc_tables_in_queue(bool &all_table_cleaned)
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("the table type is invalid", K(ret), KP(item), KPC(item), KPC(table), K(table->get_key()));
} else {
pool_arr_[index]->free_obj(static_cast<void *>(table));
table_cnt_arr[index]++;
if (ObITable::TableType::DATA_MEMTABLE == table_type) {
if (OB_FAIL(push_memtable_into_gc_map_(static_cast<memtable::ObMemtable *>(table)))) {
LOG_WARN("push memtable into gc map failed", K(ret));
}
} else {
pool_arr_[index]->free_obj(static_cast<void *>(table));
}
if (OB_SUCC(ret)) {
table_cnt_arr[index]++;
}
}
} else if (TC_REACH_TIME_INTERVAL(1000 * 1000)) {
LOG_INFO("the table is unsafe to destroy", KPC(table));
@ -343,6 +368,9 @@ int ObTenantMetaMemMgr::gc_tables_in_queue(bool &all_table_cleaned)
}
}
// batch gc memtable will handle error gracefully
batch_gc_memtable_();
if (OB_SUCC(ret)) {
all_table_cleaned = (0 == free_tables_queue_.size());
}
@ -384,6 +412,97 @@ int ObTenantMetaMemMgr::gc_tables_in_queue(bool &all_table_cleaned)
return ret;
}
void ObTenantMetaMemMgr::batch_gc_memtable_()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
for (auto iter = gc_memtable_map_.begin();
iter != gc_memtable_map_.end(); ++iter) {
const ObLSID &ls_id = iter->first;
memtable::ObMemtableSet *memtable_set = iter->second;
if (OB_NOT_NULL(memtable_set)
&& 0 != memtable_set->size()) {
if (OB_TMP_FAIL(ObMemtable::batch_remove_unused_callback_for_uncommited_txn(ls_id,
memtable_set))) {
LOG_ERROR("batch remove memtable set failed", K(tmp_ret), KPC(memtable_set));
for (auto set_iter = memtable_set->begin(); set_iter != memtable_set->end(); ++set_iter) {
if (OB_TMP_FAIL(push_table_into_gc_queue((ObITable *)(set_iter->first),
ObITable::TableType::DATA_MEMTABLE))) {
LOG_ERROR("push table into gc queue failed, maybe there will be leak",
K(tmp_ret), KPC(memtable_set));
}
}
LOG_INFO("batch gc memtable failed and push into gc queue again", K(memtable_set->size()));
while (OB_TMP_FAIL(memtable_set->clear())) {
LOG_ERROR("clear memtable set failed", K(tmp_ret), KPC(memtable_set));
}
} else {
for (auto set_iter = memtable_set->begin(); set_iter != memtable_set->end(); ++set_iter) {
pool_arr_[static_cast<int>(ObITable::TableType::DATA_MEMTABLE)]->free_obj((void *)(set_iter->first));
}
LOG_INFO("batch gc memtable successfully", K(memtable_set->size()));
while (OB_TMP_FAIL(memtable_set->clear())) {
LOG_ERROR("clear memtable set failed", K(tmp_ret), KPC(memtable_set));
}
}
}
}
if (REACH_TENANT_TIME_INTERVAL(1_hour)) {
if (OB_TMP_FAIL(gc_memtable_map_.clear())) {
LOG_ERROR("clear gc memtable map failed", K(tmp_ret));
}
}
}
int ObTenantMetaMemMgr::push_memtable_into_gc_map_(memtable::ObMemtable *memtable)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id;
const ObMemAttr attr(tenant_id_, "memtable_set");
memtable::ObMemtableSet *memtable_set = nullptr;
if (OB_FAIL(memtable->get_ls_id(ls_id))) {
LOG_WARN("get memtable ls id failed", K(ret), KPC(memtable));
} else if (OB_FAIL(gc_memtable_map_.get_refactored(ls_id, memtable_set))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
memtable::ObMemtableSet *tmp_memtable_set;
void *buf = NULL;
if (OB_ISNULL(buf = ob_malloc(sizeof(memtable::ObMemtableSet), attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory for hash set", K(ret));
} else if (OB_ISNULL(tmp_memtable_set = new (buf) ObMemtableSet())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory for hash set", K(ret));
} else if (OB_FAIL(tmp_memtable_set->create(1024))) {
LOG_WARN("fail to create", K(ret));
} else if (OB_FAIL(gc_memtable_map_.set_refactored(ls_id, tmp_memtable_set))) {
LOG_WARN("fail to set hash set", K(ret));
} else {
memtable_set = tmp_memtable_set;
}
if (NULL != buf && OB_FAIL(ret)) {
ob_free(tmp_memtable_set);
}
} else {
LOG_WARN("map get failed", K(ret), KPC(memtable));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(memtable_set->set_refactored((uint64_t)(memtable), 0/*flag, not overwrite*/))) {
LOG_WARN("map set failed", K(ret), KPC(memtable));
}
}
return ret;
}
void ObTenantMetaMemMgr::gc_sstable(ObSSTable *sstable)
{
if (OB_UNLIKELY(nullptr == sstable)) {

View File

@ -26,6 +26,7 @@
#include "storage/blocksstable/ob_sstable.h"
#include "storage/ddl/ob_tablet_ddl_kv_mgr.h"
#include "storage/memtable/ob_memtable.h"
#include "storage/memtable/ob_memtable_util.h"
#include "storage/meta_mem/ob_meta_obj_struct.h"
#include "storage/meta_mem/ob_meta_pointer_map.h"
#include "storage/meta_mem/ob_meta_pointer.h"
@ -338,6 +339,7 @@ private:
static const int64_t MIN_MINOR_SSTABLE_GC_INTERVAL_US = 1 * 1000 * 1000L; // 1s
static const int64_t REFRESH_CONFIG_INTERVAL_US = 10 * 1000 * 1000L; // 10s
static const int64_t ONE_ROUND_RECYCLE_COUNT_THRESHOLD = 20000L;
static const int64_t BATCH_MEMTABLE_GC_THRESHOLD = 100L;
static const int64_t DEFAULT_TABLET_WASH_HEAP_COUNT = 16;
static const int64_t DEFAULT_MINOR_SSTABLE_SET_COUNT = 49999;
static const int64_t SSTABLE_GC_MAX_TIME = 500; // 500us
@ -433,6 +435,9 @@ private:
common::ObIArray<ObTenantMetaMemStatus> &info) const;
int get_allocator_info(common::ObIArray<ObTenantMetaMemStatus> &info) const;
int exist_pinned_tablet(const ObTabletMapKey &key);
int push_memtable_into_gc_map_(memtable::ObMemtable *memtable);
void batch_gc_memtable_();
private:
int cmp_ret_;
HeapCompare compare_;
@ -453,6 +458,8 @@ private:
ObBucketLock pin_set_lock_;
PinnedTabletSet pinned_tablet_set_; // tablets which are in multi source data transaction procedure
common::hash::ObHashMap<share::ObLSID, memtable::ObMemtableSet*> gc_memtable_map_;
ObTenantMetaObjPool<memtable::ObMemtable> memtable_pool_;
ObTenantMetaObjPool<blocksstable::ObSSTable> sstable_pool_;
ObTenantMetaObjPool<ObDDLKV> ddl_kv_pool_;

View File

@ -556,7 +556,7 @@ int ObLSTxCtxMgr::get_tx_ctx_directly_from_hash_map(const ObTransID &tx_id, ObPa
return ret;
}
int ObLSTxCtxMgr::remove_callback_for_uncommited_tx(ObMemtable* mt)
int ObLSTxCtxMgr::remove_callback_for_uncommited_tx(const memtable::ObMemtableSet *memtable_set)
{
int ret = OB_SUCCESS;
ObTimeGuard timeguard("remove callback for uncommited txn", 10L * 1000L);
@ -564,17 +564,15 @@ int ObLSTxCtxMgr::remove_callback_for_uncommited_tx(ObMemtable* mt)
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "ObLSTxCtxMgr not inited", K_(ls_id));
ret = OB_NOT_INIT;
} else if (OB_ISNULL(mt)) {
} else if (OB_ISNULL(memtable_set)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "memtable is null", K_(ls_id));
} else if (mt->get_timestamp() < online_ts_) {
TRANS_LOG(INFO, "pass old memtable", KPC(mt), K(online_ts_), K(ls_id_));
} else {
ObRemoveCallbackFunctor fn(mt);
ObRemoveCallbackFunctor fn(memtable_set);
if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
TRANS_LOG(WARN, "for each transaction context error", KR(ret), KP(mt));
TRANS_LOG(WARN, "for each transaction context error", KR(ret), KPC(memtable_set));
} else {
TRANS_LOG(DEBUG, "remove callback for uncommited txn success", KP(mt));
TRANS_LOG(DEBUG, "remove callback for uncommited txn success", KPC(memtable_set));
}
}
return ret;
@ -2037,7 +2035,8 @@ int ObTxCtxMgr::get_min_undecided_scn(const ObLSID &ls_id, SCN &scn)
return ret;
}
int ObTxCtxMgr::remove_callback_for_uncommited_tx(const ObLSID &ls_id, ObMemtable* mt)
int ObTxCtxMgr::remove_callback_for_uncommited_tx(
const ObLSID ls_id, const memtable::ObMemtableSet *memtable_set)
{
int ret = OB_SUCCESS;
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
@ -2049,13 +2048,13 @@ int ObTxCtxMgr::remove_callback_for_uncommited_tx(const ObLSID &ls_id, ObMemtabl
TRANS_LOG(WARN, "invalid argument", K(ls_id));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(get_ls_tx_ctx_mgr(ls_id, ls_tx_ctx_mgr))) {
TRANS_LOG(WARN, "get participant transaction context mgr error", KP(mt));
TRANS_LOG(WARN, "get participant transaction context mgr error", KP(memtable_set));
ret = OB_PARTITION_NOT_EXIST;
} else {
if (OB_FAIL(ls_tx_ctx_mgr->remove_callback_for_uncommited_tx(mt))) {
TRANS_LOG(WARN, "get remove callback for uncommited txn failed", KR(ret), KP(mt));
if (OB_FAIL(ls_tx_ctx_mgr->remove_callback_for_uncommited_tx(memtable_set))) {
TRANS_LOG(WARN, "get remove callback for uncommited txn failed", KR(ret), KP(memtable_set));
} else {
TRANS_LOG(DEBUG, "get remove callback for uncommited txn succeed", KP(mt));
TRANS_LOG(DEBUG, "get remove callback for uncommited txn succeed", KP(memtable_set));
}
revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr);
}

View File

@ -375,7 +375,8 @@ public:
// When the memtable has mini-merged, the commit_version of its associated
// transaction may be undecided;
// @param [in] mt: the memtable point which is going to be deleted;
int remove_callback_for_uncommited_tx(memtable::ObMemtable* mt);
int remove_callback_for_uncommited_tx(
const memtable::ObMemtableSet *memtable_set);
// Find the TxCtx and execute the check functor with the tx_data retained on the TxCtx;
// Called by the ObTxCtxTable
@ -928,7 +929,9 @@ public:
// transaction may be undecided;
// @param [in] ls_id: the specified ls_id;
// @param [in] mt: the memtable point which will be deleted;
int remove_callback_for_uncommited_tx(const share::ObLSID &ls_id, memtable::ObMemtable* mt);
int remove_callback_for_uncommited_tx(
const ObLSID ls_id,
const memtable::ObMemtableSet *memtable_set);
TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(this));

View File

@ -1076,17 +1076,19 @@ public:
class ObRemoveCallbackFunctor
{
public:
explicit ObRemoveCallbackFunctor(memtable::ObMemtable *mt) : mt_(mt) {}
explicit ObRemoveCallbackFunctor(
const memtable::ObMemtableSet *memtable_set)
: memtable_set_(memtable_set) {}
~ObRemoveCallbackFunctor() {}
OPERATOR_V4(ObRemoveCallbackFunctor)
{
bool bool_ret = true;
int tmp_ret = OB_SUCCESS;
if (!tx_id.is_valid() || OB_ISNULL(tx_ctx) || OB_ISNULL(mt_)) {
if (!tx_id.is_valid() || OB_ISNULL(tx_ctx) || OB_ISNULL(memtable_set_)) {
tmp_ret = OB_ERR_UNEXPECTED;
TRANS_LOG_RET(WARN, tmp_ret, "invalid argument", K(tx_id));
} else if (OB_TMP_FAIL(tx_ctx->remove_callback_for_uncommited_txn(mt_))) {
} else if (OB_TMP_FAIL(tx_ctx->remove_callback_for_uncommited_txn(memtable_set_))) {
TRANS_LOG_RET(WARN, tmp_ret, "remove callback for unncommitted tx failed",
K(tmp_ret), K(tx_id), KP(tx_ctx));
}
@ -1098,7 +1100,7 @@ public:
return bool_ret;
}
private:
memtable::ObMemtable *mt_;
const memtable::ObMemtableSet *memtable_set_;
};
class ObTxSubmitLogFunctor

View File

@ -1583,7 +1583,8 @@ int64_t ObPartTransCtx::to_string(char* buf, const int64_t buf_len) const
return len1 + len2;
}
int ObPartTransCtx::remove_callback_for_uncommited_txn(ObMemtable *mt)
int ObPartTransCtx::remove_callback_for_uncommited_txn(
const memtable::ObMemtableSet *memtable_set)
{
int ret = OB_SUCCESS;
CtxLockGuard guard(lock_);
@ -1591,12 +1592,13 @@ int ObPartTransCtx::remove_callback_for_uncommited_txn(ObMemtable *mt)
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "ObPartTransCtx not inited");
ret = OB_NOT_INIT;
} else if (OB_ISNULL(mt)) {
} else if (OB_ISNULL(memtable_set)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "memtable is NULL", K(mt));
TRANS_LOG(WARN, "memtable is NULL", K(memtable_set));
} else if (OB_UNLIKELY(is_exiting_)) {
} else if (OB_FAIL(mt_ctx_.remove_callback_for_uncommited_txn(mt, exec_info_.max_applied_log_ts_))) {
TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt_ctx_), K(exec_info_.max_applied_log_ts_));
} else if (OB_FAIL(mt_ctx_.remove_callback_for_uncommited_txn(memtable_set, exec_info_.max_applied_log_ts_))) {
TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt_ctx_),
K(memtable_set), K(exec_info_.max_applied_log_ts_));
}
return ret;

View File

@ -174,7 +174,8 @@ public:
uint64_t get_lock_for_read_retry_count() const { return mt_ctx_.get_lock_for_read_retry_count(); }
int check_scheduler_status();
int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt);
int remove_callback_for_uncommited_txn(
const memtable::ObMemtableSet *memtable_set);
int64_t get_trans_mem_total_size() const { return mt_ctx_.get_trans_mem_total_size(); }
void update_max_submitted_seq_no(const int64_t seq_no)

View File

@ -609,10 +609,10 @@ int ObTransService::handle_redo_sync_task_(ObDupTableRedoSyncTask *task, bool &n
return OB_NOT_SUPPORTED;
}
int ObTransService::remove_callback_for_uncommited_txn(memtable::ObMemtable* mt)
int ObTransService::remove_callback_for_uncommited_txn(
const ObLSID ls_id, const memtable::ObMemtableSet *memtable_set)
{
int ret = OB_SUCCESS;
ObLSID ls_id;
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "ObTransService not inited");
@ -620,25 +620,21 @@ int ObTransService::remove_callback_for_uncommited_txn(memtable::ObMemtable* mt)
} else if (OB_UNLIKELY(!is_running_)) {
TRANS_LOG(WARN, "ObTransService is not running");
ret = OB_NOT_RUNNING;
} else if (OB_ISNULL(mt)) {
} else if (OB_ISNULL(memtable_set)) {
TRANS_LOG(WARN, "memtable is NULL");
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(mt->get_ls_id(ls_id))) {
TRANS_LOG(WARN, "get ls id failed", K(ret));
} else if (!ls_id.is_valid()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected ls id", KR(ret), K(ls_id), KP(mt));
} else if (OB_FAIL(tx_ctx_mgr_.remove_callback_for_uncommited_tx(ls_id, mt))) {
TRANS_LOG(WARN, "participant remove callback for uncommitt txn failed", KR(ret), K(ls_id), KP(mt));
TRANS_LOG(ERROR, "unexpected ls id", KR(ret), K(ls_id), KPC(memtable_set));
} else if (OB_FAIL(tx_ctx_mgr_.remove_callback_for_uncommited_tx(ls_id, memtable_set))) {
TRANS_LOG(WARN, "participant remove callback for uncommitt txn failed", KR(ret), K(ls_id), KP(memtable_set));
} else {
TRANS_LOG(DEBUG, "participant remove callback for uncommitt txn success", K(ls_id), KP(mt));
TRANS_LOG(DEBUG, "participant remove callback for uncommitt txn success", K(ls_id), KP(memtable_set));
}
return ret;
}
/**
* get snapshot_version for stmt
*

View File

@ -176,7 +176,9 @@ public:
//get the memory used condition of transaction module
int iterate_trans_memory_stat(ObTransMemStatIterator &mem_stat_iter);
int dump_elr_statistic();
int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt);
int remove_callback_for_uncommited_txn(
const ObLSID ls_id,
const memtable::ObMemtableSet *memtable_set);
int64_t get_tenant_id() const { return tenant_id_; }
const common::ObAddr &get_server() { return self_; }
ObTransTimer &get_trans_timer() { return timer_; }

View File

@ -361,20 +361,14 @@ TEST_F(TestTxCallbackList, remove_callback_by_release_memtable)
EXPECT_EQ(9, callback_list_.get_length());
EXPECT_EQ(OB_SUCCESS,
callback_list_.remove_callbacks_for_remove_memtable(memtable2, scn_1/*not used*/));
EXPECT_EQ(7, callback_list_.get_length());
EXPECT_EQ(2, mgr_.get_callback_remove_for_remove_memtable_count());
memtable::ObMemtableSet memtable_set;
EXPECT_EQ(OB_SUCCESS, memtable_set.create(24));
EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable2)));
EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable1)));
EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable3)));
EXPECT_EQ(OB_SUCCESS,
callback_list_.remove_callbacks_for_remove_memtable(memtable1, scn_1/*not used*/));
EXPECT_EQ(5, callback_list_.get_length());
EXPECT_EQ(4, mgr_.get_callback_remove_for_remove_memtable_count());
EXPECT_EQ(OB_SUCCESS,
callback_list_.remove_callbacks_for_remove_memtable(memtable3, scn_1/*not used*/));
callback_list_.remove_callbacks_for_remove_memtable(&memtable_set, scn_1/*not used*/));
EXPECT_EQ(4, callback_list_.get_length());
EXPECT_EQ(5, mgr_.get_callback_remove_for_remove_memtable_count());
@ -822,16 +816,13 @@ TEST_F(TestTxCallbackList, checksum_remove_memtable_and_tx_end)
EXPECT_EQ(9, callback_list_.get_length());
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable2, scn_1/*not used*/));
EXPECT_EQ(true, is_checksum_equal(5, checksum_));
EXPECT_EQ(scn_3, callback_list_.checksum_scn_);
memtable::ObMemtableSet memtable_set;
EXPECT_EQ(OB_SUCCESS, memtable_set.create(24));
EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable2)));
EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable1)));
EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable3)));
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable3, scn_1/*not used*/));
EXPECT_EQ(true, is_checksum_equal(5, checksum_));
EXPECT_EQ(scn_3, callback_list_.checksum_scn_);
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable1, scn_1/*not used*/));
EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(&memtable_set, scn_1/*not used*/));
EXPECT_EQ(true, is_checksum_equal(5, checksum_));
EXPECT_EQ(scn_3, callback_list_.checksum_scn_);
@ -1060,8 +1051,11 @@ TEST_F(TestTxCallbackList, checksum_all_and_tx_end_test) {
}
if (enable) {
memtable::ObMemtableSet memtable_set;
EXPECT_EQ(OB_SUCCESS, memtable_set.create(24));
EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(mt)));
EXPECT_EQ(OB_SUCCESS,
callback_list_.remove_callbacks_for_remove_memtable(mt, scn_1/*not used*/));
callback_list_.remove_callbacks_for_remove_memtable(&memtable_set, scn_1/*not used*/));
}
return enable;
@ -1190,20 +1184,27 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove)
return ret;
}
int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove,
const share::SCN)
int ObTxCallbackList::remove_callbacks_for_remove_memtable(
const memtable::ObMemtableSet *memtable_set,
const share::SCN)
{
int ret = OB_SUCCESS;
SpinLockGuard guard(latch_);
ObRemoveSyncCallbacksWCondFunctor functor(
// condition for remove
[memtable_for_remove](ObITransCallback *callback) -> bool {
if (callback->get_memtable() == memtable_for_remove) {
return true;
[memtable_set](ObITransCallback *callback) -> bool {
int ret = OB_SUCCESS;
int bool_ret = true;
if (OB_HASH_EXIST == (ret = memtable_set->exist_refactored((uint64_t)callback->get_memtable()))) {
bool_ret = true;
} else if (OB_HASH_NOT_EXIST == ret) {
bool_ret = false;
} else {
return false;
// We have no idea to handle the error
ob_abort();
}
return bool_ret;
}, // condition for stop
[](ObITransCallback *) -> bool {
return false;
@ -1217,7 +1218,7 @@ int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable
callback_mgr_.add_release_memtable_callback_remove_cnt(functor.get_remove_cnt());
ensure_checksum_(functor.get_checksum_last_scn());
if (functor.get_remove_cnt() > 0) {
TRANS_LOG(INFO, "remove callbacks for remove memtable", KP(memtable_for_remove),
TRANS_LOG(INFO, "remove callbacks for remove memtable", KP(memtable_set),
K(functor), K(*this));
}
}