From 2fd0498ce37a66d99334d2e47e6addcc7d3f2000 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 5 May 2023 03:08:48 +0000 Subject: [PATCH] use tenant memmory to alloc ObTxLSLogCb --- src/storage/tx/ob_trans_ctx_mgr_v4.cpp | 2 +- src/storage/tx/ob_tx_ls_log_writer.cpp | 6 ++- src/storage/tx/ob_tx_ls_log_writer.h | 7 +++- unittest/storage/tx/test_ls_log_writer.cpp | 49 ++++++++++++++++------ 4 files changed, 47 insertions(+), 17 deletions(-) diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index cab5ae9a0c..690e7f533e 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -177,7 +177,7 @@ int ObLSTxCtxMgr::init(const int64_t tenant_id, TRANS_LOG(WARN, "tx log adapter init error", KR(ret)); } else if (OB_NOT_NULL(log_adapter) && OB_FALSE_IT(tx_log_adapter_ = log_adapter)) { ret = OB_ERR_UNEXPECTED; - } else if (OB_FAIL(ls_log_writer_.init(ls_id, tx_log_adapter_, this))) { + } else if (OB_FAIL(ls_log_writer_.init(tenant_id, ls_id, tx_log_adapter_, this))) { TRANS_LOG(WARN, "ls_log_writer init fail", KR(ret)); } else { is_inited_ = true; diff --git a/src/storage/tx/ob_tx_ls_log_writer.cpp b/src/storage/tx/ob_tx_ls_log_writer.cpp index 436e9e4f36..3174de627e 100644 --- a/src/storage/tx/ob_tx_ls_log_writer.cpp +++ b/src/storage/tx/ob_tx_ls_log_writer.cpp @@ -148,7 +148,7 @@ int ObTxLSLogCb::alloc_log_buf_() { int ret = OB_SUCCESS; - ObMemAttr attr(OB_SERVER_TENANT_ID, "TxLSLogBuf"); + ObMemAttr attr(base_wr_->get_tenant_id(), "TxLSLogBuf"); SET_USE_500(attr); if (0 == ObTxLSLogLimit::LOG_BUF_SIZE || nullptr != log_buf_) { ret = OB_INVALID_ARGUMENT; @@ -167,7 +167,8 @@ ObTxLSLogWriter::ObTxLSLogWriter() : cbs_lock_(common::ObLatchIds::TX_LS_LOG_WRI ObTxLSLogWriter::~ObTxLSLogWriter() { reset(); } -int ObTxLSLogWriter::init(const ObLSID &ls_id, +int ObTxLSLogWriter::init(const int64_t tenant_id, + const ObLSID &ls_id, ObITxLogAdapter * adapter, ObLSTxCtxMgr *ctx_mgr) { @@ -180,6 +181,7 @@ int ObTxLSLogWriter::init(const ObLSID &ls_id, TRANS_LOG(WARN, "[TxLsLogWriter] invalid arguments", K(ls_id), KP(adapter), KP(ctx_mgr)); } else { ls_id_ = ls_id; + tenant_id_ = tenant_id; ctx_mgr_ = ctx_mgr; tx_log_adapter_ = adapter; ObTxLSLogLimit::decide_log_buf_size(); diff --git a/src/storage/tx/ob_tx_ls_log_writer.h b/src/storage/tx/ob_tx_ls_log_writer.h index 1e57452b00..a877920e4b 100644 --- a/src/storage/tx/ob_tx_ls_log_writer.h +++ b/src/storage/tx/ob_tx_ls_log_writer.h @@ -90,7 +90,7 @@ public: private: bool need_alloc_buf_() { return nullptr == log_buf_; } - int alloc_log_buf_(); + OB_NOINLINE int alloc_log_buf_(); private: ObTxLogType type_; // Unkown == unused, not init @@ -150,7 +150,8 @@ public: public: ObTxLSLogWriter(); ~ObTxLSLogWriter(); - int init(const share::ObLSID &ls_id, + int init(const int64_t tenant_id, + const share::ObLSID &ls_id, ObITxLogAdapter *adapter, ObLSTxCtxMgr *ctx_mgr); int stop(); @@ -162,6 +163,7 @@ public: public: int submit_start_working_log(const int64_t &leader_epoch, share::SCN &log_ts); + int64_t get_tenant_id() const { return tenant_id_; } public: int on_success(ObTxLSLogCb *cb); int on_failure(ObTxLSLogCb *cb); @@ -199,6 +201,7 @@ private: common::ObDList start_working_cbs_; share::ObLSID ls_id_; + int64_t tenant_id_; ObLSTxCtxMgr *ctx_mgr_; ObITxLogAdapter *tx_log_adapter_; }; diff --git a/unittest/storage/tx/test_ls_log_writer.cpp b/unittest/storage/tx/test_ls_log_writer.cpp index a0051731ed..cf8fea149c 100644 --- a/unittest/storage/tx/test_ls_log_writer.cpp +++ b/unittest/storage/tx/test_ls_log_writer.cpp @@ -11,23 +11,47 @@ */ #include "ob_mock_tx_log_adapter.h" -#include "storage/tx/ob_tx_ls_log_writer.h" #include "storage/tx/ob_trans_ctx_mgr.h" +#include "storage/tx/ob_tx_ls_log_writer.h" #include -namespace oceanbase { +namespace oceanbase +{ using namespace transaction; using namespace storage; using namespace share; +namespace transaction +{ +int ObTxLSLogCb::alloc_log_buf_() +{ + int ret = OB_SUCCESS; -namespace unittest { + ObMemAttr attr(OB_SERVER_TENANT_ID, "TxLSLogBuf"); + SET_USE_500(attr); + if (0 == ObTxLSLogLimit::LOG_BUF_SIZE || nullptr != log_buf_) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "[TxLsLogWriter] invalid arguments", KR(ret), K(ObTxLSLogLimit::LOG_BUF_SIZE), + KP(log_buf_)); + } else if (nullptr == (log_buf_ = (char *)ob_malloc(ObTxLSLogLimit::LOG_BUF_SIZE, attr))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + TRANS_LOG(WARN, "[TxLsLogWriter] allocate memory failed", KR(ret), + K(ObTxLSLogLimit::LOG_BUF_SIZE)); + } + + return ret; +} +} // namespace transaction + +namespace unittest +{ MockTxLogAdapter tx_log_adapter; MockTxLogParam param; ObTxLSLogWriter ls_log_writer; -class TestLSLogWriter : public ::testing::Test { +class TestLSLogWriter : public ::testing::Test +{ public: virtual void SetUp() { @@ -51,7 +75,7 @@ TEST_F(TestLSLogWriter, submit_start_working_log) int64_t tmp_tenant_id = 1004; ObLSTxCtxMgr tmp_mgr; common::ObConcurrentFIFOAllocator tmp_allocator; - + ObTxLogBlock replay_block; int64_t replay_hint = 0; share::SCN log_ts; @@ -59,16 +83,17 @@ TEST_F(TestLSLogWriter, submit_start_working_log) ObTxLogHeader log_header; ObTxStartWorkingLogTempRef tmp_ref; ObTxStartWorkingLog sw_log(tmp_ref); - int64_t test_leader_epoch = 1308; + int64_t test_leader_epoch = 1308; ObTxLogBlockHeader block_header; - - ASSERT_EQ(OB_SUCCESS, ls_log_writer.init(TEST_LS_ID, &tx_log_adapter, (ObLSTxCtxMgr *)&tmp_mgr)); - ASSERT_EQ(OB_SUCCESS, ls_log_writer.submit_start_working_log(test_leader_epoch,log_ts)); - + + ASSERT_EQ(OB_SUCCESS, ls_log_writer.init(tmp_tenant_id, TEST_LS_ID, &tx_log_adapter, + (ObLSTxCtxMgr *)&tmp_mgr)); + ASSERT_EQ(OB_SUCCESS, ls_log_writer.submit_start_working_log(test_leader_epoch, log_ts)); + ASSERT_EQ(true, tx_log_adapter.get_log(log_ts.get_val_for_gts(), log_string)); - ASSERT_EQ(OB_SUCCESS, - replay_block.init_with_header(log_string.c_str(), log_string.size(), replay_hint, block_header)); + ASSERT_EQ(OB_SUCCESS, replay_block.init_with_header(log_string.c_str(), log_string.size(), + replay_hint, block_header)); ASSERT_EQ(OB_SUCCESS, replay_block.get_next_log(log_header)); EXPECT_EQ(ObTxLogType::TX_START_WORKING_LOG, log_header.get_tx_log_type()); ASSERT_EQ(OB_SUCCESS, replay_block.deserialize_log_body(sw_log));