[FIX] Dynamically expand or shrink the tx data hash map on minor freeze to reduce memory use in the tx data memtable

This commit is contained in:
ZenoWang 2023-02-15 12:44:35 +00:00 committed by ob-robot
parent 4ca42f308e
commit 7bd346dc46
11 changed files with 142 additions and 38 deletions

View File

@ -30,10 +30,6 @@ CTX_ITEM_DEF(STORAGE_LONG_TERM_META_CTX_ID)
CTX_ITEM_DEF(STORAGE_SHORT_TERM_META_CTX_ID)
CTX_ITEM_DEF(PARTITION_LOG_SERVICE_CTX_ID)
CTX_ITEM_DEF(REPLAY_STATUS_CTX_ID)
CTX_ITEM_DEF(TRANS_PART_CTX_ID)
CTX_ITEM_DEF(TRANS_COORD_CTX_ID)
CTX_ITEM_DEF(TRANS_SCHE_CTX_ID)
CTX_ITEM_DEF(OB_TRANS_LOCAL_TASK_ID)
CTX_ITEM_DEF(PLAN_CACHE_CTX_ID)
CTX_ITEM_DEF(REQ_MANAGER_CTX_ID)
CTX_ITEM_DEF(WORK_AREA)
@ -47,6 +43,7 @@ CTX_ITEM_DEF(META_OBJ_CTX_ID)
CTX_ITEM_DEF(TX_CALLBACK_CTX_ID)
CTX_ITEM_DEF(LOB_CTX_ID)
CTX_ITEM_DEF(PS_CACHE_CTX_ID)
CTX_ITEM_DEF(TX_DATA_TABLE)
CTX_ITEM_DEF(MAX_CTX_ID)
#endif

View File

