[FEAT MERGE] Implement Resource Throttle

This commit is contained in:
ZenoWang
2024-02-07 17:25:56 +00:00
committed by ob-robot
parent df5ef10e8f
commit 33a6f4973d
93 changed files with 3350 additions and 1664 deletions

View File

@ -44,7 +44,7 @@ storage_unittest(test_hash_performance)
storage_unittest(test_row_fuse)
#storage_unittest(test_keybtree memtable/mvcc/test_keybtree.cpp)
storage_unittest(test_query_engine memtable/mvcc/test_query_engine.cpp)
storage_unittest(test_memtable_basic memtable/test_memtable_basic.cpp)
#storage_unittest(test_memtable_basic memtable/test_memtable_basic.cpp)
storage_unittest(test_mvcc_callback memtable/mvcc/test_mvcc_callback.cpp)
# storage_unittest(test_mds_compile multi_data_source/test_mds_compile.cpp)
storage_unittest(test_mds_list multi_data_source/test_mds_list.cpp)

View File

@ -57,16 +57,16 @@ int check_sequence_set_violation(const concurrent_control::ObWriteFlag ,
}
} // concurrent_control
namespace common
namespace share
{
// override the function
int ObGMemstoreAllocator::set_memstore_threshold_without_lock(uint64_t tenant_id)
int ObMemstoreAllocator::set_memstore_threshold_without_lock()
{
int64_t memstore_threshold = INT64_MAX;
arena_.set_memstore_threshold(memstore_threshold);
return OB_SUCCESS;
}
void* ObGMemstoreAllocator::alloc(AllocHandle& handle, int64_t size)
void* ObMemstoreAllocator::alloc(AllocHandle& handle, int64_t size)
{
int64_t align_size = upper_align(size, sizeof(int64_t));
if (!handle.is_id_valid()) {
@ -78,7 +78,7 @@ void* ObGMemstoreAllocator::alloc(AllocHandle& handle, int64_t size)
}
}
if (arena_.allocator_ == nullptr) {
if (arena_.init(OB_SERVER_TENANT_ID) != OB_SUCCESS) {
if (arena_.init() != OB_SUCCESS) {
abort();
}
}

View File

@ -22,12 +22,46 @@
#include "../mock_utils/async_util.h"
#include "test_tx_dsl.h"
#include "tx_node.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
namespace oceanbase
{
using namespace ::testing;
using namespace transaction;
using namespace share;
static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR;
namespace share {
int ObTenantTxDataAllocator::init(const char *label)
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr;
throttle_tool_ = &(MTL_MEM_ALLOC_MGR.share_resource_throttle_tool());
if (OB_FAIL(slice_allocator_.init(
storage::TX_DATA_SLICE_SIZE, OB_MALLOC_NORMAL_BLOCK_SIZE, block_alloc_, mem_attr))) {
SHARE_LOG(WARN, "init slice allocator failed", KR(ret));
} else {
slice_allocator_.set_nway(ObTenantTxDataAllocator::ALLOC_TX_DATA_MAX_CONCURRENCY);
is_inited_ = true;
}
return ret;
}
int ObMemstoreAllocator::init()
{
throttle_tool_ = &MTL_MEM_ALLOC_MGR.share_resource_throttle_tool();
return arena_.init();
}
int ObMemstoreAllocator::AllocHandle::init()
{
int ret = OB_SUCCESS;
uint64_t tenant_id = 1;
ObSharedMemAllocMgr *mtl_alloc_mgr = &MTL_MEM_ALLOC_MGR;
ObMemstoreAllocator &host = mtl_alloc_mgr->memstore_allocator();
(void)host.init_handle(*this);
return ret;
}
}; // namespace share
namespace concurrent_control
{
int check_sequence_set_violation(const concurrent_control::ObWriteFlag,
@ -102,6 +136,7 @@ public:
const testing::TestInfo *const test_info =
testing::UnitTest::GetInstance()->current_test_info();
auto test_name = test_info->name();
MTL_MEM_ALLOC_MGR.init();
_TRANS_LOG(INFO, ">>>> starting test : %s", test_name);
}
virtual void TearDown() override

View File

@ -21,12 +21,47 @@
#include "tx_node.h"
#include "../mock_utils/async_util.h"
#include "test_tx_dsl.h"
namespace oceanbase
{
using namespace ::testing;
using namespace transaction;
using namespace share;
static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR;
namespace share {
int ObTenantTxDataAllocator::init(const char *label)
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr;
throttle_tool_ = &(MTL_MEM_ALLOC_MGR.share_resource_throttle_tool());
if (OB_FAIL(slice_allocator_.init(
storage::TX_DATA_SLICE_SIZE, OB_MALLOC_NORMAL_BLOCK_SIZE, block_alloc_, mem_attr))) {
SHARE_LOG(WARN, "init slice allocator failed", KR(ret));
} else {
slice_allocator_.set_nway(ObTenantTxDataAllocator::ALLOC_TX_DATA_MAX_CONCURRENCY);
is_inited_ = true;
}
return ret;
}
int ObMemstoreAllocator::init()
{
throttle_tool_ = &MTL_MEM_ALLOC_MGR.share_resource_throttle_tool();
return arena_.init();
}
int ObMemstoreAllocator::AllocHandle::init()
{
int ret = OB_SUCCESS;
uint64_t tenant_id = 1;
ObSharedMemAllocMgr *mtl_alloc_mgr = &MTL_MEM_ALLOC_MGR;
ObMemstoreAllocator &host = mtl_alloc_mgr->memstore_allocator();
(void)host.init_handle(*this);
return ret;
}
}; // namespace share
namespace concurrent_control
{
int check_sequence_set_violation(const concurrent_control::ObWriteFlag ,
@ -55,6 +90,7 @@ public:
ObClockGenerator::init();
const testing::TestInfo* const test_info =
testing::UnitTest::GetInstance()->current_test_info();
MTL_MEM_ALLOC_MGR.init();
auto test_name = test_info->name();
_TRANS_LOG(INFO, ">>>> starting test : %s", test_name);
}

View File

@ -21,12 +21,46 @@
#include "../mock_utils/async_util.h"
#include "test_tx_dsl.h"
#include "tx_node.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
namespace oceanbase
{
using namespace ::testing;
using namespace transaction;
using namespace share;
static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR;
namespace share {
int ObTenantTxDataAllocator::init(const char *label)
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr;
throttle_tool_ = &(MTL_MEM_ALLOC_MGR.share_resource_throttle_tool());
if (OB_FAIL(slice_allocator_.init(
storage::TX_DATA_SLICE_SIZE, OB_MALLOC_NORMAL_BLOCK_SIZE, block_alloc_, mem_attr))) {
SHARE_LOG(WARN, "init slice allocator failed", KR(ret));
} else {
slice_allocator_.set_nway(ObTenantTxDataAllocator::ALLOC_TX_DATA_MAX_CONCURRENCY);
is_inited_ = true;
}
return ret;
}
int ObMemstoreAllocator::init()
{
throttle_tool_ = &MTL_MEM_ALLOC_MGR.share_resource_throttle_tool();
return arena_.init();
}
int ObMemstoreAllocator::AllocHandle::init()
{
int ret = OB_SUCCESS;
uint64_t tenant_id = 1;
ObSharedMemAllocMgr *mtl_alloc_mgr = &MTL_MEM_ALLOC_MGR;
ObMemstoreAllocator &host = mtl_alloc_mgr->memstore_allocator();
(void)host.init_handle(*this);
return ret;
}
}; // namespace share
namespace concurrent_control
{
int check_sequence_set_violation(const concurrent_control::ObWriteFlag,
@ -55,6 +89,7 @@ public:
ObClockGenerator::init();
const testing::TestInfo *const test_info =
testing::UnitTest::GetInstance()->current_test_info();
MTL_MEM_ALLOC_MGR.init();
auto test_name = test_info->name();
_TRANS_LOG(INFO, ">>>> starting test : %s", test_name);
}

View File

@ -22,11 +22,46 @@
#include "tx_node.h"
#include "../mock_utils/async_util.h"
#include "test_tx_dsl.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
namespace oceanbase
{
using namespace ::testing;
using namespace transaction;
using namespace share;
static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR;
namespace share {
int ObTenantTxDataAllocator::init(const char *label)
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr;
throttle_tool_ = &(MTL_MEM_ALLOC_MGR.share_resource_throttle_tool());
if (OB_FAIL(slice_allocator_.init(
storage::TX_DATA_SLICE_SIZE, OB_MALLOC_NORMAL_BLOCK_SIZE, block_alloc_, mem_attr))) {
SHARE_LOG(WARN, "init slice allocator failed", KR(ret));
} else {
slice_allocator_.set_nway(ObTenantTxDataAllocator::ALLOC_TX_DATA_MAX_CONCURRENCY);
is_inited_ = true;
}
return ret;
}
int ObMemstoreAllocator::init()
{
throttle_tool_ = &MTL_MEM_ALLOC_MGR.share_resource_throttle_tool();
return arena_.init();
}
int ObMemstoreAllocator::AllocHandle::init()
{
int ret = OB_SUCCESS;
uint64_t tenant_id = 1;
ObSharedMemAllocMgr *mtl_alloc_mgr = &MTL_MEM_ALLOC_MGR;
ObMemstoreAllocator &host = mtl_alloc_mgr->memstore_allocator();
(void)host.init_handle(*this);
return ret;
}
}; // namespace share
namespace omt {
bool the_ctrl_of_enable_transaction_free_route = true;
ObTenantConfig *ObTenantConfigMgr::get_tenant_config_with_lock(const uint64_t tenant_id,
@ -621,6 +656,7 @@ public:
common::ObClusterVersion::get_instance().update_cluster_version(CLUSTER_VERSION_4_1_0_0);
const testing::TestInfo* const test_info =
testing::UnitTest::GetInstance()->current_test_info();
MTL_MEM_ALLOC_MGR.init();
auto test_name = test_info->name();
_TRANS_LOG(INFO, ">>>> starting test : %s", test_name);
}

View File

@ -22,9 +22,9 @@
} while(0);
namespace oceanbase {
namespace common
namespace share
{
void* ObGMemstoreAllocator::alloc(AllocHandle& handle, int64_t size)
void* ObMemstoreAllocator::alloc(AllocHandle& handle, int64_t size, const int64_t expire_ts)
{
int ret = OB_SUCCESS;
int64_t align_size = upper_align(size, sizeof(int64_t));

View File

@ -33,12 +33,14 @@
#include "../mock_utils/msg_bus.h"
#include "../mock_utils/basic_fake_define.h"
#include "../mock_utils/ob_fake_tx_rpc.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
namespace oceanbase {
using namespace transaction;
using namespace share;
using namespace common;
namespace transaction {
template<class T>
class QueueConsumer : public share::ObThreadPool

View File

@ -35,6 +35,10 @@ using namespace memtable;
namespace transaction {
class ObFakeTxDataTable : public ObTxDataTable {
public:
ObSliceAlloc slice_allocator_;
ObTenantTxDataAllocator *FAKE_ALLOCATOR = (ObTenantTxDataAllocator *)0x1;
public:
ObFakeTxDataTable() : arena_allocator_(), map_(arena_allocator_, 1 << 20 /*2097152*/)
{
@ -46,7 +50,7 @@ public:
ObMemtableMgrHandle memtable_mgr_handle;
OB_ASSERT(OB_SUCCESS == slice_allocator_.init(
sizeof(ObTxData), OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, mem_attr));
slice_allocator_.set_nway(ObTxDataTable::TX_DATA_MAX_CONCURRENCY);
slice_allocator_.set_nway(32);
is_inited_ = true;
}
virtual int init(ObLS *ls, ObTxCtxTable *tx_ctx_table) override
@ -57,12 +61,15 @@ public:
virtual void stop() override {}
virtual void reset() override {}
virtual void destroy() override {}
virtual int alloc_tx_data(ObTxDataGuard &tx_data_guard) override
virtual int alloc_tx_data(ObTxDataGuard &tx_data_guard,
const bool enable_throttle,
const int64_t abs_expire_time)
{
void *ptr = slice_allocator_.alloc();
ObMemAttr attr;
void *ptr = ob_malloc(TX_DATA_SLICE_SIZE, attr);
ObTxData *tx_data = new (ptr) ObTxData();
tx_data->ref_cnt_ = 100;
tx_data->slice_allocator_ = &slice_allocator_;
tx_data->tx_data_allocator_ = FAKE_ALLOCATOR;
tx_data->flag_ = 269381;
tx_data_guard.init(tx_data);
return OB_ISNULL(tx_data) ? OB_ALLOCATE_MEMORY_FAILED : OB_SUCCESS;
@ -74,7 +81,7 @@ public:
ObTxData *to = new (ptr) ObTxData();
ObTxData *from = (ObTxData*)from_guard.tx_data();
to->ref_cnt_ = 100;
to->slice_allocator_ = &slice_allocator_;
to->tx_data_allocator_ = FAKE_ALLOCATOR;
to->flag_ = 269381;
to_guard.init(to);
OX (*to = *from);

View File

@ -309,9 +309,9 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
part1.exec_info_.prepare_version_.convert_for_tx(90);
part2.set_downstream_state(ObTxState::COMMIT);
ObTxData part2_tx_data;
ObSliceAlloc slice_allocator;
ObTenantTxDataAllocator tx_data_allocator;
part2_tx_data.ref_cnt_ = 1000;
part2_tx_data.slice_allocator_ = &slice_allocator;
part2_tx_data.tx_data_allocator_ = &tx_data_allocator;
part2.ctx_tx_data_.tx_data_guard_.init(&part2_tx_data);
part2.ctx_tx_data_.tx_data_guard_.tx_data()->commit_version_.convert_for_tx(90);
part3.set_downstream_state(ObTxState::UNKNOWN);

View File

@ -10,7 +10,7 @@
* See the Mulan PubL v2 for more details.
*/
#include "storage/tx/ob_trans_hashmap.h"
#include "share/ob_light_hashmap.h"
#include <gtest/gtest.h>
#include "share/ob_errno.h"
#include "lib/oblog/ob_log.h"
@ -40,7 +40,7 @@ public :
const char *TestObTrans::LOCAL_IP = "127.0.0.1";
const int64_t TIME_OUT = 1;
class ObTransTestValue : public ObTransHashLink<ObTransTestValue>
class ObTransTestValue : public share::ObLightHashLink<ObTransTestValue>
{
public:
ObTransTestValue() {}
@ -84,7 +84,7 @@ public:
}
};
typedef ObTransHashMap<ObTransID, ObTransTestValue, ObTransTestValueAlloc, common::SpinRWLock> TestHashMap;
typedef share::ObLightHashMap<ObTransID, ObTransTestValue, ObTransTestValueAlloc, common::SpinRWLock> TestHashMap;
class ForeachFunctor
{

View File

@ -36,6 +36,29 @@ using namespace storage;
using namespace blocksstable;
using namespace share;
namespace share
{
int ObTenantTxDataAllocator::init(const char* label)
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr;
if (OB_FAIL(slice_allocator_.init(
storage::TX_DATA_SLICE_SIZE, OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, mem_attr))) {
SHARE_LOG(WARN, "init slice allocator failed", KR(ret));
} else {
slice_allocator_.set_nway(ALLOC_TX_DATA_MAX_CONCURRENCY);
is_inited_ = true;
}
return ret;
}
void *ObTenantTxDataAllocator::alloc(const bool enable_throttle, const int64_t abs_expire_time)
{
void *res = slice_allocator_.alloc();
return res;
}
};
int storage::ObTenantMetaMemMgr::fetch_tenant_config()
{
return OB_SUCCESS;
@ -166,6 +189,7 @@ public:
ObTenantMetaMemMgr t3m_;
ObIMemtableMgr *mt_mgr_;
ObTxCtxMemtableMgr *ctx_mt_mgr_;
ObTenantTxDataAllocator tx_data_allocator_;
ObTenantBase tenant_base_;
};
@ -275,7 +299,8 @@ TEST_F(TestTxCtxTable, test_tx_ctx_memtable_mgr)
ObTxDataTable tx_data_table;
ObMemAttr attr;
attr.tenant_id_ = MTL_ID();
tx_data_table.slice_allocator_.init(sizeof(ObTxData), OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, attr);
tx_data_allocator_.init("test");
tx_data_table.tx_data_allocator_ = &tx_data_allocator_;
ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01),
(transaction::ObDupTableLSHandler *)(0x02));