From b5ac0c7bf60eba1d57b767cbbd650f7dbc514a31 Mon Sep 17 00:00:00 2001 From: Handora Date: Tue, 16 May 2023 09:46:50 +0000 Subject: [PATCH] optimize memtable garbage collection --- .../storage/test_tenant_meta_mem_mgr.cpp | 19 +++ src/pl/ob_pl_resolver.cpp | 2 +- .../memtable/mvcc/ob_mvcc_trans_ctx.cpp | 10 +- src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h | 5 +- .../memtable/mvcc/ob_tx_callback_list.cpp | 28 +++- .../memtable/mvcc/ob_tx_callback_list.h | 17 +-- src/storage/memtable/ob_memtable.cpp | 11 +- src/storage/memtable/ob_memtable.h | 5 +- src/storage/memtable/ob_memtable_context.cpp | 13 +- src/storage/memtable/ob_memtable_context.h | 4 +- src/storage/memtable/ob_memtable_util.h | 6 +- .../meta_mem/ob_tenant_meta_mem_mgr.cpp | 123 +++++++++++++++++- src/storage/meta_mem/ob_tenant_meta_mem_mgr.h | 7 + src/storage/tx/ob_trans_ctx_mgr_v4.cpp | 23 ++-- src/storage/tx/ob_trans_ctx_mgr_v4.h | 7 +- src/storage/tx/ob_trans_functor.h | 10 +- src/storage/tx/ob_trans_part_ctx.cpp | 12 +- src/storage/tx/ob_trans_part_ctx.h | 3 +- src/storage/tx/ob_trans_service.cpp | 18 +-- src/storage/tx/ob_trans_service.h | 4 +- .../memtable/mvcc/test_mvcc_callback.cpp | 59 ++++----- 21 files changed, 282 insertions(+), 104 deletions(-) diff --git a/mittest/mtlenv/storage/test_tenant_meta_mem_mgr.cpp b/mittest/mtlenv/storage/test_tenant_meta_mem_mgr.cpp index 731c0fed2..a6bf11260 100644 --- a/mittest/mtlenv/storage/test_tenant_meta_mem_mgr.cpp +++ b/mittest/mtlenv/storage/test_tenant_meta_mem_mgr.cpp @@ -32,6 +32,25 @@ namespace oceanbase { using namespace share; + +namespace memtable +{ +int ObMemtable::get_ls_id(share::ObLSID &ls_id) +{ + ls_id = share::ObLSID(1001); + return OB_SUCCESS; +} + +int ObMemtable::batch_remove_unused_callback_for_uncommited_txn( + const ObLSID , const memtable::ObMemtableSet *) +{ + int ret = OB_SUCCESS; + + return ret; +} + +} + namespace storage { int ObTenantCheckpointSlogHandler::read_from_ckpt(const ObMetaDiskAddr &phy_addr, diff --git a/src/pl/ob_pl_resolver.cpp b/src/pl/ob_pl_resolver.cpp index 845f33e6a..715467f30 100644 --- a/src/pl/ob_pl_resolver.cpp +++ b/src/pl/ob_pl_resolver.cpp @@ -9585,7 +9585,7 @@ int ObPLResolver::resolve_sqlcode_or_sqlerrm(ObQualifiedName &q_name, int ret = OB_SUCCESS; UNUSED(unit_ast); if (1 == q_name.access_idents_.count() - && q_name.access_idents_.at(0).get_type() == UNKNOWN + && q_name.access_idents_.at(0).get_type() == AccessNameType::UNKNOWN && (0 == q_name.access_idents_.at(0).access_name_.case_compare("SQLCODE") || 0 == q_name.access_idents_.at(0).access_name_.case_compare("SQLERRM"))) { ObPLSQLCodeSQLErrmRawExpr *c_expr = NULL; diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index b5da0fbb3..657e75e1e 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -453,15 +453,17 @@ int ObTransCallbackMgr::remove_callbacks_for_fast_commit(bool &has_remove) return ret; } -int ObTransCallbackMgr::remove_callback_for_uncommited_txn(ObIMemtable *memtable, const share::SCN max_applied_scn) +int ObTransCallbackMgr::remove_callback_for_uncommited_txn( + const memtable::ObMemtableSet *memtable_set, + const share::SCN max_applied_scn) { int ret = OB_SUCCESS; - if (OB_ISNULL(memtable)) { + if (OB_ISNULL(memtable_set)) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "memtable is null", K(ret)); - } else if (OB_FAIL(callback_list_.remove_callbacks_for_remove_memtable(memtable, max_applied_scn))) { - TRANS_LOG(WARN, "fifo remove callback fail", K(ret), K(*memtable)); + } else if (OB_FAIL(callback_list_.remove_callbacks_for_remove_memtable(memtable_set, max_applied_scn))) { + TRANS_LOG(WARN, "fifo remove callback fail", K(ret), KPC(memtable_set)); } return ret; diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h index c8648d9e4..310c11a91 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h @@ -22,6 +22,7 @@ #include "storage/tx/ob_trans_define.h" #include "storage/memtable/mvcc/ob_tx_callback_list.h" #include "storage/tablelock/ob_table_lock_common.h" +#include "storage/memtable/ob_memtable_util.h" namespace oceanbase { @@ -217,7 +218,9 @@ public: void set_for_replay(const bool for_replay); bool is_for_replay() const { return ATOMIC_LOAD(&for_replay_); } int remove_callbacks_for_fast_commit(bool &has_remove); - int remove_callback_for_uncommited_txn(memtable::ObIMemtable *memtable, const share::SCN max_applied_scn); + int remove_callback_for_uncommited_txn( + const memtable::ObMemtableSet *memtable_set, + const share::SCN max_applied_scn); int get_memtable_key_arr(transaction::ObMemtableKeyArray &memtable_key_arr); void acquire_callback_list(); void revert_callback_list(); diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp index 001929568..2843af405 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp @@ -192,19 +192,33 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove) return ret; } -int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove, const share::SCN max_applied_scn) +int ObTxCallbackList::remove_callbacks_for_remove_memtable( + const memtable::ObMemtableSet *memtable_set, + const share::SCN max_applied_scn) { int ret = OB_SUCCESS; SpinLockGuard guard(latch_); ObRemoveSyncCallbacksWCondFunctor functor( // condition for remove - [memtable_for_remove](ObITransCallback *callback) -> bool { - if (callback->get_memtable() == memtable_for_remove) { - return true; - } else { - return false; + [memtable_set](ObITransCallback *callback) -> bool { + bool ok = false; + int ret = OB_SUCCESS; + int bool_ret = true; + while (!ok) { + if (OB_HASH_EXIST == (ret = memtable_set->exist_refactored((uint64_t)callback->get_memtable()))) { + bool_ret = true; + ok = true; + } else if (OB_HASH_NOT_EXIST == ret) { + bool_ret = false; + ok = true; + } else { + // We have no idea to handle the error + TRANS_LOG(ERROR, "hashset fetch encounter unexpected error", K(ret)); + ok = false; + } } + return bool_ret; }, // condition for stop [max_applied_scn](ObITransCallback *callback) -> bool { if (callback->get_scn() > max_applied_scn) { @@ -222,7 +236,7 @@ int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable callback_mgr_.add_release_memtable_callback_remove_cnt(functor.get_remove_cnt()); ensure_checksum_(functor.get_checksum_last_scn()); if (functor.get_remove_cnt() > 0) { - TRANS_LOG(INFO, "remove callbacks for remove memtable", KP(memtable_for_remove), + TRANS_LOG(INFO, "remove callbacks for remove memtable", KP(memtable_set), K(functor), K(*this)); } } diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.h b/src/storage/memtable/mvcc/ob_tx_callback_list.h index d759540b1..4ada8e5d2 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.h +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.h @@ -14,7 +14,7 @@ #define OCEANBASE_STORAGE_MEMTABLE_MVCC_OB_TX_CALLBACK_LIST #include "storage/memtable/mvcc/ob_tx_callback_functor.h" -// #include "storage/tx/ob_trans_part_ctx.h" +#include "storage/memtable/ob_memtable_util.h" namespace oceanbase { @@ -58,13 +58,14 @@ public: int remove_callbacks_for_fast_commit(bool &has_remove); // remove_callbacks_for_remove_memtable will remove all callbacks that is - // belonged to the specified memtable. It will only remove callbacks without - // removing data by calling checkpoint_callback. So user need implement lazy - // callback for the correctness. And user need guarantee all callbacks - // belonged to the memtable must be synced before removing. What's more, it - // will calculate checksum when removing. - int remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove, - const share::SCN max_applied_scn); + // belonged to the specified memtable sets. It will only remove callbacks + // without removing data by calling checkpoint_callback. So user need to + // implement lazy callback for the correctness. And user need guarantee all + // callbacks belonged to the memtable sets must be synced before removing. + // What's more, it will calculate checksum when removing. + int remove_callbacks_for_remove_memtable( + const memtable::ObMemtableSet *memtable_set, + const share::SCN max_applied_scn); // remove_callbacks_for_rollback_to will remove callbacks from back to front // until callbacks smaller or equal than the seq_no. It will remove both diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 1d7ac2bc6..d2d955ff1 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -203,7 +203,8 @@ int ObMemtable::init(const ObITable::TableKey &table_key, return ret; } -int ObMemtable::remove_unused_callback_for_uncommited_txn_() +int ObMemtable::batch_remove_unused_callback_for_uncommited_txn( + const ObLSID ls_id, const memtable::ObMemtableSet *memtable_set) { int ret = OB_SUCCESS; // NB: Do not use cache here, because the trans_service may be destroyed under @@ -212,8 +213,8 @@ int ObMemtable::remove_unused_callback_for_uncommited_txn_() MTL_CTX()->get(); if (NULL != txs_svr - && OB_FAIL(txs_svr->remove_callback_for_uncommited_txn(this))) { - TRANS_LOG(WARN, "remove callback for uncommited txn failed", K(ret), K(*this)); + && OB_FAIL(txs_svr->remove_callback_for_uncommited_txn(ls_id, memtable_set))) { + TRANS_LOG(WARN, "remove callback for uncommited txn failed", K(ret), KPC(memtable_set)); } return ret; @@ -238,10 +239,6 @@ void ObMemtable::destroy() if (OB_SUCCESS != freezer->unset_tenant_slow_freeze(tablet_id)) { TRANS_LOG(WARN, "unset tenant slow freeze failed.", K(*this)); } - - if (OB_FAIL(remove_unused_callback_for_uncommited_txn_())) { - TRANS_LOG(WARN, "failed to remove callback for uncommited txn", K(ret), K(*this)); - } } ObITable::reset(); ObFreezeCheckpoint::reset(); diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index f0ee6dcc7..c6ade4f64 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -379,6 +379,9 @@ public: int64_t get_memtable_mgr_op_cnt() { return ATOMIC_LOAD(&memtable_mgr_op_cnt_); } int64_t inc_memtable_mgr_op_cnt() { return ATOMIC_AAF(&memtable_mgr_op_cnt_, 1); } int64_t dec_memtable_mgr_op_cnt() { return ATOMIC_SAF(&memtable_mgr_op_cnt_, 1); } + static int batch_remove_unused_callback_for_uncommited_txn( + const share::ObLSID ls_id, + const memtable::ObMemtableSet *memtable_set); /* freeze */ virtual int set_frozen() override { local_allocator_.set_frozen(); return OB_SUCCESS; } @@ -507,8 +510,6 @@ private: const storage::ObTableReadInfo &read_info, ObMvccWriteResult &res); - int remove_unused_callback_for_uncommited_txn_(); - void get_begin(ObMvccAccessCtx &ctx); void get_end(ObMvccAccessCtx &ctx, int ret); void scan_begin(ObMvccAccessCtx &ctx); diff --git a/src/storage/memtable/ob_memtable_context.cpp b/src/storage/memtable/ob_memtable_context.cpp index bd0556e23..5fadbb0c8 100644 --- a/src/storage/memtable/ob_memtable_context.cpp +++ b/src/storage/memtable/ob_memtable_context.cpp @@ -870,18 +870,21 @@ int ObMemtableCtx::remove_callbacks_for_fast_commit() return ret; } -int ObMemtableCtx::remove_callback_for_uncommited_txn(ObMemtable *mt, const share::SCN max_applied_scn) +int ObMemtableCtx::remove_callback_for_uncommited_txn( + const memtable::ObMemtableSet *memtable_set, + const share::SCN max_applied_scn) { int ret = OB_SUCCESS; ObByteLockGuard guard(lock_); - if (OB_ISNULL(mt)) { + if (OB_ISNULL(memtable_set)) { ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "memtable is NULL", K(mt)); + TRANS_LOG(WARN, "memtable is NULL", K(memtable_set)); } else if (OB_FAIL(reuse_log_generator_())) { TRANS_LOG(ERROR, "fail to reset log generator", K(ret)); - } else if (OB_FAIL(trans_mgr_.remove_callback_for_uncommited_txn(mt, max_applied_scn))) { - TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt)); + } else if (OB_FAIL(trans_mgr_.remove_callback_for_uncommited_txn(memtable_set, + max_applied_scn))) { + TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(memtable_set)); } return ret; diff --git a/src/storage/memtable/ob_memtable_context.h b/src/storage/memtable/ob_memtable_context.h index 0d69637c4..93b49fbd6 100644 --- a/src/storage/memtable/ob_memtable_context.h +++ b/src/storage/memtable/ob_memtable_context.h @@ -397,7 +397,9 @@ public: uint64_t get_tenant_id() const; inline bool has_read_elr_data() const { return read_elr_data_; } int remove_callbacks_for_fast_commit(); - int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt, const share::SCN max_applied_scn); + int remove_callback_for_uncommited_txn( + const memtable::ObMemtableSet *memtable_set, + const share::SCN max_applied_scn); int rollback(const int64_t seq_no, const int64_t from_seq_no); bool is_all_redo_submitted(); bool is_for_replay() const { return trans_mgr_.is_for_replay(); } diff --git a/src/storage/memtable/ob_memtable_util.h b/src/storage/memtable/ob_memtable_util.h index 6fd0f5fbe..179e1f6f1 100644 --- a/src/storage/memtable/ob_memtable_util.h +++ b/src/storage/memtable/ob_memtable_util.h @@ -16,6 +16,8 @@ #include "share/ob_define.h" #include "lib/container/ob_iarray.h" #include "lib/thread_local/ob_tsi_factory.h" +#include "lib/hash/ob_hashset.h" +#include "common/rowkey/ob_store_rowkey.h" namespace oceanbase { @@ -24,6 +26,8 @@ using namespace common; namespace memtable { +typedef common::hash::ObHashSet ObMemtableSet; + template const char *strarray(const common::ObIArray &array) { @@ -65,7 +69,7 @@ public: ObFakeStoreRowKey(const char *str, const int64_t size) { for(int64_t i = 0; i < OBJ_CNT; i++) { - obj_array_[i].set_char_value(str, size); + obj_array_[i].set_char_value(str, (ObString::obstr_size_t)size); } rowkey_.assign(obj_array_, OBJ_CNT); } diff --git a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp index 70875497c..0a9ab4724 100644 --- a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp +++ b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp @@ -156,6 +156,8 @@ int ObTenantMetaMemMgr::init() LOG_WARN("fail to init pin set lock", K(ret)); } else if (OB_FAIL(pinned_tablet_set_.create(pin_set_bucket_num))) { LOG_WARN("fail to create pinned tablet set", K(ret)); + } else if (OB_FAIL(gc_memtable_map_.create(10, "GCMemtableMap", "GCMemtableMap", tenant_id_))) { + LOG_WARN("fail to initialize gc memtable map", K(ret)); } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::TenantMetaMemMgr, tg_id_))) { LOG_WARN("fail to create thread for t3m", K(ret)); } else { @@ -233,6 +235,20 @@ void ObTenantMetaMemMgr::destroy() pin_set_lock_.destroy(); pinned_tablet_set_.destroy(); while (!is_all_clean && OB_SUCC(gc_tables_in_queue(is_all_clean))); + for (auto iter = gc_memtable_map_.begin(); + OB_SUCC(ret) && iter != gc_memtable_map_.end(); ++iter) { + memtable::ObMemtableSet *memtable_set = iter->second; + if (OB_NOT_NULL(memtable_set)) { + if (0 != memtable_set->size()) { + LOG_ERROR("leaked memtable", KPC(memtable_set)); + } + if (OB_FAIL(memtable_set->destroy())) { + LOG_ERROR("memtable set destroy failed", K(ret)); + } + ob_free(memtable_set); + } + } + gc_memtable_map_.destroy(); bucket_lock_.destroy(); allocator_.reset(); for (int64_t i = 0; i <= ObITable::TableType::REMOTE_LOGICAL_MINOR_SSTABLE; i++) { @@ -320,8 +336,17 @@ int ObTenantMetaMemMgr::gc_tables_in_queue(bool &all_table_cleaned) ret = OB_ERR_UNEXPECTED; LOG_ERROR("the table type is invalid", K(ret), KP(item), KPC(item), KPC(table), K(table->get_key())); } else { - pool_arr_[index]->free_obj(static_cast(table)); - table_cnt_arr[index]++; + if (ObITable::TableType::DATA_MEMTABLE == table_type) { + if (OB_FAIL(push_memtable_into_gc_map_(static_cast(table)))) { + LOG_WARN("push memtable into gc map failed", K(ret)); + } + } else { + pool_arr_[index]->free_obj(static_cast(table)); + } + + if (OB_SUCC(ret)) { + table_cnt_arr[index]++; + } } } else if (TC_REACH_TIME_INTERVAL(1000 * 1000)) { LOG_INFO("the table is unsafe to destroy", KPC(table)); @@ -343,6 +368,9 @@ int ObTenantMetaMemMgr::gc_tables_in_queue(bool &all_table_cleaned) } } + // batch gc memtable will handle error gracefully + batch_gc_memtable_(); + if (OB_SUCC(ret)) { all_table_cleaned = (0 == free_tables_queue_.size()); } @@ -384,6 +412,97 @@ int ObTenantMetaMemMgr::gc_tables_in_queue(bool &all_table_cleaned) return ret; } +void ObTenantMetaMemMgr::batch_gc_memtable_() +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + + for (auto iter = gc_memtable_map_.begin(); + iter != gc_memtable_map_.end(); ++iter) { + const ObLSID &ls_id = iter->first; + memtable::ObMemtableSet *memtable_set = iter->second; + if (OB_NOT_NULL(memtable_set) + && 0 != memtable_set->size()) { + if (OB_TMP_FAIL(ObMemtable::batch_remove_unused_callback_for_uncommited_txn(ls_id, + memtable_set))) { + LOG_ERROR("batch remove memtable set failed", K(tmp_ret), KPC(memtable_set)); + for (auto set_iter = memtable_set->begin(); set_iter != memtable_set->end(); ++set_iter) { + if (OB_TMP_FAIL(push_table_into_gc_queue((ObITable *)(set_iter->first), + ObITable::TableType::DATA_MEMTABLE))) { + LOG_ERROR("push table into gc queue failed, maybe there will be leak", + K(tmp_ret), KPC(memtable_set)); + } + } + + LOG_INFO("batch gc memtable failed and push into gc queue again", K(memtable_set->size())); + while (OB_TMP_FAIL(memtable_set->clear())) { + LOG_ERROR("clear memtable set failed", K(tmp_ret), KPC(memtable_set)); + } + } else { + for (auto set_iter = memtable_set->begin(); set_iter != memtable_set->end(); ++set_iter) { + pool_arr_[static_cast(ObITable::TableType::DATA_MEMTABLE)]->free_obj((void *)(set_iter->first)); + } + + LOG_INFO("batch gc memtable successfully", K(memtable_set->size())); + while (OB_TMP_FAIL(memtable_set->clear())) { + LOG_ERROR("clear memtable set failed", K(tmp_ret), KPC(memtable_set)); + } + } + } + } + + if (REACH_TENANT_TIME_INTERVAL(1_hour)) { + if (OB_TMP_FAIL(gc_memtable_map_.clear())) { + LOG_ERROR("clear gc memtable map failed", K(tmp_ret)); + } + } +} + +int ObTenantMetaMemMgr::push_memtable_into_gc_map_(memtable::ObMemtable *memtable) +{ + int ret = OB_SUCCESS; + share::ObLSID ls_id; + const ObMemAttr attr(tenant_id_, "memtable_set"); + memtable::ObMemtableSet *memtable_set = nullptr; + + if (OB_FAIL(memtable->get_ls_id(ls_id))) { + LOG_WARN("get memtable ls id failed", K(ret), KPC(memtable)); + } else if (OB_FAIL(gc_memtable_map_.get_refactored(ls_id, memtable_set))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + memtable::ObMemtableSet *tmp_memtable_set; + void *buf = NULL; + if (OB_ISNULL(buf = ob_malloc(sizeof(memtable::ObMemtableSet), attr))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate memory for hash set", K(ret)); + } else if (OB_ISNULL(tmp_memtable_set = new (buf) ObMemtableSet())) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate memory for hash set", K(ret)); + } else if (OB_FAIL(tmp_memtable_set->create(1024))) { + LOG_WARN("fail to create", K(ret)); + } else if (OB_FAIL(gc_memtable_map_.set_refactored(ls_id, tmp_memtable_set))) { + LOG_WARN("fail to set hash set", K(ret)); + } else { + memtable_set = tmp_memtable_set; + } + + if (NULL != buf && OB_FAIL(ret)) { + ob_free(tmp_memtable_set); + } + } else { + LOG_WARN("map get failed", K(ret), KPC(memtable)); + } + } + + if (OB_SUCC(ret)) { + if (OB_FAIL(memtable_set->set_refactored((uint64_t)(memtable), 0/*flag, not overwrite*/))) { + LOG_WARN("map set failed", K(ret), KPC(memtable)); + } + } + + return ret; +} + void ObTenantMetaMemMgr::gc_sstable(ObSSTable *sstable) { if (OB_UNLIKELY(nullptr == sstable)) { diff --git a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.h b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.h index eded2c4e8..c2926b0a8 100644 --- a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.h +++ b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.h @@ -26,6 +26,7 @@ #include "storage/blocksstable/ob_sstable.h" #include "storage/ddl/ob_tablet_ddl_kv_mgr.h" #include "storage/memtable/ob_memtable.h" +#include "storage/memtable/ob_memtable_util.h" #include "storage/meta_mem/ob_meta_obj_struct.h" #include "storage/meta_mem/ob_meta_pointer_map.h" #include "storage/meta_mem/ob_meta_pointer.h" @@ -338,6 +339,7 @@ private: static const int64_t MIN_MINOR_SSTABLE_GC_INTERVAL_US = 1 * 1000 * 1000L; // 1s static const int64_t REFRESH_CONFIG_INTERVAL_US = 10 * 1000 * 1000L; // 10s static const int64_t ONE_ROUND_RECYCLE_COUNT_THRESHOLD = 20000L; + static const int64_t BATCH_MEMTABLE_GC_THRESHOLD = 100L; static const int64_t DEFAULT_TABLET_WASH_HEAP_COUNT = 16; static const int64_t DEFAULT_MINOR_SSTABLE_SET_COUNT = 49999; static const int64_t SSTABLE_GC_MAX_TIME = 500; // 500us @@ -433,6 +435,9 @@ private: common::ObIArray &info) const; int get_allocator_info(common::ObIArray &info) const; int exist_pinned_tablet(const ObTabletMapKey &key); + int push_memtable_into_gc_map_(memtable::ObMemtable *memtable); + void batch_gc_memtable_(); + private: int cmp_ret_; HeapCompare compare_; @@ -453,6 +458,8 @@ private: ObBucketLock pin_set_lock_; PinnedTabletSet pinned_tablet_set_; // tablets which are in multi source data transaction procedure + common::hash::ObHashMap gc_memtable_map_; + ObTenantMetaObjPool memtable_pool_; ObTenantMetaObjPool sstable_pool_; ObTenantMetaObjPool ddl_kv_pool_; diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index f1e1daa02..2af228085 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -556,7 +556,7 @@ int ObLSTxCtxMgr::get_tx_ctx_directly_from_hash_map(const ObTransID &tx_id, ObPa return ret; } -int ObLSTxCtxMgr::remove_callback_for_uncommited_tx(ObMemtable* mt) +int ObLSTxCtxMgr::remove_callback_for_uncommited_tx(const memtable::ObMemtableSet *memtable_set) { int ret = OB_SUCCESS; ObTimeGuard timeguard("remove callback for uncommited txn", 10L * 1000L); @@ -564,17 +564,15 @@ int ObLSTxCtxMgr::remove_callback_for_uncommited_tx(ObMemtable* mt) if (IS_NOT_INIT) { TRANS_LOG(WARN, "ObLSTxCtxMgr not inited", K_(ls_id)); ret = OB_NOT_INIT; - } else if (OB_ISNULL(mt)) { + } else if (OB_ISNULL(memtable_set)) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "memtable is null", K_(ls_id)); - } else if (mt->get_timestamp() < online_ts_) { - TRANS_LOG(INFO, "pass old memtable", KPC(mt), K(online_ts_), K(ls_id_)); } else { - ObRemoveCallbackFunctor fn(mt); + ObRemoveCallbackFunctor fn(memtable_set); if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) { - TRANS_LOG(WARN, "for each transaction context error", KR(ret), KP(mt)); + TRANS_LOG(WARN, "for each transaction context error", KR(ret), KPC(memtable_set)); } else { - TRANS_LOG(DEBUG, "remove callback for uncommited txn success", KP(mt)); + TRANS_LOG(DEBUG, "remove callback for uncommited txn success", KPC(memtable_set)); } } return ret; @@ -2037,7 +2035,8 @@ int ObTxCtxMgr::get_min_undecided_scn(const ObLSID &ls_id, SCN &scn) return ret; } -int ObTxCtxMgr::remove_callback_for_uncommited_tx(const ObLSID &ls_id, ObMemtable* mt) +int ObTxCtxMgr::remove_callback_for_uncommited_tx( + const ObLSID ls_id, const memtable::ObMemtableSet *memtable_set) { int ret = OB_SUCCESS; ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; @@ -2049,13 +2048,13 @@ int ObTxCtxMgr::remove_callback_for_uncommited_tx(const ObLSID &ls_id, ObMemtabl TRANS_LOG(WARN, "invalid argument", K(ls_id)); ret = OB_INVALID_ARGUMENT; } else if (OB_FAIL(get_ls_tx_ctx_mgr(ls_id, ls_tx_ctx_mgr))) { - TRANS_LOG(WARN, "get participant transaction context mgr error", KP(mt)); + TRANS_LOG(WARN, "get participant transaction context mgr error", KP(memtable_set)); ret = OB_PARTITION_NOT_EXIST; } else { - if (OB_FAIL(ls_tx_ctx_mgr->remove_callback_for_uncommited_tx(mt))) { - TRANS_LOG(WARN, "get remove callback for uncommited txn failed", KR(ret), KP(mt)); + if (OB_FAIL(ls_tx_ctx_mgr->remove_callback_for_uncommited_tx(memtable_set))) { + TRANS_LOG(WARN, "get remove callback for uncommited txn failed", KR(ret), KP(memtable_set)); } else { - TRANS_LOG(DEBUG, "get remove callback for uncommited txn succeed", KP(mt)); + TRANS_LOG(DEBUG, "get remove callback for uncommited txn succeed", KP(memtable_set)); } revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr); } diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h index bc685ba3d..1ccedf601 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.h +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.h @@ -375,7 +375,8 @@ public: // When the memtable has mini-merged, the commit_version of its associated // transaction may be undecided; // @param [in] mt: the memtable point which is going to be deleted; - int remove_callback_for_uncommited_tx(memtable::ObMemtable* mt); + int remove_callback_for_uncommited_tx( + const memtable::ObMemtableSet *memtable_set); // Find the TxCtx and execute the check functor with the tx_data retained on the TxCtx; // Called by the ObTxCtxTable @@ -928,7 +929,9 @@ public: // transaction may be undecided; // @param [in] ls_id: the specified ls_id; // @param [in] mt: the memtable point which will be deleted; - int remove_callback_for_uncommited_tx(const share::ObLSID &ls_id, memtable::ObMemtable* mt); + int remove_callback_for_uncommited_tx( + const ObLSID ls_id, + const memtable::ObMemtableSet *memtable_set); TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(this)); diff --git a/src/storage/tx/ob_trans_functor.h b/src/storage/tx/ob_trans_functor.h index 5eac2119a..5627fa3cf 100644 --- a/src/storage/tx/ob_trans_functor.h +++ b/src/storage/tx/ob_trans_functor.h @@ -1076,17 +1076,19 @@ public: class ObRemoveCallbackFunctor { public: - explicit ObRemoveCallbackFunctor(memtable::ObMemtable *mt) : mt_(mt) {} + explicit ObRemoveCallbackFunctor( + const memtable::ObMemtableSet *memtable_set) + : memtable_set_(memtable_set) {} ~ObRemoveCallbackFunctor() {} OPERATOR_V4(ObRemoveCallbackFunctor) { bool bool_ret = true; int tmp_ret = OB_SUCCESS; - if (!tx_id.is_valid() || OB_ISNULL(tx_ctx) || OB_ISNULL(mt_)) { + if (!tx_id.is_valid() || OB_ISNULL(tx_ctx) || OB_ISNULL(memtable_set_)) { tmp_ret = OB_ERR_UNEXPECTED; TRANS_LOG_RET(WARN, tmp_ret, "invalid argument", K(tx_id)); - } else if (OB_TMP_FAIL(tx_ctx->remove_callback_for_uncommited_txn(mt_))) { + } else if (OB_TMP_FAIL(tx_ctx->remove_callback_for_uncommited_txn(memtable_set_))) { TRANS_LOG_RET(WARN, tmp_ret, "remove callback for unncommitted tx failed", K(tmp_ret), K(tx_id), KP(tx_ctx)); } @@ -1098,7 +1100,7 @@ public: return bool_ret; } private: - memtable::ObMemtable *mt_; + const memtable::ObMemtableSet *memtable_set_; }; class ObTxSubmitLogFunctor diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 2e2411b9b..63f3c9022 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1583,7 +1583,8 @@ int64_t ObPartTransCtx::to_string(char* buf, const int64_t buf_len) const return len1 + len2; } -int ObPartTransCtx::remove_callback_for_uncommited_txn(ObMemtable *mt) +int ObPartTransCtx::remove_callback_for_uncommited_txn( + const memtable::ObMemtableSet *memtable_set) { int ret = OB_SUCCESS; CtxLockGuard guard(lock_); @@ -1591,12 +1592,13 @@ int ObPartTransCtx::remove_callback_for_uncommited_txn(ObMemtable *mt) if (IS_NOT_INIT) { TRANS_LOG(WARN, "ObPartTransCtx not inited"); ret = OB_NOT_INIT; - } else if (OB_ISNULL(mt)) { + } else if (OB_ISNULL(memtable_set)) { ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "memtable is NULL", K(mt)); + TRANS_LOG(WARN, "memtable is NULL", K(memtable_set)); } else if (OB_UNLIKELY(is_exiting_)) { - } else if (OB_FAIL(mt_ctx_.remove_callback_for_uncommited_txn(mt, exec_info_.max_applied_log_ts_))) { - TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt_ctx_), K(exec_info_.max_applied_log_ts_)); + } else if (OB_FAIL(mt_ctx_.remove_callback_for_uncommited_txn(memtable_set, exec_info_.max_applied_log_ts_))) { + TRANS_LOG(WARN, "fail to remove callback for uncommitted txn", K(ret), K(mt_ctx_), + K(memtable_set), K(exec_info_.max_applied_log_ts_)); } return ret; diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index e5a7ddd1f..f61b0797b 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -174,7 +174,8 @@ public: uint64_t get_lock_for_read_retry_count() const { return mt_ctx_.get_lock_for_read_retry_count(); } int check_scheduler_status(); - int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt); + int remove_callback_for_uncommited_txn( + const memtable::ObMemtableSet *memtable_set); int64_t get_trans_mem_total_size() const { return mt_ctx_.get_trans_mem_total_size(); } void update_max_submitted_seq_no(const int64_t seq_no) diff --git a/src/storage/tx/ob_trans_service.cpp b/src/storage/tx/ob_trans_service.cpp index c6d791670..c4383e0da 100644 --- a/src/storage/tx/ob_trans_service.cpp +++ b/src/storage/tx/ob_trans_service.cpp @@ -609,10 +609,10 @@ int ObTransService::handle_redo_sync_task_(ObDupTableRedoSyncTask *task, bool &n return OB_NOT_SUPPORTED; } -int ObTransService::remove_callback_for_uncommited_txn(memtable::ObMemtable* mt) +int ObTransService::remove_callback_for_uncommited_txn( + const ObLSID ls_id, const memtable::ObMemtableSet *memtable_set) { int ret = OB_SUCCESS; - ObLSID ls_id; if (IS_NOT_INIT) { TRANS_LOG(WARN, "ObTransService not inited"); @@ -620,25 +620,21 @@ int ObTransService::remove_callback_for_uncommited_txn(memtable::ObMemtable* mt) } else if (OB_UNLIKELY(!is_running_)) { TRANS_LOG(WARN, "ObTransService is not running"); ret = OB_NOT_RUNNING; - } else if (OB_ISNULL(mt)) { + } else if (OB_ISNULL(memtable_set)) { TRANS_LOG(WARN, "memtable is NULL"); ret = OB_INVALID_ARGUMENT; - } else if (OB_FAIL(mt->get_ls_id(ls_id))) { - TRANS_LOG(WARN, "get ls id failed", K(ret)); } else if (!ls_id.is_valid()) { ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "unexpected ls id", KR(ret), K(ls_id), KP(mt)); - } else if (OB_FAIL(tx_ctx_mgr_.remove_callback_for_uncommited_tx(ls_id, mt))) { - TRANS_LOG(WARN, "participant remove callback for uncommitt txn failed", KR(ret), K(ls_id), KP(mt)); + TRANS_LOG(ERROR, "unexpected ls id", KR(ret), K(ls_id), KPC(memtable_set)); + } else if (OB_FAIL(tx_ctx_mgr_.remove_callback_for_uncommited_tx(ls_id, memtable_set))) { + TRANS_LOG(WARN, "participant remove callback for uncommitt txn failed", KR(ret), K(ls_id), KP(memtable_set)); } else { - TRANS_LOG(DEBUG, "participant remove callback for uncommitt txn success", K(ls_id), KP(mt)); + TRANS_LOG(DEBUG, "participant remove callback for uncommitt txn success", K(ls_id), KP(memtable_set)); } return ret; } - - /** * get snapshot_version for stmt * diff --git a/src/storage/tx/ob_trans_service.h b/src/storage/tx/ob_trans_service.h index 7a7f9f2c6..d3b807980 100644 --- a/src/storage/tx/ob_trans_service.h +++ b/src/storage/tx/ob_trans_service.h @@ -176,7 +176,9 @@ public: //get the memory used condition of transaction module int iterate_trans_memory_stat(ObTransMemStatIterator &mem_stat_iter); int dump_elr_statistic(); - int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt); + int remove_callback_for_uncommited_txn( + const ObLSID ls_id, + const memtable::ObMemtableSet *memtable_set); int64_t get_tenant_id() const { return tenant_id_; } const common::ObAddr &get_server() { return self_; } ObTransTimer &get_trans_timer() { return timer_; } diff --git a/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp b/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp index 7c6309151..4d8c571b5 100644 --- a/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp +++ b/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp @@ -361,20 +361,14 @@ TEST_F(TestTxCallbackList, remove_callback_by_release_memtable) EXPECT_EQ(9, callback_list_.get_length()); - EXPECT_EQ(OB_SUCCESS, - callback_list_.remove_callbacks_for_remove_memtable(memtable2, scn_1/*not used*/)); - - EXPECT_EQ(7, callback_list_.get_length()); - EXPECT_EQ(2, mgr_.get_callback_remove_for_remove_memtable_count()); + memtable::ObMemtableSet memtable_set; + EXPECT_EQ(OB_SUCCESS, memtable_set.create(24)); + EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable2))); + EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable1))); + EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable3))); EXPECT_EQ(OB_SUCCESS, - callback_list_.remove_callbacks_for_remove_memtable(memtable1, scn_1/*not used*/)); - - EXPECT_EQ(5, callback_list_.get_length()); - EXPECT_EQ(4, mgr_.get_callback_remove_for_remove_memtable_count()); - - EXPECT_EQ(OB_SUCCESS, - callback_list_.remove_callbacks_for_remove_memtable(memtable3, scn_1/*not used*/)); + callback_list_.remove_callbacks_for_remove_memtable(&memtable_set, scn_1/*not used*/)); EXPECT_EQ(4, callback_list_.get_length()); EXPECT_EQ(5, mgr_.get_callback_remove_for_remove_memtable_count()); @@ -822,16 +816,13 @@ TEST_F(TestTxCallbackList, checksum_remove_memtable_and_tx_end) EXPECT_EQ(9, callback_list_.get_length()); - EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable2, scn_1/*not used*/)); - EXPECT_EQ(true, is_checksum_equal(5, checksum_)); - EXPECT_EQ(scn_3, callback_list_.checksum_scn_); + memtable::ObMemtableSet memtable_set; + EXPECT_EQ(OB_SUCCESS, memtable_set.create(24)); + EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable2))); + EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable1))); + EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(memtable3))); - EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable3, scn_1/*not used*/)); - EXPECT_EQ(true, is_checksum_equal(5, checksum_)); - EXPECT_EQ(scn_3, callback_list_.checksum_scn_); - - - EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(memtable1, scn_1/*not used*/)); + EXPECT_EQ(OB_SUCCESS, callback_list_.remove_callbacks_for_remove_memtable(&memtable_set, scn_1/*not used*/)); EXPECT_EQ(true, is_checksum_equal(5, checksum_)); EXPECT_EQ(scn_3, callback_list_.checksum_scn_); @@ -1060,8 +1051,11 @@ TEST_F(TestTxCallbackList, checksum_all_and_tx_end_test) { } if (enable) { + memtable::ObMemtableSet memtable_set; + EXPECT_EQ(OB_SUCCESS, memtable_set.create(24)); + EXPECT_EQ(OB_SUCCESS, memtable_set.set_refactored((uint64_t)(mt))); EXPECT_EQ(OB_SUCCESS, - callback_list_.remove_callbacks_for_remove_memtable(mt, scn_1/*not used*/)); + callback_list_.remove_callbacks_for_remove_memtable(&memtable_set, scn_1/*not used*/)); } return enable; @@ -1190,20 +1184,27 @@ int ObTxCallbackList::remove_callbacks_for_fast_commit(bool &has_remove) return ret; } -int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable_for_remove, - const share::SCN) +int ObTxCallbackList::remove_callbacks_for_remove_memtable( + const memtable::ObMemtableSet *memtable_set, + const share::SCN) { int ret = OB_SUCCESS; SpinLockGuard guard(latch_); ObRemoveSyncCallbacksWCondFunctor functor( // condition for remove - [memtable_for_remove](ObITransCallback *callback) -> bool { - if (callback->get_memtable() == memtable_for_remove) { - return true; + [memtable_set](ObITransCallback *callback) -> bool { + int ret = OB_SUCCESS; + int bool_ret = true; + if (OB_HASH_EXIST == (ret = memtable_set->exist_refactored((uint64_t)callback->get_memtable()))) { + bool_ret = true; + } else if (OB_HASH_NOT_EXIST == ret) { + bool_ret = false; } else { - return false; + // We have no idea to handle the error + ob_abort(); } + return bool_ret; }, // condition for stop [](ObITransCallback *) -> bool { return false; @@ -1217,7 +1218,7 @@ int ObTxCallbackList::remove_callbacks_for_remove_memtable(ObIMemtable *memtable callback_mgr_.add_release_memtable_callback_remove_cnt(functor.get_remove_cnt()); ensure_checksum_(functor.get_checksum_last_scn()); if (functor.get_remove_cnt() > 0) { - TRANS_LOG(INFO, "remove callbacks for remove memtable", KP(memtable_for_remove), + TRANS_LOG(INFO, "remove callbacks for remove memtable", KP(memtable_set), K(functor), K(*this)); } }