reduce memory use in backup_reader
This commit is contained in:
@ -198,7 +198,7 @@ ObIMacroBlockBackupReader::~ObIMacroBlockBackupReader()
|
||||
|
||||
ObMacroBlockBackupReader::ObMacroBlockBackupReader()
|
||||
: ObIMacroBlockBackupReader(), is_data_ready_(false), result_code_(), macro_handle_(),
|
||||
buffer_reader_(), io_allocator_("BUR_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
|
||||
buffer_reader_()
|
||||
{}
|
||||
|
||||
ObMacroBlockBackupReader::~ObMacroBlockBackupReader()
|
||||
@ -225,13 +225,13 @@ int ObMacroBlockBackupReader::init(const ObBackupMacroBlockId ¯o_id)
|
||||
}
|
||||
|
||||
int ObMacroBlockBackupReader::get_macro_block_data(
|
||||
blocksstable::ObBufferReader &buffer_reader, blocksstable::ObLogicMacroBlockId &logic_id)
|
||||
blocksstable::ObBufferReader &buffer_reader, blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited", K(ret));
|
||||
} else if (OB_FAIL(process_())) {
|
||||
} else if (OB_FAIL(process_(io_allocator))) {
|
||||
LOG_WARN("failed to process", K(ret));
|
||||
} else if (OB_FAIL(result_code_)) {
|
||||
LOG_WARN("error happened during fetch macro block", K(ret));
|
||||
@ -254,18 +254,20 @@ void ObMacroBlockBackupReader::reset()
|
||||
block_info_.reset();
|
||||
}
|
||||
|
||||
int ObMacroBlockBackupReader::process_()
|
||||
int ObMacroBlockBackupReader::process_(ObIAllocator *io_allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
blocksstable::ObMacroBlockReadInfo read_info;
|
||||
read_info.io_timeout_ms_ = GCONF._data_storage_io_timeout / 1000L;
|
||||
io_allocator_.reuse();
|
||||
if (is_data_ready_) {
|
||||
LOG_INFO("macro data is ready, no need fetch", K(ret));
|
||||
} else if (OB_FAIL(get_macro_read_info_(logic_id_, read_info))) {
|
||||
LOG_WARN("failed to get macro block read info", K(ret), K(logic_id_));
|
||||
} else if (OB_ISNULL(io_allocator)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("io allocator is null", K(ret));
|
||||
} else if (OB_ISNULL(read_info.buf_ =
|
||||
reinterpret_cast<char*>(io_allocator_.alloc(read_info.size_)))) {
|
||||
reinterpret_cast<char*>(io_allocator->alloc(read_info.size_)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
STORAGE_LOG(WARN, "failed to alloc macro read info buffer", K(ret), K(read_info.size_));
|
||||
} else {
|
||||
@ -386,7 +388,7 @@ int ObMultiMacroBlockBackupReader::init(const uint64_t tenant_id,
|
||||
}
|
||||
|
||||
int ObMultiMacroBlockBackupReader::get_next_macro_block(
|
||||
blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id)
|
||||
blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
@ -395,7 +397,7 @@ int ObMultiMacroBlockBackupReader::get_next_macro_block(
|
||||
} else if (reader_idx_ >= readers_.count()) {
|
||||
ret = OB_ITER_END;
|
||||
LOG_DEBUG("multi macro block backup reader get end", K(ret));
|
||||
} else if (OB_FAIL(fetch_macro_block_with_retry_(data, logic_id))) {
|
||||
} else if (OB_FAIL(fetch_macro_block_with_retry_(data, logic_id, io_allocator))) {
|
||||
LOG_WARN("failed to fetch macro block with retry", K(ret));
|
||||
} else {
|
||||
++reader_idx_;
|
||||
@ -456,7 +458,7 @@ int ObMultiMacroBlockBackupReader::prepare_macro_block_reader_(const int64_t idx
|
||||
}
|
||||
|
||||
int ObMultiMacroBlockBackupReader::fetch_macro_block_with_retry_(
|
||||
blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id)
|
||||
blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t retry_times = 0;
|
||||
@ -464,7 +466,7 @@ int ObMultiMacroBlockBackupReader::fetch_macro_block_with_retry_(
|
||||
if (retry_times >= 1) {
|
||||
LOG_WARN("retry get macro block", K(retry_times));
|
||||
}
|
||||
if (OB_FAIL(fetch_macro_block_(data, logic_id))) {
|
||||
if (OB_FAIL(fetch_macro_block_(data, logic_id, io_allocator))) {
|
||||
LOG_WARN("failed to fetch macro block", K(ret), K(retry_times));
|
||||
}
|
||||
#ifdef ERRSIM
|
||||
@ -489,7 +491,7 @@ int ObMultiMacroBlockBackupReader::fetch_macro_block_with_retry_(
|
||||
}
|
||||
|
||||
int ObMultiMacroBlockBackupReader::fetch_macro_block_(
|
||||
blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id)
|
||||
blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t idx = reader_idx_;
|
||||
@ -499,7 +501,7 @@ int ObMultiMacroBlockBackupReader::fetch_macro_block_(
|
||||
LOG_WARN("get invalid args", K(ret), K(idx));
|
||||
} else if (OB_FAIL(prepare_macro_block_reader_(idx))) {
|
||||
LOG_WARN("failed to prepare macro block reader", K(ret));
|
||||
} else if (OB_FAIL(readers_.at(idx)->get_macro_block_data(data, logic_id))) {
|
||||
} else if (OB_FAIL(readers_.at(idx)->get_macro_block_data(data, logic_id, io_allocator))) {
|
||||
LOG_WARN("failed to get macro block data", K(ret), K(idx), K(readers_.count()));
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -146,7 +146,7 @@ public:
|
||||
virtual ~ObIMacroBlockBackupReader();
|
||||
virtual int init(const ObBackupMacroBlockId ¯o_id) = 0;
|
||||
virtual int get_macro_block_data(
|
||||
blocksstable::ObBufferReader &buffer_reader, blocksstable::ObLogicMacroBlockId &logic_id) = 0;
|
||||
blocksstable::ObBufferReader &buffer_reader, blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator) = 0;
|
||||
virtual void reset() = 0;
|
||||
virtual ObMacroBlockReaderType get_type() const = 0;
|
||||
TO_STRING_KV(K_(logic_id));
|
||||
@ -164,7 +164,7 @@ public:
|
||||
virtual ~ObMacroBlockBackupReader();
|
||||
int init(const ObBackupMacroBlockId ¯o_id);
|
||||
virtual int get_macro_block_data(
|
||||
blocksstable::ObBufferReader &buffer_reader, blocksstable::ObLogicMacroBlockId &logic_id) override;
|
||||
blocksstable::ObBufferReader &buffer_reader, blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator) override;
|
||||
virtual void reset() override;
|
||||
virtual ObMacroBlockReaderType get_type() const override
|
||||
{
|
||||
@ -173,7 +173,7 @@ public:
|
||||
TO_STRING_KV(K_(logic_id), K_(block_info));
|
||||
|
||||
private:
|
||||
int process_();
|
||||
int process_(ObIAllocator *io_allocator);
|
||||
int get_macro_read_info_(const blocksstable::ObLogicMacroBlockId &logic_id, blocksstable::ObMacroBlockReadInfo &read_info);
|
||||
int get_macro_block_size_(const blocksstable::ObBufferReader &buffer, int64_t &size);
|
||||
|
||||
@ -182,7 +182,6 @@ private:
|
||||
int64_t result_code_;
|
||||
blocksstable::ObMacroBlockHandle macro_handle_;
|
||||
blocksstable::ObBufferReader buffer_reader_;
|
||||
common::ObArenaAllocator io_allocator_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObMacroBlockBackupReader);
|
||||
};
|
||||
|
||||
@ -191,15 +190,15 @@ public:
|
||||
ObMultiMacroBlockBackupReader();
|
||||
virtual ~ObMultiMacroBlockBackupReader();
|
||||
int init(const uint64_t tenant_id, const ObMacroBlockReaderType &reader_type, const common::ObIArray<ObBackupMacroBlockId> &list);
|
||||
int get_next_macro_block(blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id);
|
||||
int get_next_macro_block(blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator);
|
||||
void reset();
|
||||
|
||||
private:
|
||||
int alloc_macro_block_reader_(const uint64_t tenant_id, const ObMacroBlockReaderType &reader_type, ObIMacroBlockBackupReader *&reader);
|
||||
int prepare_macro_block_reader_(const int64_t idx);
|
||||
void reset_prev_macro_block_reader_(const int64_t idx);
|
||||
int fetch_macro_block_with_retry_(blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id);
|
||||
int fetch_macro_block_(blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id);
|
||||
int fetch_macro_block_with_retry_(blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator);
|
||||
int fetch_macro_block_(blocksstable::ObBufferReader &data, blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator);
|
||||
|
||||
private:
|
||||
static const int64_t FETCH_MACRO_BLOCK_RETRY_INTERVAL = 1 * 1000 * 1000L; // 1s
|
||||
|
||||
@ -2697,6 +2697,7 @@ int ObLSBackupDataTask::do_backup_macro_block_data_()
|
||||
int ret = OB_SUCCESS;
|
||||
ObArray<ObBackupMacroBlockId> macro_list;
|
||||
ObMultiMacroBlockBackupReader *reader = NULL;
|
||||
ObArenaAllocator io_allocator("BUR_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_);
|
||||
if (OB_ISNULL(ls_backup_ctx_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls backup ctx should not be null", K(ret));
|
||||
@ -2709,6 +2710,7 @@ int ObLSBackupDataTask::do_backup_macro_block_data_()
|
||||
} else {
|
||||
LOG_INFO("start backup macro block data", K(macro_list));
|
||||
while (OB_SUCC(ret)) {
|
||||
io_allocator.reuse();
|
||||
ObBufferReader buffer_reader;
|
||||
ObLogicMacroBlockId logic_id;
|
||||
ObBackupMacroBlockIndex macro_index;
|
||||
@ -2720,7 +2722,7 @@ int ObLSBackupDataTask::do_backup_macro_block_data_()
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(get_next_macro_block_data_(reader, buffer_reader, logic_id))) {
|
||||
} else if (OB_FAIL(get_next_macro_block_data_(reader, buffer_reader, logic_id, &io_allocator))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
LOG_INFO("iterator meet end", K(logic_id));
|
||||
ret = OB_SUCCESS;
|
||||
@ -3072,13 +3074,13 @@ int ObLSBackupDataTask::prepare_tablet_meta_reader_(const common::ObTabletID &ta
|
||||
}
|
||||
|
||||
int ObLSBackupDataTask::get_next_macro_block_data_(
|
||||
ObMultiMacroBlockBackupReader *reader, ObBufferReader &buffer_reader, blocksstable::ObLogicMacroBlockId &logic_id)
|
||||
ObMultiMacroBlockBackupReader *reader, ObBufferReader &buffer_reader, blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(reader)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid args", K(ret));
|
||||
} else if (OB_FAIL(reader->get_next_macro_block(buffer_reader, logic_id))) {
|
||||
} else if (OB_FAIL(reader->get_next_macro_block(buffer_reader, logic_id, io_allocator))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
// do nothing
|
||||
} else {
|
||||
|
||||
@ -541,7 +541,7 @@ private:
|
||||
int prepare_tablet_meta_reader_(const common::ObTabletID &tablet_id, const ObTabletMetaReaderType &reader_type,
|
||||
storage::ObTabletHandle &tablet_handle, ObITabletMetaBackupReader *&reader);
|
||||
int get_next_macro_block_data_(ObMultiMacroBlockBackupReader *reader, blocksstable::ObBufferReader &buffer_reader,
|
||||
blocksstable::ObLogicMacroBlockId &logic_id);
|
||||
blocksstable::ObLogicMacroBlockId &logic_id, ObIAllocator *io_allocator);
|
||||
int check_macro_block_data_(const blocksstable::ObBufferReader &data);
|
||||
int write_macro_block_data_(const blocksstable::ObBufferReader &data, const blocksstable::ObLogicMacroBlockId &logic_id,
|
||||
ObBackupMacroBlockIndex ¯o_index);
|
||||
|
||||
Reference in New Issue
Block a user