fix memory hold too much

This commit is contained in:
wxhwang 2023-05-16 10:16:43 +00:00 committed by ob-robot
parent 11437edf50
commit 3810f27cb6
6 changed files with 51 additions and 34 deletions

View File

@ -119,7 +119,8 @@ class TestRestoreTaskMgr : public testing::Test
{
public:
virtual void SetUp() {
task_mgr_.init();
EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
EXPECT_EQ(OB_SUCCESS, task_mgr_.init());
}
virtual void TearDown() {
}
@ -207,7 +208,6 @@ public:
virtual ~TestLSRestoreHandler() = default;
static void SetUpTestCase()
{
EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
SAFE_DESTROY_INSTANCE.init();
SAFE_DESTROY_INSTANCE.start();
ObServerCheckpointSlogHandler::get_instance().is_started_ = true;

View File

@ -167,43 +167,45 @@ int ObLSBackupRestoreUtil::read_macro_block_id_mapping_metas(const common::ObStr
}
int ObLSBackupRestoreUtil::read_macro_block_data(const common::ObString &path, const share::ObBackupStorageInfo *storage_info,
const ObBackupMacroBlockIndex &macro_index, const int64_t align_size, common::ObIAllocator &allocator,
const ObBackupMacroBlockIndex &macro_index, const int64_t align_size, blocksstable::ObBufferReader &read_buffer,
blocksstable::ObBufferReader &data_buffer)
{
int ret = OB_SUCCESS;
char *buf = NULL;
if (path.empty() || !macro_index.is_valid()) {
char *buf = read_buffer.current();
if (path.empty() || !macro_index.is_valid() || !read_buffer.is_valid() || !data_buffer.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid args", K(ret), K(path), K(macro_index));
LOG_WARN("get invalid args", K(ret), K(path), K(macro_index), K(read_buffer), K(data_buffer));
} else if (align_size <= 0 || !common::is_io_aligned(macro_index.length_) || !common::is_io_aligned(align_size)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid args", K(macro_index), K(align_size));
} else if (OB_ISNULL(buf = reinterpret_cast<char *>(allocator.alloc(macro_index.length_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc read buf", K(ret), K(macro_index));
LOG_WARN("get invalid args", K(ret), K(macro_index), K(align_size));
} else if (read_buffer.remain() < macro_index.length_) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("read buffer not enough", K(ret), K(path), K(macro_index), K(read_buffer), K(data_buffer));
} else if (OB_FAIL(pread_file(path, storage_info, macro_index.offset_, macro_index.length_, buf))) {
LOG_WARN("failed to pread buffer", K(ret), K(path), K(macro_index));
} else {
blocksstable::ObBufferReader buffer_reader(buf, macro_index.length_);
const ObBackupCommonHeader *common_header = NULL;
if (OB_FAIL(buffer_reader.get(common_header))) {
LOG_WARN("failed to get common_header", K(ret), K(path), K(macro_index), K(buffer_reader));
if (OB_FAIL(read_buffer.get(common_header))) {
LOG_WARN("failed to get common_header", K(ret), K(path), K(macro_index), K(read_buffer));
} else if (OB_ISNULL(common_header)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common header is null", K(ret), K(path), K(macro_index), K(buffer_reader));
LOG_WARN("common header is null", K(ret), K(path), K(macro_index), K(read_buffer));
} else if (OB_FAIL(common_header->check_valid())) {
LOG_WARN("common_header is not valid", K(ret), K(path), K(macro_index), K(buffer_reader));
} else if (common_header->data_zlength_ > buffer_reader.remain()) {
LOG_WARN("common_header is not valid", K(ret), K(path), K(macro_index), K(read_buffer));
} else if (common_header->data_zlength_ > read_buffer.remain()) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("buffer_reader not enough", K(ret), K(path), K(macro_index), K(buffer_reader));
} else if (OB_FAIL(common_header->check_data_checksum(buffer_reader.current(), common_header->data_zlength_))) {
LOG_WARN("failed to check data checksum", K(ret), K(*common_header), K(path), K(macro_index), K(buffer_reader));
LOG_WARN("read_buffer not enough", K(ret), K(path), K(macro_index), K(read_buffer));
} else if (OB_FAIL(common_header->check_data_checksum(read_buffer.current(), common_header->data_zlength_))) {
LOG_WARN("failed to check data checksum", K(ret), K(*common_header), K(path), K(macro_index), K(read_buffer));
} else {
const int64_t common_header_size = common_header->header_length_;
MEMMOVE(buf, buffer_reader.current(), macro_index.length_ - common_header_size);
data_buffer.assign(buf, macro_index.length_, macro_index.length_);
LOG_INFO("read macro block data", K(path), K(macro_index), K(data_buffer));
char *dest = data_buffer.current();
const int64_t macro_block_size = macro_index.length_ - common_header->header_length_;
if (OB_FAIL(data_buffer.set_pos(macro_index.length_/* IO size should be aligned */))) {
LOG_WARN("failed to set pos", K(ret), K(*common_header), K(path), K(macro_index), K(read_buffer), K(data_buffer), K(macro_block_size));
} else {
MEMCPY(dest, read_buffer.current(), macro_block_size);
LOG_INFO("read macro block data", K(path), K(macro_index), K(data_buffer));
}
}
}
return ret;

View File

@ -33,7 +33,7 @@ public:
static int read_macro_block_id_mapping_metas(const common::ObString &path, const share::ObBackupStorageInfo *storage_info,
const ObBackupMetaIndex &meta_index, ObBackupMacroBlockIDMappingsMeta &id_mappings_meta);
static int read_macro_block_data(const common::ObString &path, const share::ObBackupStorageInfo *storage_info,
const ObBackupMacroBlockIndex &macro_index, const int64_t align_size, common::ObIAllocator &allocator,
const ObBackupMacroBlockIndex &macro_index, const int64_t align_size, blocksstable::ObBufferReader &read_buffer,
blocksstable::ObBufferReader &data_buffer);
static int pread_file(
const ObString &path, const share::ObBackupStorageInfo *storage_info, const int64_t offset, const int64_t read_size, char *buf);

View File

@ -380,11 +380,13 @@ ObCopyMacroBlockRestoreReader::ObCopyMacroBlockRestoreReader()
second_meta_index_store_(nullptr),
restore_macro_block_id_mgr_(nullptr),
data_buffer_(),
allocator_("CMBReReader"),
allocator_(),
macro_block_index_(0),
macro_block_count_(0),
data_size_(0)
{
ObMemAttr attr(MTL_ID(), "CMBReReader");
allocator_.set_attr(attr);
}
ObCopyMacroBlockRestoreReader::~ObCopyMacroBlockRestoreReader()
@ -424,15 +426,26 @@ int ObCopyMacroBlockRestoreReader::init(
int ObCopyMacroBlockRestoreReader::alloc_buffers()
{
int ret = OB_SUCCESS;
const int64_t READ_BUFFER_SIZE = OB_DEFAULT_MACRO_BLOCK_SIZE * 2;
char *buf = NULL;
char *read_buf = NULL;
// used in init() func, should not check is_inited_
if (NULL == (buf = reinterpret_cast<char*>(allocator_.alloc(OB_DEFAULT_MACRO_BLOCK_SIZE)))) {
if (OB_ISNULL(buf = reinterpret_cast<char*>(allocator_.alloc(OB_DEFAULT_MACRO_BLOCK_SIZE)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc buf", K(ret));
} else if (OB_ISNULL(read_buf = reinterpret_cast<char*>(allocator_.alloc(READ_BUFFER_SIZE)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc read_buf", K(ret));
} else {
data_buffer_.assign(buf, OB_DEFAULT_MACRO_BLOCK_SIZE);
read_buffer_.assign(read_buf, READ_BUFFER_SIZE);
}
if (OB_FAIL(ret)) {
allocator_.reset();
}
return ret;
}
@ -468,6 +481,8 @@ int ObCopyMacroBlockRestoreReader::get_next_macro_block(
share::ObBackupStorageInfo storage_info;
share::ObRestoreBackupSetBriefInfo backup_set_brief_info;
share::ObBackupDest backup_set_dest;
data_buffer_.set_pos(0);
read_buffer_.set_pos(0);
if (OB_FAIL(restore_macro_block_id_mgr_->get_macro_block_id(macro_block_index_, logic_block_id, physic_block_id))) {
LOG_WARN("failed to get macro block id", K(ret), K(macro_block_index_), K(table_key_), KPC(restore_base_info_));
} else if (OB_FAIL(physic_block_id.get_backup_macro_block_index(logic_block_id, macro_index))) {
@ -482,7 +497,7 @@ int ObCopyMacroBlockRestoreReader::get_next_macro_block(
data_type, macro_index.turn_id_, macro_index.retry_id_, macro_index.file_id_, backup_path))) {
LOG_WARN("failed to get macro block index", K(ret), K(restore_base_info_), K(macro_index), KPC(restore_base_info_));
} else if (OB_FAIL(backup::ObLSBackupRestoreUtil::read_macro_block_data(backup_path.get_obstr(),
restore_base_info_->backup_dest_.get_storage_info(), macro_index, align_size, allocator_, data_buffer_))) {
restore_base_info_->backup_dest_.get_storage_info(), macro_index, align_size, read_buffer_, data_buffer_))) {
LOG_WARN("failed to read macro block data", K(ret), K(table_key_), K(macro_index), K(physic_block_id), KPC(restore_base_info_));
} else {
data_size_ += data_buffer_.length();

View File

@ -137,6 +137,7 @@ private:
backup::ObBackupMetaIndexStoreWrapper *second_meta_index_store_;
ObRestoreMacroBlockIdMgr *restore_macro_block_id_mgr_;
blocksstable::ObBufferReader data_buffer_; // Data used to assemble macroblocks
blocksstable::ObBufferReader read_buffer_; // Buffer used to read macro data
common::ObArenaAllocator allocator_;
int64_t macro_block_index_;
int64_t macro_block_count_;

View File

@ -39,17 +39,16 @@ ObLSRestoreTaskMgr::~ObLSRestoreTaskMgr()
int ObLSRestoreTaskMgr::init()
{
int ret = OB_SUCCESS;
const char *tablet_dag_net_task = "tabletDagNetTask";
ObMemAttr attr(MTL_ID(), "RestoreTaskMgr");
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_FAIL(tablet_map_.create(OB_RESTORE_MAX_DAG_NET_NUM, tablet_dag_net_task))) {
} else if (OB_FAIL(tablet_map_.create(OB_RESTORE_MAX_DAG_NET_NUM, attr))) {
LOG_WARN("fail to create tablet_map_", K(ret));
} else if (OB_FAIL(schedule_tablet_set_.create(OB_LS_RESTORE_MAX_TABLET_NUM))) {
LOG_WARN("fail to create schedule_tablet_set_", K(ret));
} else if (OB_FAIL(wait_tablet_set_.create(OB_LS_RESTORE_MAX_TABLET_NUM))) {
} else if (OB_FAIL(schedule_tablet_set_.create(OB_LS_RESTORE_MAX_TABLET_NUM, attr))) {
LOG_WARN("fail to create schedule_tablet_set_", K(ret));
} else if (OB_FAIL(wait_tablet_set_.create(OB_LS_RESTORE_MAX_TABLET_NUM, attr))) {
LOG_WARN("fail to create wait_tablet_set_", K(ret));
} else {
is_inited_ = true;
}