fix allocator constructor

This commit is contained in:
obdev 2024-02-09 05:47:27 +00:00 committed by ob-robot
parent a9b7a093d9
commit 997ddd5961
14 changed files with 51 additions and 31 deletions

View File

@ -113,10 +113,11 @@ int TestMetaSnapshot::check_integrity(
blocksstable::MacroBlockId tablet_meta_entry;
ObLinkedMacroBlockItemReader item_reader;
bool found = false;
ObMemAttr mem_attr(OB_SERVER_TENANT_ID, "test");
if (OB_FAIL(ObTenantMetaSnapshotHandler::get_ls_snapshot(snapshot_id, ls_id, tablet_meta_entry))) {
LOG_WARN("fail to get ls snapshot", K(ret), K(snapshot_id), K(ls_id));
} else if (OB_FAIL(item_reader.init(tablet_meta_entry))) {
} else if (OB_FAIL(item_reader.init(tablet_meta_entry, mem_attr))) {
LOG_WARN("fail to init item reader", K(ret), K(tablet_meta_entry));
} else {
ObUpdateTabletLog slog;

View File

@ -956,7 +956,8 @@ int ObSSTableMacroInfo::deserialize_(
char *reader_buf = nullptr;
int64_t pos = 0;
int64_t reader_len = 0;
if (OB_FAIL(block_reader.init(entry_id_))) {
ObMemAttr mem_attr(MTL_ID(), "SSTableBlockId");
if (OB_FAIL(block_reader.init(entry_id_, mem_attr))) {
LOG_WARN("fail to initialize reader", K(ret), K(entry_id_));
} else if (OB_FAIL(block_reader.get_next_item(reader_buf, reader_len, addr))) {// read data ids
LOG_WARN("fail to get next item", K(ret), K(reader_len), K(addr));
@ -1002,10 +1003,11 @@ int ObSSTableMacroInfo::read_block_ids(
{
int ret = OB_SUCCESS;
ObLinkedMacroBlockItemReader block_reader;
ObMemAttr mem_attr(MTL_ID(), "SSTableBlockId");
if (OB_UNLIKELY(!entry_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(entry_id));
} else if (OB_FAIL(block_reader.init(entry_id))) {
} else if (OB_FAIL(block_reader.init(entry_id, mem_attr))) {
LOG_WARN("fail to initialize reader", K(ret), K(entry_id));
} else if (OB_FAIL(read_block_ids(allocator, block_reader, data_blk_ids, data_blk_cnt,
other_blk_ids, other_blk_cnt))) {
@ -1121,11 +1123,12 @@ int ObSSTableMacroInfo::write_block_ids(
int ret = OB_SUCCESS;
const int64_t data_blk_cnt = data_ids.count();
const int64_t other_blk_cnt = other_ids.count();
ObMemAttr mem_attr(MTL_ID(), "SSTableBlockId");
if (OB_UNLIKELY(0 == data_blk_cnt && 0 == other_blk_cnt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("data_blk_cnt and other_blk_cnt shouldn't be both 0", K(ret), K(data_blk_cnt),
K(other_blk_cnt));
} else if (OB_FAIL(writer.init(false /*whether need addr*/))) {
} else if (OB_FAIL(writer.init(false /*whether need addr*/, mem_attr))) {
LOG_WARN("fail to initialize item writer", K(ret));
} else if (OB_FAIL(flush_ids(data_ids, writer))) {
LOG_WARN("fail to flush data block ids", K(ret), K(data_blk_cnt));

View File

@ -28,18 +28,19 @@ using namespace oceanbase::blocksstable;
ObLinkedMacroBlockReader::ObLinkedMacroBlockReader()
: is_inited_(false), handle_pos_(0), macros_handle_(), prefetch_macro_block_idx_(0),
read_macro_block_cnt_(0), allocator_("LMBR_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), io_buf_{nullptr, nullptr}
read_macro_block_cnt_(0), allocator_(), io_buf_{nullptr, nullptr}
{
handles_[0].reset();
handles_[1].reset();
}
int ObLinkedMacroBlockReader::init(const MacroBlockId &entry_block)
int ObLinkedMacroBlockReader::init(const MacroBlockId &entry_block, const ObMemAttr &mem_attr)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObLinkedMacroBlockReader has been inited twice", K(ret));
} else if (FALSE_IT(allocator_.set_attr(mem_attr))) {
} else if (OB_FAIL(get_meta_blocks(entry_block))) {
LOG_WARN("fail to get meta blocks", K(ret));
} else if (OB_ISNULL(io_buf_[0] =
@ -76,7 +77,7 @@ int ObLinkedMacroBlockReader::get_meta_blocks(const MacroBlockId &entry_block)
int64_t handle_pos = 0;
MacroBlockId previous_block_id;
handles_[handle_pos].reset();
common::ObArenaAllocator allocator("META_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
common::ObArenaAllocator allocator;
if (OB_ISNULL(read_info.buf_ = reinterpret_cast<char*>(allocator.alloc(read_info.size_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to alloc macro read info buffer", K(ret), K(read_info.size_));
@ -274,11 +275,11 @@ int ObLinkedMacroBlockReader::get_previous_block_id(
ObLinkedMacroBlockItemReader::ObLinkedMacroBlockItemReader()
: is_inited_(false), common_header_(nullptr), linked_header_(nullptr), block_reader_(),
buf_(nullptr), buf_pos_(0), buf_len_(0), allocator_(ObModIds::OB_CHECKPOINT)
buf_(nullptr), buf_pos_(0), buf_len_(0), allocator_()
{
}
int ObLinkedMacroBlockItemReader::init(const MacroBlockId &entry_block)
int ObLinkedMacroBlockItemReader::init(const MacroBlockId &entry_block, const ObMemAttr &mem_attr)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
@ -287,9 +288,10 @@ int ObLinkedMacroBlockItemReader::init(const MacroBlockId &entry_block)
} else if (OB_UNLIKELY(!entry_block.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(entry_block));
} else if (OB_FAIL(block_reader_.init(entry_block))) {
} else if (OB_FAIL(block_reader_.init(entry_block, mem_attr))) {
LOG_WARN("fail to init ObLinkedMacroBlockReader", K(ret));
} else {
allocator_.set_attr(mem_attr);
is_inited_ = true;
}
return ret;

View File

@ -32,7 +32,7 @@ public:
ObLinkedMacroBlockReader(const ObLinkedMacroBlockReader &) = delete;
ObLinkedMacroBlockReader &operator=(const ObLinkedMacroBlockReader &) = delete;
int init(const blocksstable::MacroBlockId &entry_block);
int init(const blocksstable::MacroBlockId &entry_block, const ObMemAttr &mem_attr);
int iter_read_block(char *&buf, int64_t &buf_len, blocksstable::MacroBlockId &block_id);
void reset();
ObIArray<blocksstable::MacroBlockId> &get_meta_block_list();
@ -69,7 +69,7 @@ public:
ObLinkedMacroBlockItemReader(const ObLinkedMacroBlockItemReader &) = delete;
ObLinkedMacroBlockItemReader &operator=(const ObLinkedMacroBlockItemReader &) = delete;
int init(const blocksstable::MacroBlockId &entry_block);
int init(const blocksstable::MacroBlockId &entry_block, const ObMemAttr &mem_attr);
int get_next_item(char *&item_buf, int64_t &item_buf_len, ObMetaDiskAddr &addr);
void reset();
common::ObIArray<blocksstable::MacroBlockId> &get_meta_block_list();

View File

@ -142,13 +142,12 @@ ObLinkedMacroBlockItemWriter::ObLinkedMacroBlockItemWriter()
: is_inited_(false), is_closed_(false), written_items_cnt_(0),
need_disk_addr_(false), first_inflight_item_idx_(0),
pre_block_inflight_items_cnt_(0), curr_block_inflight_items_cnt_(0),
allocator_(ObModIds::OB_CHECKPOINT, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
block_writer_(), io_buf_(nullptr), io_buf_size_(0),
allocator_(), block_writer_(), io_buf_(nullptr), io_buf_size_(0),
io_buf_pos_(0), common_header_(nullptr), linked_header_(nullptr)
{
}
int ObLinkedMacroBlockItemWriter::init(const bool need_disk_addr)
int ObLinkedMacroBlockItemWriter::init(const bool need_disk_addr, const ObMemAttr &mem_attr)
{
int ret = OB_SUCCESS;
const int64_t macro_block_size = OB_SERVER_BLOCK_MGR.get_macro_block_size();
@ -161,6 +160,7 @@ int ObLinkedMacroBlockItemWriter::init(const bool need_disk_addr)
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory", K(ret), K(macro_block_size));
} else {
allocator_.set_attr(mem_attr);
need_disk_addr_ = need_disk_addr;
io_buf_size_ = macro_block_size;
common_header_ = reinterpret_cast<ObMacroBlockCommonHeader *>(io_buf_);

View File

@ -60,7 +60,7 @@ public:
ObLinkedMacroBlockItemWriter(const ObLinkedMacroBlockItemWriter &) = delete;
ObLinkedMacroBlockItemWriter &operator=(const ObLinkedMacroBlockItemWriter &) = delete;
int init(const bool need_disk_addr);
int init(const bool need_disk_addr, const ObMemAttr &mem_attr);
int write_item(const char *item_buf, const int64_t item_buf_len, int64_t *item_idx = nullptr);
int close();
void reset();

View File

@ -41,9 +41,10 @@ int ObServerCheckpointReader::read_checkpoint(const ObServerSuperBlock &super_bl
int ObServerCheckpointReader::read_tenant_meta_checkpoint(const MacroBlockId &entry_block)
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr(OB_SERVER_TENANT_ID, ObModIds::OB_CHECKPOINT);
if (OB_UNLIKELY(!entry_block.is_valid())) {
LOG_INFO("has no tenant config checkpoint");
} else if (OB_FAIL(tenant_meta_item_reader_.init(entry_block))) {
} else if (OB_FAIL(tenant_meta_item_reader_.init(entry_block, mem_attr))) {
LOG_WARN("fail to init tenant config item reader", K(ret));
} else {
char *item_buf = nullptr;

View File

@ -34,6 +34,7 @@ int ObServerCheckpointWriter::init()
int ret = OB_SUCCESS;
const int64_t MEM_LIMIT = 128 << 20; // 128M
const char *MEM_LABEL = "ObServerCheckpointWriter";
ObMemAttr mem_attr(OB_SERVER_TENANT_ID, MEM_LABEL);
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
@ -41,7 +42,7 @@ int ObServerCheckpointWriter::init()
} else if (OB_FAIL(allocator_.init(
common::OB_MALLOC_NORMAL_BLOCK_SIZE, MEM_LABEL, OB_SERVER_TENANT_ID, MEM_LIMIT))) {
LOG_WARN("fail to init fifo allocator", K(ret));
} else if (OB_FAIL(tenant_meta_item_writer_.init(false /*whether need addr*/))) {
} else if (OB_FAIL(tenant_meta_item_writer_.init(false /*whether need addr*/, mem_attr))) {
LOG_WARN("fail to init tenant meta item writer", K(ret));
} else {
is_inited_ = true;

View File

@ -462,6 +462,7 @@ int ObTenantMetaSnapshotHandler::find_tablet_meta_entry(
{
int ret = OB_SUCCESS;
ObLinkedMacroBlockItemReader ls_ckpt_reader;
ObMemAttr mem_attr(MTL_ID(), "Snapshot");
if (OB_UNLIKELY(!ls_meta_entry.is_valid() || !ls_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
@ -469,7 +470,7 @@ int ObTenantMetaSnapshotHandler::find_tablet_meta_entry(
} else if (OB_UNLIKELY(IS_EMPTY_BLOCK_LIST(ls_meta_entry))) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("ls snapshot doesn't exist", K(ret), K(ls_meta_entry));
} else if (OB_FAIL(ls_ckpt_reader.init(ls_meta_entry))) {
} else if (OB_FAIL(ls_ckpt_reader.init(ls_meta_entry, mem_attr))) {
LOG_WARN("fail to init log stream item reader", K(ret), K(ls_meta_entry));
} else {
char *item_buf = nullptr;

View File

@ -37,10 +37,11 @@ int ObTenantStorageCheckpointReader::iter_read_meta_item(
int ret = OB_SUCCESS;
ObLinkedMacroBlockItemReader item_reader;
block_list.reset();
ObMemAttr mem_attr(MTL_ID(), ObModIds::OB_CHECKPOINT);
if (OB_UNLIKELY(IS_EMPTY_BLOCK_LIST(entry_block))) {
LOG_INFO("has no snapshot of log stream", K(ret));
} else if (OB_FAIL(item_reader.init(entry_block))) {
} else if (OB_FAIL(item_reader.init(entry_block, mem_attr))) {
LOG_WARN("failed to init log stream item reader");
} else {
char *item_buf = nullptr;

View File

@ -47,15 +47,16 @@ ObTenantStorageCheckpointWriter::ObTenantStorageCheckpointWriter()
int ObTenantStorageCheckpointWriter::init(const ObTenantStorageMetaType meta_type)
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr(MTL_ID(), ObModIds::OB_CHECKPOINT);
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTenantStorageCheckpointWriter init twice", K(ret));
} else if (OB_UNLIKELY(ObTenantStorageMetaType::INVALID_TYPE == meta_type)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(meta_type));
} else if (OB_FAIL(ls_item_writer_.init(false /*whether need addr*/))) {
} else if (OB_FAIL(ls_item_writer_.init(false /*whether need addr*/, mem_attr))) {
LOG_WARN("fail to init ls item writer", K(ret));
} else if (OB_FAIL(tablet_item_writer_.init(false /*whether need addr*/))) {
} else if (OB_FAIL(tablet_item_writer_.init(false /*whether need addr*/, mem_attr))) {
LOG_WARN("fail to init tablet item writer", K(ret));
} else {
meta_type_ = meta_type;
@ -155,9 +156,10 @@ int ObTenantStorageCheckpointWriter::record_ls_meta(MacroBlockId &ls_entry_block
ls_item_writer_.reset();
tablet_item_writer_.reset();
ObMemAttr mem_attr(MTL_ID(), ObModIds::OB_CHECKPOINT);
if (OB_FAIL(MTL(ObLSService *)->get_ls_iter(ls_iter, ObLSGetMod::STORAGE_MOD))) {
LOG_WARN("failed to get log stream iter", K(ret));
} else if (OB_FAIL(ls_item_writer_.init(false /*whether need addr*/))) {
} else if (OB_FAIL(ls_item_writer_.init(false /*whether need addr*/, mem_attr))) {
LOG_WARN("failed to init log stream item writer", K(ret));
} else {
share::SCN unused_scn;
@ -295,7 +297,8 @@ int ObTenantStorageCheckpointWriter::record_tablet_meta(ObLS &ls, MacroBlockId &
char slog_buf[sizeof(ObUpdateTabletLog)];
tablet_item_writer_.reuse_for_next_round();
if (OB_FAIL(tablet_item_writer_.init(false /*whether need addr*/))) {
ObMemAttr mem_attr(MTL_ID(), ObModIds::OB_CHECKPOINT);
if (OB_FAIL(tablet_item_writer_.init(false /*whether need addr*/, mem_attr))) {
LOG_WARN("failed to init tablet item writer", K(ret));
} else if (OB_FAIL(ls.get_tablet_svr()->build_tablet_iter(tablet_iter))) {
LOG_WARN("fail to build ls tablet iter", K(ret), K(ls));

View File

@ -331,7 +331,8 @@ int ObTabletMacroInfo::persist_macro_ids(
ObLinkedMacroBlockItemWriter &linked_writer)
{
int ret = OB_SUCCESS;
if (OB_FAIL(linked_writer.init(false /*whether need addr*/))) {
ObMemAttr mem_attr(MTL_ID(), "TabletBlockId");
if (OB_FAIL(linked_writer.init(false /*whether need addr*/, mem_attr))) {
LOG_WARN("fail to init linked writer", K(ret));
} else if (OB_FAIL(do_flush_ids(ObTabletMacroType::META_BLOCK, meta_block_info_arr_, allocator, linked_writer))) {
LOG_WARN("fail to persist meta block ids", K(ret));

View File

@ -83,8 +83,9 @@ int ObMacroInfoIterator::init(const ObTabletMacroType target_type, const ObTable
LOG_WARN("invalid args", K(ret), K(macro_info));
} else {
const MacroBlockId &entry_block = macro_info.entry_block_;
ObMemAttr mem_attr(MTL_ID(), "TabletBlockId");
if (!IS_EMPTY_BLOCK_LIST(entry_block)) {
if (OB_FAIL(block_reader_.init(entry_block))) {
if (OB_FAIL(block_reader_.init(entry_block, mem_attr))) {
LOG_WARN("fail to init block reader", K(ret), K(entry_block));
} else {
is_linked_ = true;
@ -122,10 +123,11 @@ int ObMacroInfoIterator::reuse()
if (is_linked_) {
block_reader_.reset();
const MacroBlockId &entry_block = macro_info_->entry_block_;
ObMemAttr mem_attr(MTL_ID(), "TabletBlockId");
if (OB_UNLIKELY(!entry_block.is_valid() || IS_EMPTY_BLOCK_LIST(entry_block))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected entry block", K(ret), K(entry_block), K(is_linked_));
} else if (OB_FAIL(block_reader_.init(entry_block))) {
} else if (OB_FAIL(block_reader_.init(entry_block, mem_attr))) {
LOG_WARN("fail to init block reader", K(ret), K(entry_block));
}
}

View File

@ -116,7 +116,8 @@ void TestLinkedMacroBlock::build_item_buf(ObArray<ItemInfo> &item_arr)
void TestLinkedMacroBlock::write_items(ObArray<ItemInfo> &item_arr)
{
ObLinkedMacroBlockItemWriter item_writer;
ASSERT_EQ(OB_SUCCESS, item_writer.init(true));
ObMemAttr mem_attr(MTL_ID(), "test");
ASSERT_EQ(OB_SUCCESS, item_writer.init(true, mem_attr));
for (int64_t i = 0; i < item_arr.count(); i++) {
ASSERT_EQ(OB_SUCCESS,
item_writer.write_item(item_arr.at(i).buf_, item_arr.at(i).buf_len_, &item_arr.at(i).idx_));
@ -138,7 +139,8 @@ void TestLinkedMacroBlock::write_items(ObArray<ItemInfo> &item_arr)
void TestLinkedMacroBlock::iter_read_items(const ObArray<ItemInfo> &item_arr)
{
ObLinkedMacroBlockItemReader item_reader;
ASSERT_EQ(OB_SUCCESS, item_reader.init(entry_block_));
ObMemAttr mem_attr(OB_SERVER_TENANT_ID, "test");
ASSERT_EQ(OB_SUCCESS, item_reader.init(entry_block_, mem_attr));
char *item_buf = nullptr;
int64_t item_buf_len = 0;
ObMetaDiskAddr addr;
@ -164,7 +166,8 @@ void TestLinkedMacroBlock::iter_read_items(const ObArray<ItemInfo> &item_arr)
void TestLinkedMacroBlock::read_items(const ObArray<ItemInfo> &item_arr)
{
ObLinkedMacroBlockItemReader item_reader;
ASSERT_EQ(OB_SUCCESS, item_reader.init(entry_block_));
ObMemAttr mem_attr(OB_SERVER_TENANT_ID, "test");
ASSERT_EQ(OB_SUCCESS, item_reader.init(entry_block_, mem_attr));
char *item_buf = nullptr;
int64_t item_buf_len = 0;
for (int64_t i = 0; i < item_arr.count(); i++) {
@ -191,7 +194,8 @@ TEST_F(TestLinkedMacroBlock, empty_read_test)
entry_block_ = ObServerSuperBlock::EMPTY_LIST_ENTRY_BLOCK;
ASSERT_TRUE(entry_block_.is_valid());
ObLinkedMacroBlockItemReader item_reader;
ASSERT_EQ(OB_SUCCESS, item_reader.init(entry_block_));
ObMemAttr mem_attr(OB_SERVER_TENANT_ID, "test");
ASSERT_EQ(OB_SUCCESS, item_reader.init(entry_block_, mem_attr));
char *item_buf = nullptr;
int64_t item_buf_len = 0;
ObMetaDiskAddr addr;