@ -433,18 +433,18 @@ int ObTenantFreezer::check_and_freeze_tx_data_()
LOG_WARN("[TenantFreezer] get tenant tx data mem used failed.", KR(ret));
} else {
int64_t total_memory = lib::get_tenant_memory_limit(tenant_info_.tenant_id_);
int64_t hold_memory = lib::get_tenant_memory_hold(tenant_info_.tenant_id_);
int64_t memstore_hold_memory = lib::get_tenant_memory_hold(tenant_info_.tenant_id_, ObCtxIds::MEMSTORE_CTX_ID);
int64_t self_freeze_min_limit_ = total_memory * (ObTxDataTable::TX_DATA_FREEZE_TRIGGER_MIN_PERCENTAGE / 100);
int64_t self_freeze_max_limit_ = total_memory * (ObTxDataTable::TX_DATA_FREEZE_TRIGGER_MAX_PERCENTAGE / 100);
int64_t self_freeze_tenant_hold_limit_
= (total_memory * (double(get_freeze_trigger_percentage_()) / 100));
if ((tenant_tx_data_mem_used > self_freeze_max_limit_)
|| ((hold_memory > self_freeze_tenant_hold_limit_)
|| ((memstore_hold_memory > self_freeze_tenant_hold_limit_)
&& (tenant_tx_data_mem_used > self_freeze_min_limit_))) {
// trigger tx data self freeze
LOG_INFO("[TenantFreezer] Trigger Tx Data Table Self Freeze. ", K(tenant_info_.tenant_id_),
K(tenant_tx_data_mem_used), K(self_freeze_max_limit_), K(hold_memory),
K(tenant_tx_data_mem_used), K(self_freeze_max_limit_), K(memstore_hold_memory),
K(self_freeze_tenant_hold_limit_), K(self_freeze_min_limit_));
int tmp_ret = OB_SUCCESS;

View File

@ -31,7 +31,7 @@ void ObTxDataHashMap::destroy()
curr = next;
}
}
ob_free(buckets_);
allocator_.free(buckets_);
total_cnt_ = 0;
}
}
@ -40,7 +40,7 @@ int ObTxDataHashMap::init()
{
int ret = OB_SUCCESS;
const int64_t alloc_size = BUCKETS_CNT * sizeof(ObTxDataHashHeader);
void *ptr = ob_malloc(alloc_size);
void *ptr = allocator_.alloc(alloc_size);
if (OB_ISNULL(ptr)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "allocate memory failed when init tx data hash map", KR(ret), K(alloc_size), K(BUCKETS_CNT));

View File

@ -34,10 +34,17 @@ class ObTxDataHashMap {
private:
static const int64_t MAX_CONCURRENCY = 32;
static const int64_t MAX_CONCURRENCY_MASK = MAX_CONCURRENCY - 1;
public:
static const int64_t MIN_BUCKETS_CNT = 65536; /* 1 << 16 (MOD_MASK = 0xFFFF) 1MB */
static const int64_t DEFAULT_BUCKETS_CNT = 1048576; /* 1 << 20 (MOD_MASK = 0xFFFFF) 16MB */
static const int64_t MAX_BUCKETS_CNT = 16777216; /* 1 << 24 (MOD_MASK = 0xFFFFFF) 256MB */
static constexpr double LOAD_FACTORY_MAX_LIMIT = 0.7;
static constexpr double LOAD_FACTORY_MIN_LIMIT = 0.2;
public:
ObTxDataHashMap(const uint64_t buckets_cnt)
: BUCKETS_CNT(buckets_cnt),
ObTxDataHashMap(ObIAllocator &allocator, const uint64_t buckets_cnt)
: allocator_(allocator),
BUCKETS_CNT(buckets_cnt),
BUCKETS_MOD_MASK(buckets_cnt - 1),
buckets_(nullptr),
total_cnt_(0) {}
@ -67,7 +74,16 @@ public:
return ATOMIC_LOAD(&total_cnt_);
}
private:
// Ratio of total_cnt_ to BUCKETS_CNT.
// Yields 0 when BUCKETS_CNT is not positive, so the division below never divides by zero.
OB_INLINE double load_factory() const
{
  double factor = 0;
  if (BUCKETS_CNT > 0) {
    factor = static_cast<double>(total_cnt_) / static_cast<double>(BUCKETS_CNT);
  }
  return factor;
}
public:
struct ObTxDataHashHeader {
ObTxData *next_;
ObTxData *hot_cache_val_;
@ -86,6 +102,7 @@ private:
};
private:
ObIAllocator &allocator_;
const int64_t BUCKETS_CNT;
const int64_t BUCKETS_MOD_MASK;
ObTxDataHashHeader *buckets_;

View File

@ -37,7 +37,8 @@ int64_t ObTxDataMemtable::PERIODICAL_SELECT_INTERVAL_NS = 1000LL * 1000LL * 1000
int ObTxDataMemtable::init(const ObITable::TableKey &table_key,
SliceAllocator *slice_allocator,
ObTxDataMemtableMgr *memtable_mgr)
ObTxDataMemtableMgr *memtable_mgr,
const int64_t buckets_cnt)
{
int ret = OB_SUCCESS;
@ -50,7 +51,7 @@ int ObTxDataMemtable::init(const ObITable::TableKey &table_key,
} else if (OB_FAIL(ObITable::init(table_key))) {
STORAGE_LOG(WARN, "ObITable::init fail", KR(ret), K(table_key), KPC(memtable_mgr));
} else if (FALSE_IT(init_arena_allocator_())) {
} else if (OB_FAIL(init_tx_data_map_())) {
} else if (OB_FAIL(init_tx_data_map_(buckets_cnt))) {
STORAGE_LOG(WARN, "init tx data map failed.", KR(ret), K(table_key), KPC(memtable_mgr));
} else if (OB_FAIL(buf_.reserve(common::OB_MAX_VARCHAR_LENGTH))) {
STORAGE_LOG(WARN, "reserve space for tx data memtable failed.", KR(ret), K(table_key), KPC(memtable_mgr));
@ -81,7 +82,7 @@ int ObTxDataMemtable::init(const ObITable::TableKey &table_key,
return ret;
}
int ObTxDataMemtable::init_tx_data_map_()
int ObTxDataMemtable::init_tx_data_map_(const int64_t buckets_cnt)
{
int ret = OB_SUCCESS;
@ -90,7 +91,13 @@ int ObTxDataMemtable::init_tx_data_map_()
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "allocate memory of tx_data_map_ failed", KR(ret));
} else {
tx_data_map_ = new (data_map_ptr) TxDataMap(1 << 20/*2097152*/);
int64_t real_buckets_cnt = buckets_cnt;
if (real_buckets_cnt < ObTxDataHashMap::MIN_BUCKETS_CNT) {
real_buckets_cnt = ObTxDataHashMap::MIN_BUCKETS_CNT;
} else if (real_buckets_cnt > ObTxDataHashMap::MAX_BUCKETS_CNT) {
real_buckets_cnt = ObTxDataHashMap::MAX_BUCKETS_CNT;
}
tx_data_map_ = new (data_map_ptr) TxDataMap(arena_allocator_, real_buckets_cnt);
if (OB_FAIL(tx_data_map_->init())) {
STORAGE_LOG(WARN, "tx_data_map_ init failed", KR(ret));
}
@ -102,8 +109,8 @@ void ObTxDataMemtable::init_arena_allocator_()
{
ObMemAttr attr;
attr.tenant_id_ = MTL_ID();
attr.label_ = "TX_DATA_TABLE";
attr.ctx_id_ = ObCtxIds::DEFAULT_CTX_ID;
attr.label_ = "MEMTABLE_ARENA";
attr.ctx_id_ = ObCtxIds::TX_DATA_TABLE;
arena_allocator_.set_attr(attr);
}
@ -1139,7 +1146,7 @@ void ObTxDataMemtable::TEST_reset_tx_data_map_()
{
int ret = OB_SUCCESS;
tx_data_map_ = nullptr;
init_tx_data_map_();
init_tx_data_map_(ObTxDataHashMap::DEFAULT_BUCKETS_CNT);
}

View File

@ -144,7 +144,8 @@ public: // ObTxDataMemtable
void reset();
int init(const ObITable::TableKey &table_key,
SliceAllocator *slice_allocator,
ObTxDataMemtableMgr *memtable_mgr);
ObTxDataMemtableMgr *memtable_mgr,
const int64_t buckets_cnt);
/**
* @brief Insert the tx data into this tx data memtable
@ -257,6 +258,7 @@ public: /* derived from ObIMemtable */
virtual int64_t get_occupied_size() const
{
int64_t res = 0;
res += (get_buckets_cnt() * sizeof(ObTxDataHashMap::ObTxDataHashHeader));
for (int i = 0; i < MAX_TX_DATA_TABLE_CONCURRENCY; i++) {
res += occupied_size_[i];
}
@ -317,6 +319,7 @@ public: // getter && setter
int64_t inc_write_ref() { return ATOMIC_AAF(&write_ref_, 1); }
int64_t dec_write_ref() { return ATOMIC_AAF(&write_ref_, -1); }
int64_t get_write_ref() const override { return ATOMIC_LOAD(&write_ref_); }
// Bucket count of the underlying tx data hash map.
// Returns 0 when tx_data_map_ is null instead of dereferencing it; get_occupied_size()
// calls this getter, so an unset map must not crash the size accounting.
int64_t get_buckets_cnt() const { return (nullptr == tx_data_map_) ? 0 : tx_data_map_->get_buckets_cnt(); }
ObTxDataMemtable::State get_state() { return state_; }
ObTxDataLinkNode *get_sorted_list_head() { return &sort_list_head_; }
const char* get_state_string();
@ -356,6 +359,7 @@ public: // getter && setter
share::SCN get_end_scn() { return key_.scn_range_.end_scn_;}
// Load factor reported by the underlying hash map; 0 when tx_data_map_ is null.
double load_factory()
{
  double factor = 0;
  if (!OB_ISNULL(tx_data_map_)) {
    factor = tx_data_map_->load_factory();
  }
  return factor;
}
private: // ObTxDataMemtable
void atomic_update_(ObTxData *tx_data);
@ -367,7 +371,7 @@ private: // ObTxDataMemtable
int construct_list_for_sort_();
int init_tx_data_map_();
int init_tx_data_map_(const int64_t buckets_cnt);
int pre_process_commit_version_row_(ObTxData *fake_tx_data);

View File

@ -140,17 +140,18 @@ int ObTxDataMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
STORAGE_LOG(WARN, "slice_allocator_ has not been set.");
} else {
MemMgrWLockGuard lock_guard(lock_);
if (OB_FAIL(create_memtable_(clog_checkpoint_scn, schema_version))) {
if (OB_FAIL(create_memtable_(clog_checkpoint_scn, schema_version, ObTxDataHashMap::DEFAULT_BUCKETS_CNT))) {
STORAGE_LOG(WARN, "create memtable fail.", KR(ret));
} else {
// create memtable success
}
}
return ret;
}
int ObTxDataMemtableMgr::create_memtable_(const SCN clog_checkpoint_scn, int64_t schema_version)
int ObTxDataMemtableMgr::create_memtable_(const SCN clog_checkpoint_scn,
int64_t schema_version,
const int64_t buckets_cnt)
{
UNUSED(schema_version);
int ret = OB_SUCCESS;
@ -172,7 +173,7 @@ int ObTxDataMemtableMgr::create_memtable_(const SCN clog_checkpoint_scn, int64_t
} else if (OB_ISNULL(tx_data_memtable)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "dynamic cast failed", KR(ret), KPC(this));
} else if (OB_FAIL(tx_data_memtable->init(table_key, slice_allocator_, this))) {
} else if (OB_FAIL(tx_data_memtable->init(table_key, slice_allocator_, this, buckets_cnt))) {
STORAGE_LOG(WARN, "memtable init fail.", KR(ret), KPC(tx_data_memtable));
} else if (OB_FAIL(add_memtable_(handle))) {
STORAGE_LOG(WARN, "add memtable fail.", KR(ret));
@ -220,6 +221,7 @@ int ObTxDataMemtableMgr::freeze_()
int64_t pre_memtable_tail = memtable_tail_;
SCN clog_checkpoint_scn = SCN::base_scn();
int64_t schema_version = 1;
int64_t new_buckets_cnt = ObTxDataHashMap::DEFAULT_BUCKETS_CNT;
// FIXME : @gengli remove this condition after upper_trans_version is not needed
if (get_memtable_count_() >= MAX_TX_DATA_MEMTABLE_CNT) {
@ -238,8 +240,22 @@ int ObTxDataMemtableMgr::freeze_()
} else if (0 == freeze_memtable->get_tx_data_count()) {
ret = OB_STATE_NOT_MATCH;
STORAGE_LOG(WARN, "tx data memtable is empty. do not need freeze.", KR(ret), KPC(freeze_memtable));
} else if (OB_FAIL(create_memtable_(clog_checkpoint_scn, schema_version))) {
STORAGE_LOG(WARN, "create memtable fail.", KR(ret), K(clog_checkpoint_scn), K(schema_version));
} else if (OB_FAIL(calc_new_memtable_buckets_cnt_(
freeze_memtable->load_factory(), freeze_memtable->get_buckets_cnt(), new_buckets_cnt))) {
STORAGE_LOG(WARN,
"calculate new memtable buckets cnt failed",
KR(ret),
"load_factory", freeze_memtable->load_factory(),
"old_buckets_cnt", freeze_memtable->get_buckets_cnt(),
K(new_buckets_cnt));
} else if (OB_FAIL(create_memtable_(clog_checkpoint_scn, schema_version, new_buckets_cnt))) {
STORAGE_LOG(WARN,
"create memtable fail.",
KR(ret),
K(clog_checkpoint_scn),
K(schema_version),
"old_buckets_cnt", freeze_memtable->get_buckets_cnt(),
K(new_buckets_cnt));
} else {
ObTxDataMemtable *new_memtable = static_cast<ObTxDataMemtable *>(tables_[get_memtable_idx(memtable_tail_ - 1)]);
if (OB_ISNULL(new_memtable) && OB_UNLIKELY(new_memtable->is_tx_data_memtable())) {
@ -277,6 +293,41 @@ int ObTxDataMemtableMgr::freeze_()
return ret;
}
/**
 * @brief Choose the bucket count for the next tx data memtable.
 *
 * Starts from the frozen memtable's bucket count, doubles it when the load factor is above
 * LOAD_FACTORY_MAX_LIMIT, halves it when the load factor is below LOAD_FACTORY_MIN_LIMIT,
 * and then keeps halving while the bucket array would exceed 1/16 of the tenant's remaining
 * memory (never going below MIN_BUCKETS_CNT through halving).
 *
 * @param load_factory     load factor of the memtable being frozen
 * @param old_buckets_cnt  bucket count of the memtable being frozen
 * @param new_buckets_cnt  [out] bucket count to use for the new memtable
 * @return always OB_SUCCESS
 */
int ObTxDataMemtableMgr::calc_new_memtable_buckets_cnt_(const double load_factory,
                                                        const int64_t old_buckets_cnt,
                                                        int64_t &new_buckets_cnt)
{
  // Memory budget for the bucket array: remain_memory * (1/16).
  const int64_t remain_memory = lib::get_tenant_memory_remain(MTL_ID());
  const int64_t size_cap = remain_memory >> 4;

  // Decide whether to grow or shrink; the two cases are mutually exclusive.
  const bool grow = (load_factory > ObTxDataHashMap::LOAD_FACTORY_MAX_LIMIT)
                    && (old_buckets_cnt < ObTxDataHashMap::MAX_BUCKETS_CNT);
  const bool shrink = !grow
                      && (load_factory < ObTxDataHashMap::LOAD_FACTORY_MIN_LIMIT)
                      && (old_buckets_cnt > ObTxDataHashMap::MIN_BUCKETS_CNT);

  int64_t target_cnt = old_buckets_cnt;
  if (grow) {
    target_cnt = old_buckets_cnt << 1;
  } else if (shrink) {
    target_cnt = old_buckets_cnt >> 1;
  }

  // Halve until the bucket array fits into the budget or the minimum count is reached.
  for (int64_t target_size = target_cnt * sizeof(ObTxDataHashMap::ObTxDataHashHeader);
       target_size > size_cap && target_cnt > ObTxDataHashMap::MIN_BUCKETS_CNT;
       target_size = target_cnt * sizeof(ObTxDataHashMap::ObTxDataHashHeader)) {
    target_cnt >>= 1;
  }

  new_buckets_cnt = target_cnt;
  STORAGE_LOG(INFO,
              "finish calculate new tx data memtable buckets cnt",
              K(ls_id_),
              K(load_factory),
              K(old_buckets_cnt),
              K(new_buckets_cnt),
              K(remain_memory));
  return OB_SUCCESS;
}
int ObTxDataMemtableMgr::get_active_memtable(ObTableHandleV2 &handle) const
{
int ret = OB_SUCCESS;

View File

@ -121,9 +121,14 @@ protected:
const bool force);
private: // ObTxDataMemtableMgr
int create_memtable_(const share::SCN clog_checkpoint_scn, const int64_t schema_version);
int create_memtable_(const share::SCN clog_checkpoint_scn,
const int64_t schema_version,
const int64_t buckets_cnt);
int freeze_();
// Computes the bucket count for the next tx data memtable from the frozen memtable's
// load factor and bucket count; the result is written to new_buckets_cnt.
int calc_new_memtable_buckets_cnt_(const double load_factory,
                                   const int64_t old_buckets_cnt,
                                   int64_t &new_buckets_cnt);
int get_all_memtables_(ObTableHdlArray &handles);

View File

@ -43,21 +43,17 @@ int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table)
STATIC_ASSERT(sizeof(ObUndoAction) == UNDO_ACTION_SZIE, "Size of ObUndoAction Overflow.");
STATIC_ASSERT(sizeof(ObUndoStatusNode) <= TX_DATA_SLICE_SIZE, "Size of ObUndoStatusNode Overflow");
ObMemAttr mem_attr;
mem_attr.label_ = "TX_DATA_TABLE";
mem_attr.tenant_id_ = MTL_ID();
mem_attr.ctx_id_ = ObCtxIds::DEFAULT_CTX_ID;
ObMemtableMgrHandle memtable_mgr_handle;
if (OB_ISNULL(ls) || OB_ISNULL(tx_ctx_table)) {
ret = OB_ERR_NULL_VALUE;
STORAGE_LOG(WARN, "ls tablet service or tx ctx table is nullptr", KR(ret));
} else if (OB_FAIL(slice_allocator_.init(TX_DATA_SLICE_SIZE, OB_MALLOC_NORMAL_BLOCK_SIZE,
common::default_blk_alloc, mem_attr))) {
} else if (OB_FAIL(init_slice_allocator_())) {
STORAGE_LOG(ERROR, "slice_allocator_ init fail");
} else if (FALSE_IT(ls_tablet_svr_ = ls->get_tablet_svr())) {
} else if (OB_FAIL(ls_tablet_svr_->get_tx_data_memtable_mgr(memtable_mgr_handle))) {
STORAGE_LOG(WARN, "get tx data memtable mgr fail.", KR(ret), K(tablet_id_));
} else if (FALSE_IT(arena_allocator_.set_attr(mem_attr))) {
} else if (OB_FAIL(init_arena_allocator_())) {
STORAGE_LOG(ERROR, "slice_allocator_ init fail");
} else if (OB_FAIL(init_tx_data_read_schema_())) {
STORAGE_LOG(WARN, "init tx data read ctx failed.", KR(ret), K(tablet_id_));
} else {
@ -75,6 +71,27 @@ int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table)
return ret;
}
int ObTxDataTable::init_slice_allocator_()
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr;
mem_attr.label_ = "TX_DATA_SLICE";
mem_attr.tenant_id_ = MTL_ID();
mem_attr.ctx_id_ = ObCtxIds::TX_DATA_TABLE;
ret = slice_allocator_.init(TX_DATA_SLICE_SIZE, OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, mem_attr);
return ret;
}
int ObTxDataTable::init_arena_allocator_()
{
ObMemAttr mem_attr;
mem_attr.label_ = "TX_DATA_ARENA";
mem_attr.tenant_id_ = MTL_ID();
mem_attr.ctx_id_ = ObCtxIds::TX_DATA_TABLE;
arena_allocator_.set_attr(mem_attr);
return OB_SUCCESS;
}
int ObTxDataTable::init_tx_data_read_schema_()
{
int ret = OB_SUCCESS;

View File

@ -107,10 +107,10 @@ public:
// cache cleaning task will delete at least 11w tx data.
static const int64_t DEFAULT_CACHE_RETAINED_TIME = 100_ms; // 100ms
// The tx data memtable cannot freeze it self if its memory use is less than 1%
// The tx data memtable does not need to freeze itself if its memory use is less than 1%
static constexpr double TX_DATA_FREEZE_TRIGGER_MIN_PERCENTAGE = 1;
// The tx data memtable will trigger a freeze if its memory use is more than 5%
// The tx data memtable will trigger a freeze if its memory use is more than 5% (TX_DATA_FREEZE_TRIGGER_MAX_PERCENTAGE)
static constexpr double TX_DATA_FREEZE_TRIGGER_MAX_PERCENTAGE = 5;
enum COLUMN_ID_LIST
@ -248,8 +248,13 @@ private:
int get_ls_min_end_scn_in_latest_tablets_(share::SCN &min_end_ts);
int init_slice_allocator_();
int init_arena_allocator_();
int init_sstable_cache_();
int register_clean_cache_task_();
int check_tx_data_in_memtable_(const transaction::ObTransID tx_id, ObITxDataCheckFunctor &fn);

View File

@ -36,7 +36,7 @@ namespace transaction {
class ObFakeTxDataTable : public ObTxDataTable {
public:
ObFakeTxDataTable() : map_(1 << 20 /*2097152*/)
ObFakeTxDataTable() : arena_allocator_(), map_(arena_allocator_, 1 << 20 /*2097152*/)
{
IGNORE_RETURN map_.init();
ObMemAttr mem_attr;
@ -109,6 +109,7 @@ public:
if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_TRANS_CTX_NOT_EXIST; }
return ret;
}
ObArenaAllocator arena_allocator_;
ObTxDataHashMap map_;
};