use tenant memmory to alloc ObTxLSLogCb

This commit is contained in:
obdev
2023-05-05 03:08:48 +00:00
committed by ob-robot
parent fe350b62e6
commit 2fd0498ce3
4 changed files with 47 additions and 17 deletions

View File

@ -177,7 +177,7 @@ int ObLSTxCtxMgr::init(const int64_t tenant_id,
TRANS_LOG(WARN, "tx log adapter init error", KR(ret)); TRANS_LOG(WARN, "tx log adapter init error", KR(ret));
} else if (OB_NOT_NULL(log_adapter) && OB_FALSE_IT(tx_log_adapter_ = log_adapter)) { } else if (OB_NOT_NULL(log_adapter) && OB_FALSE_IT(tx_log_adapter_ = log_adapter)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(ls_log_writer_.init(ls_id, tx_log_adapter_, this))) { } else if (OB_FAIL(ls_log_writer_.init(tenant_id, ls_id, tx_log_adapter_, this))) {
TRANS_LOG(WARN, "ls_log_writer init fail", KR(ret)); TRANS_LOG(WARN, "ls_log_writer init fail", KR(ret));
} else { } else {
is_inited_ = true; is_inited_ = true;

View File

@ -148,7 +148,7 @@ int ObTxLSLogCb::alloc_log_buf_()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObMemAttr attr(OB_SERVER_TENANT_ID, "TxLSLogBuf"); ObMemAttr attr(base_wr_->get_tenant_id(), "TxLSLogBuf");
SET_USE_500(attr); SET_USE_500(attr);
if (0 == ObTxLSLogLimit::LOG_BUF_SIZE || nullptr != log_buf_) { if (0 == ObTxLSLogLimit::LOG_BUF_SIZE || nullptr != log_buf_) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
@ -167,7 +167,8 @@ ObTxLSLogWriter::ObTxLSLogWriter() : cbs_lock_(common::ObLatchIds::TX_LS_LOG_WRI
ObTxLSLogWriter::~ObTxLSLogWriter() { reset(); } ObTxLSLogWriter::~ObTxLSLogWriter() { reset(); }
int ObTxLSLogWriter::init(const ObLSID &ls_id, int ObTxLSLogWriter::init(const int64_t tenant_id,
const ObLSID &ls_id,
ObITxLogAdapter * adapter, ObITxLogAdapter * adapter,
ObLSTxCtxMgr *ctx_mgr) ObLSTxCtxMgr *ctx_mgr)
{ {
@ -180,6 +181,7 @@ int ObTxLSLogWriter::init(const ObLSID &ls_id,
TRANS_LOG(WARN, "[TxLsLogWriter] invalid arguments", K(ls_id), KP(adapter), KP(ctx_mgr)); TRANS_LOG(WARN, "[TxLsLogWriter] invalid arguments", K(ls_id), KP(adapter), KP(ctx_mgr));
} else { } else {
ls_id_ = ls_id; ls_id_ = ls_id;
tenant_id_ = tenant_id;
ctx_mgr_ = ctx_mgr; ctx_mgr_ = ctx_mgr;
tx_log_adapter_ = adapter; tx_log_adapter_ = adapter;
ObTxLSLogLimit::decide_log_buf_size(); ObTxLSLogLimit::decide_log_buf_size();

View File

@ -90,7 +90,7 @@ public:
private: private:
bool need_alloc_buf_() { return nullptr == log_buf_; } bool need_alloc_buf_() { return nullptr == log_buf_; }
int alloc_log_buf_(); OB_NOINLINE int alloc_log_buf_();
private: private:
ObTxLogType type_; // Unkown == unused, not init ObTxLogType type_; // Unkown == unused, not init
@ -150,7 +150,8 @@ public:
public: public:
ObTxLSLogWriter(); ObTxLSLogWriter();
~ObTxLSLogWriter(); ~ObTxLSLogWriter();
int init(const share::ObLSID &ls_id, int init(const int64_t tenant_id,
const share::ObLSID &ls_id,
ObITxLogAdapter *adapter, ObITxLogAdapter *adapter,
ObLSTxCtxMgr *ctx_mgr); ObLSTxCtxMgr *ctx_mgr);
int stop(); int stop();
@ -162,6 +163,7 @@ public:
public: public:
int submit_start_working_log(const int64_t &leader_epoch, share::SCN &log_ts); int submit_start_working_log(const int64_t &leader_epoch, share::SCN &log_ts);
int64_t get_tenant_id() const { return tenant_id_; }
public: public:
int on_success(ObTxLSLogCb *cb); int on_success(ObTxLSLogCb *cb);
int on_failure(ObTxLSLogCb *cb); int on_failure(ObTxLSLogCb *cb);
@ -199,6 +201,7 @@ private:
common::ObDList<ObTxLSLogCb> start_working_cbs_; common::ObDList<ObTxLSLogCb> start_working_cbs_;
share::ObLSID ls_id_; share::ObLSID ls_id_;
int64_t tenant_id_;
ObLSTxCtxMgr *ctx_mgr_; ObLSTxCtxMgr *ctx_mgr_;
ObITxLogAdapter *tx_log_adapter_; ObITxLogAdapter *tx_log_adapter_;
}; };

View File

@ -11,23 +11,47 @@
*/ */
#include "ob_mock_tx_log_adapter.h" #include "ob_mock_tx_log_adapter.h"
#include "storage/tx/ob_tx_ls_log_writer.h"
#include "storage/tx/ob_trans_ctx_mgr.h" #include "storage/tx/ob_trans_ctx_mgr.h"
#include "storage/tx/ob_tx_ls_log_writer.h"
#include <gtest/gtest.h> #include <gtest/gtest.h>
namespace oceanbase { namespace oceanbase
{
using namespace transaction; using namespace transaction;
using namespace storage; using namespace storage;
using namespace share; using namespace share;
namespace transaction
{
int ObTxLSLogCb::alloc_log_buf_()
{
int ret = OB_SUCCESS;
namespace unittest { ObMemAttr attr(OB_SERVER_TENANT_ID, "TxLSLogBuf");
SET_USE_500(attr);
if (0 == ObTxLSLogLimit::LOG_BUF_SIZE || nullptr != log_buf_) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "[TxLsLogWriter] invalid arguments", KR(ret), K(ObTxLSLogLimit::LOG_BUF_SIZE),
KP(log_buf_));
} else if (nullptr == (log_buf_ = (char *)ob_malloc(ObTxLSLogLimit::LOG_BUF_SIZE, attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
TRANS_LOG(WARN, "[TxLsLogWriter] allocate memory failed", KR(ret),
K(ObTxLSLogLimit::LOG_BUF_SIZE));
}
return ret;
}
} // namespace transaction
namespace unittest
{
MockTxLogAdapter tx_log_adapter; MockTxLogAdapter tx_log_adapter;
MockTxLogParam param; MockTxLogParam param;
ObTxLSLogWriter ls_log_writer; ObTxLSLogWriter ls_log_writer;
class TestLSLogWriter : public ::testing::Test { class TestLSLogWriter : public ::testing::Test
{
public: public:
virtual void SetUp() virtual void SetUp()
{ {
@ -51,7 +75,7 @@ TEST_F(TestLSLogWriter, submit_start_working_log)
int64_t tmp_tenant_id = 1004; int64_t tmp_tenant_id = 1004;
ObLSTxCtxMgr tmp_mgr; ObLSTxCtxMgr tmp_mgr;
common::ObConcurrentFIFOAllocator tmp_allocator; common::ObConcurrentFIFOAllocator tmp_allocator;
ObTxLogBlock replay_block; ObTxLogBlock replay_block;
int64_t replay_hint = 0; int64_t replay_hint = 0;
share::SCN log_ts; share::SCN log_ts;
@ -59,16 +83,17 @@ TEST_F(TestLSLogWriter, submit_start_working_log)
ObTxLogHeader log_header; ObTxLogHeader log_header;
ObTxStartWorkingLogTempRef tmp_ref; ObTxStartWorkingLogTempRef tmp_ref;
ObTxStartWorkingLog sw_log(tmp_ref); ObTxStartWorkingLog sw_log(tmp_ref);
int64_t test_leader_epoch = 1308; int64_t test_leader_epoch = 1308;
ObTxLogBlockHeader block_header; ObTxLogBlockHeader block_header;
ASSERT_EQ(OB_SUCCESS, ls_log_writer.init(TEST_LS_ID, &tx_log_adapter, (ObLSTxCtxMgr *)&tmp_mgr)); ASSERT_EQ(OB_SUCCESS, ls_log_writer.init(tmp_tenant_id, TEST_LS_ID, &tx_log_adapter,
ASSERT_EQ(OB_SUCCESS, ls_log_writer.submit_start_working_log(test_leader_epoch,log_ts)); (ObLSTxCtxMgr *)&tmp_mgr));
ASSERT_EQ(OB_SUCCESS, ls_log_writer.submit_start_working_log(test_leader_epoch, log_ts));
ASSERT_EQ(true, tx_log_adapter.get_log(log_ts.get_val_for_gts(), log_string)); ASSERT_EQ(true, tx_log_adapter.get_log(log_ts.get_val_for_gts(), log_string));
ASSERT_EQ(OB_SUCCESS, ASSERT_EQ(OB_SUCCESS, replay_block.init_with_header(log_string.c_str(), log_string.size(),
replay_block.init_with_header(log_string.c_str(), log_string.size(), replay_hint, block_header)); replay_hint, block_header));
ASSERT_EQ(OB_SUCCESS, replay_block.get_next_log(log_header)); ASSERT_EQ(OB_SUCCESS, replay_block.get_next_log(log_header));
EXPECT_EQ(ObTxLogType::TX_START_WORKING_LOG, log_header.get_tx_log_type()); EXPECT_EQ(ObTxLogType::TX_START_WORKING_LOG, log_header.get_tx_log_type());
ASSERT_EQ(OB_SUCCESS, replay_block.deserialize_log_body(sw_log)); ASSERT_EQ(OB_SUCCESS, replay_block.deserialize_log_body(sw_log));