use tmp file to reduce the usage of memory when backup

This commit is contained in:
oceanoverflow 2024-11-06 08:14:18 +00:00 committed by ob-robot
parent 816bbe796e
commit f9a60a7c4d
8 changed files with 328 additions and 65 deletions

View File

@ -11,6 +11,7 @@
*/
#include "lib/wait_event/ob_wait_event.h"
#include "lib/stat/ob_latch_define.h"
#include "lib/utility/ob_print_utils.h"
namespace oceanbase

View File

@ -121,6 +121,8 @@ WAIT_EVENT_DEF(TABLET_LOCK_WAIT, 16016, "tablet lock wait", "", "", "", CONCURRE
WAIT_EVENT_DEF(IND_NAME_CACHE_LOCK_WAIT, 16017, "latch:index name cache lock wait", "address", "number", "tries", CONCURRENCY, true, false)
WAIT_EVENT_DEF(ASYNC_COMMITTING_WAIT, 16018, "async commiting wait", "", "", "", COMMIT, true, true)
WAIT_EVENT_DEF(OBCDC_PART_MGR_SCHEMA_VERSION_WAIT, 18000, "oblog part mgr schema version wait", "", "", "", CONCURRENCY, true, true)
WAIT_EVENT_DEF(BACKUP_TMP_FILE_WAIT, 18001, "backup tmp file wait", "", "", "", CONCURRENCY, true, true)
WAIT_EVENT_DEF(BACKUP_TMP_FILE_QUEUE_WAIT, 18002, "backup tmp file queue wait", "", "", "", CONCURRENCY, true, true)
WAIT_EVENT_DEF(SYNC_GET_GTS_WAIT, 18101, "sync get gts timestamp wait", "address", "", "", NETWORK, true, true)
WAIT_EVENT_DEF(BANDWIDTH_THROTTLE_SLEEP, 20000, "sleep: bandwidth throttle sleep wait", "sleep_interval", "", "", NETWORK, true, true)

View File

@ -94,7 +94,7 @@ int ObBackupTmpFile::get_io_info_(const char *buf, const int64_t size, const int
io_info.reset();
io_info.fd_ = file_fd_;
io_info.dir_id_ = file_dir_;
io_info.io_desc_.set_wait_event(2);
io_info.io_desc_.set_wait_event(common::ObWaitEventIds::BACKUP_TMP_FILE_WAIT);
io_info.buf_ = const_cast<char *>(buf);
io_info.size_ = size;
io_info.io_timeout_ms_ = timeout_ms;

View File

@ -16,6 +16,7 @@
#include "storage/backup/ob_backup_data_struct.h"
#include "storage/tmp_file/ob_tmp_file_manager.h"
#include "storage/blocksstable/ob_data_buffer.h"
#include "lib/wait_event/ob_wait_event.h"
namespace oceanbase {
namespace backup {
@ -142,7 +143,7 @@ int ObBackupIndexBufferNode::get_backup_index(T &backup_index)
tmp_file::ObTmpFileIOInfo io_info;
tmp_file::ObTmpFileIOHandle handle;
io_info.fd_ = tmp_file_.get_fd();
io_info.io_desc_.set_wait_event(2);
io_info.io_desc_.set_wait_event(common::ObWaitEventIds::BACKUP_TMP_FILE_WAIT);
io_info.size_ = std::min(need_read_size, estimate_size_ - read_offset_);
io_info.io_timeout_ms_ = timeout_ms;
common::ObArenaAllocator allocator;

View File

@ -44,6 +44,7 @@
#include "share/backup/ob_backup_tablet_reorganize_helper.h"
#include "share/ob_tablet_reorganize_history_table_operator.h"
#include "storage/tablet/ob_mds_schema_helper.h"
#include "lib/wait_event/ob_wait_event.h"
#include <algorithm>
@ -1582,6 +1583,148 @@ bool ObBackupProviderItemCompare::operator()(const ObBackupProviderItem *left, c
return bret;
}
/* ObBackupTmpFileQueue */
ObBackupTmpFileQueue::ObBackupTmpFileQueue()
: is_inited_(false),
tenant_id_(OB_INVALID_ID),
tmp_file_(),
read_offset_(0),
read_count_(0),
write_count_(0),
buffer_writer_(ObModIds::BACKUP)
{}
ObBackupTmpFileQueue::~ObBackupTmpFileQueue()
{
reset();
}
int ObBackupTmpFileQueue::init(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid args", K(ret), K(tenant_id));
} else if (OB_FAIL(tmp_file_.open(tenant_id))) {
LOG_WARN("failed to open tmp file", K(ret), K(tenant_id));
} else {
tenant_id_ = tenant_id;
read_count_ = 0;
write_count_ = 0;
is_inited_ = true;
}
return ret;
}
void ObBackupTmpFileQueue::reset()
{
int tmp_ret = OB_SUCCESS;
if (!tmp_file_.is_opened()) {
// do nothing
} else if (OB_TMP_FAIL(tmp_file_.close())) {
LOG_ERROR_RET(tmp_ret, "failed to close tmp file", K(tmp_ret));
}
}
int ObBackupTmpFileQueue::put_item(const ObBackupProviderItem &item)
{
int ret = OB_SUCCESS;
const int64_t need_write_size = item.get_serialize_size();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("buffer node do not init", K(ret));
} else if (OB_FAIL(buffer_writer_.write_pod(need_write_size))) {
LOG_WARN("failed to write serialize", K(ret), K(need_write_size));
} else if (OB_FAIL(buffer_writer_.write_serialize(item))) {
LOG_WARN("failed to write serialize", K(ret), K(item));
} else if (OB_FAIL(tmp_file_.write(buffer_writer_.data(), buffer_writer_.pos()))) {
LOG_WARN("failed to write to tmp file", K(ret), K_(buffer_writer));
} else {
buffer_writer_.reuse();
write_count_++;
}
return ret;
}
int ObBackupTmpFileQueue::get_item(ObBackupProviderItem &item)
{
int ret = OB_SUCCESS;
item.reset();
const int64_t timeout_ms = 5000;
tmp_file::ObTmpFileIOInfo io_info;
tmp_file::ObTmpFileIOHandle handle;
io_info.fd_ = tmp_file_.get_fd();
io_info.io_desc_.set_wait_event(common::ObWaitEventIds::BACKUP_TMP_FILE_QUEUE_WAIT);
io_info.io_timeout_ms_ = timeout_ms;
common::ObArenaAllocator allocator;
int64_t item_size = 0;
char *buf = NULL;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("buffer node do not init", K(ret));
} else if (read_count_ == write_count_) {
ret = OB_ITER_END;
LOG_WARN("iter end", K(ret), K_(read_count), K_(write_count));
} else if (OB_FAIL(get_next_item_size_(item_size))) {
LOG_WARN("failed to get next item size", K(ret));
} else if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(item_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret), K(item_size));
} else if (FALSE_IT(io_info.buf_ = buf)) {
} else if (FALSE_IT(io_info.size_ = item_size)) {
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(MTL_ID(), io_info, read_offset_, handle))) {
LOG_WARN("failed to pread from tmp file", K(ret), K(io_info), K_(read_offset), K(item_size));
} else {
blocksstable::ObBufferReader buffer_reader(buf, item_size);
if (OB_FAIL(buffer_reader.read_serialize(item))) {
LOG_WARN("failed to read serialize", K(ret), K_(read_offset), K_(read_count), K_(write_count));
} else {
read_offset_ += buffer_reader.pos();
read_count_++;
}
}
return ret;
}
int ObBackupTmpFileQueue::get_next_item_size_(int64_t &size)
{
int ret = OB_SUCCESS;
size = 0;
const int64_t timeout_ms = 5000;
tmp_file::ObTmpFileIOInfo io_info;
tmp_file::ObTmpFileIOHandle handle;
io_info.fd_ = tmp_file_.get_fd();
io_info.io_desc_.set_wait_event(common::ObWaitEventIds::BACKUP_TMP_FILE_QUEUE_WAIT);
io_info.size_ = sizeof(int64_t);
io_info.io_timeout_ms_ = timeout_ms;
common::ObArenaAllocator allocator;
int64_t item_size = sizeof(int64_t);
char *buf = NULL;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("buffer node do not init", K(ret));
} else if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(item_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret));
} else if (FALSE_IT(io_info.buf_ = buf)) {
} else if (FALSE_IT(io_info.size_ = item_size)) {
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(MTL_ID(), io_info, read_offset_, handle))) {
LOG_WARN("failed to pread from tmp file", K(ret), K(io_info), K_(read_offset), K(item_size));
} else {
blocksstable::ObBufferReader buffer_reader(buf, item_size);
if (OB_FAIL(buffer_reader.read_pod(size))) {
LOG_WARN("failed to read serialize", K(ret), K_(read_offset), K_(read_count), K_(write_count));
} else {
read_offset_ += buffer_reader.pos();
}
}
return ret;
}
/* ObBackupTabletProvider */
ObBackupTabletProvider::ObBackupTabletProvider()
@ -1599,8 +1742,7 @@ ObBackupTabletProvider::ObBackupTabletProvider()
meta_index_store_(),
prev_item_(),
has_prev_item_(false),
lighty_queue_(),
fifo_allocator_()
item_queue_()
{}
ObBackupTabletProvider::~ObBackupTabletProvider()
@ -1622,12 +1764,8 @@ int ObBackupTabletProvider::init(const ObLSBackupParam &param, const share::ObBa
LOG_WARN("get invalid args", K(ret), K(param), K(backup_data_type));
} else if (OB_FAIL(param_.assign(param))) {
LOG_WARN("failed to assign param", K(ret), K(param));
} else if (OB_FAIL(lighty_queue_.init(QUEUE_SIZE, label, tenant_id))) {
LOG_WARN("failed to init lighty queue", K(ret));
} else if (OB_FAIL(fifo_allocator_.init(ObMallocAllocator::get_instance(),
PAGE_SIZE,
lib::ObMemAttr(tenant_id, label)))) {
LOG_WARN("failed to init allocator", K(ret));
} else if (OB_FAIL(item_queue_.init(tenant_id))) {
LOG_WARN("failed to init queue", K(ret));
} else {
backup_data_type_ = backup_data_type;
ls_backup_ctx_ = &ls_backup_ctx;
@ -1647,7 +1785,6 @@ void ObBackupTabletProvider::reset()
is_inited_ = false;
ls_backup_ctx_ = NULL;
free_queue_item_();
fifo_allocator_.reset();
}
void ObBackupTabletProvider::reuse()
@ -1747,7 +1884,7 @@ int ObBackupTabletProvider::inner_get_batch_items_(
while (OB_SUCC(ret) && items.count() < batch_size) {
ObBackupProviderItem item;
if (OB_FAIL(pop_item_from_queue_(item))) {
if (OB_ENTRY_NOT_EXIST == ret) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;
} else {
@ -2642,45 +2779,21 @@ int ObBackupTabletProvider::check_tablet_replica_validity_(const uint64_t tenant
int ObBackupTabletProvider::push_item_to_queue_(const ObBackupProviderItem &item)
{
int ret = OB_SUCCESS;
char *buf = NULL;
const int64_t size = sizeof(ObBackupProviderItem);
ObBackupProviderItem *item_ptr = NULL;
if (OB_ISNULL(buf = static_cast<char *>(fifo_allocator_.alloc(size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret));
} else if (OB_ISNULL(item_ptr = new (buf) ObBackupProviderItem)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to new item", K(ret));
fifo_allocator_.free(buf);
buf = NULL;
} else if (OB_FAIL(item_ptr->deep_copy(item))) {
LOG_WARN("failed to deep copy item", K(ret), K(item));
} else if (OB_FAIL(lighty_queue_.push(item_ptr))) {
if (!item.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arg", K(ret), K(item));
} else if (OB_FAIL(item_queue_.put_item(item))) {
LOG_WARN("failed to push back", K(ret));
}
if (OB_FAIL(ret) && OB_NOT_NULL(item_ptr)) {
item_ptr->~ObBackupProviderItem();
fifo_allocator_.free(buf);
buf = NULL;
}
return ret;
}
int ObBackupTabletProvider::pop_item_from_queue_(ObBackupProviderItem &item)
{
int ret = OB_SUCCESS;
void *vp = NULL;
ObBackupProviderItem *item_ptr = NULL;
if (OB_FAIL(lighty_queue_.pop(vp))) {
LOG_WARN("failed to pop from queue", K(ret));
} else {
item_ptr = static_cast<ObBackupProviderItem *>(vp);
item = *item_ptr;
}
if (OB_NOT_NULL(item_ptr)) {
item_ptr->~ObBackupProviderItem();
fifo_allocator_.free(item_ptr);
item_ptr = NULL;
item.reset();
if (OB_FAIL(item_queue_.get_item(item))) {
LOG_WARN("failed to get item", K(ret));
}
return ret;
}
@ -2718,26 +2831,7 @@ int ObBackupTabletProvider::get_tablet_status_(
void ObBackupTabletProvider::free_queue_item_()
{
int ret = OB_SUCCESS;
while (OB_SUCC(ret) && lighty_queue_.is_inited()) {
void *vp = NULL;
ObBackupProviderItem *item_ptr = NULL;
if (OB_FAIL(lighty_queue_.pop(vp))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS; // try best to free
break;
} else {
LOG_ERROR("failed to pop from queue", K(ret));
}
} else {
item_ptr = static_cast<ObBackupProviderItem *>(vp);
}
if (OB_NOT_NULL(item_ptr)) {
item_ptr->~ObBackupProviderItem();
fifo_allocator_.free(item_ptr);
item_ptr = NULL;
}
}
item_queue_.reset();
}
int ObBackupTabletProvider::check_need_reuse_across_sstable_(const common::ObTabletID &tablet_id, const storage::ObITable::TableKey &table_key,
@ -3070,5 +3164,8 @@ bool ObBackupMacroBlockTaskMgr::all_item_is_reused(
return all_no_need_copy;
}
OB_SERIALIZE_MEMBER(ObBackupProviderItem, item_type_, backup_data_type_, logic_id_, macro_block_id_, table_key_, tablet_id_,
nested_offset_, nested_size_, timestamp_, need_copy_, macro_index_, absolute_row_offset_, need_reuse_across_sstable_);
} // namespace backup
} // namespace oceanbase

View File

@ -230,6 +230,7 @@ enum ObBackupProviderItemType {
class ObBackupProviderItem {
friend class ObBackupTabletStat;
friend class ObBackupTabletProvider;
OB_UNIS_VERSION(1);
public:
ObBackupProviderItem();
virtual ~ObBackupProviderItem();
@ -317,6 +318,31 @@ public:
class ObBackupTabletIndexBlockBuilderMgr;
class ObBackupTmpFileQueue final {
public:
ObBackupTmpFileQueue();
~ObBackupTmpFileQueue();
int init(const uint64_t tenant_id);
int put_item(const ObBackupProviderItem &item);
int get_item(ObBackupProviderItem &item);
void reset();
TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(read_count), K_(write_count));
private:
int get_next_item_size_(int64_t &size);
private:
bool is_inited_;
uint64_t tenant_id_;
ObBackupTmpFile tmp_file_;
int64_t read_offset_;
int64_t read_count_;
int64_t write_count_;
blocksstable::ObSelfBufferWriter buffer_writer_;
DISALLOW_COPY_AND_ASSIGN(ObBackupTmpFileQueue);
};
class ObBackupTabletProvider : public ObIBackupTabletProvider {
public:
ObBackupTabletProvider();
@ -415,8 +441,7 @@ private:
ObBackupMetaIndexStore meta_index_store_;
ObBackupProviderItem prev_item_;
bool has_prev_item_;
common::ObLightyQueue lighty_queue_;
common::ObFIFOAllocator fifo_allocator_;
ObBackupTmpFileQueue item_queue_;
DISALLOW_COPY_AND_ASSIGN(ObBackupTabletProvider);
};

View File

@ -115,6 +115,7 @@ storage_unittest(test_backup_device_wrapper backup/test_backup_device_wrapper.cp
storage_unittest(test_backup_sync_io_mock_async_io backup/test_backup_sync_io_mock_async_io.cpp)
storage_unittest(test_backup_device_macro_block_id backup/test_backup_device_macro_block_id.cpp)
storage_unittest(test_backup_compatible backup/test_backup_compatible.cpp)
storage_unittest(test_backup_tmp_file_queue backup/test_backup_tmp_file_queue.cpp)
#storage_unittest(test_create_tablet_clog tx_storage/test_create_tablet_clog.cpp)
storage_unittest(test_safe_destroy_handler tx_storage/test_safe_destroy_handler.cpp)

View File

@ -0,0 +1,136 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE
#include "gtest/gtest.h"
#define private public
#define protected public
#define OK(ass) ASSERT_EQ(OB_SUCCESS, (ass))
#include "lib/ob_errno.h"
#include "storage/backup/ob_backup_tmp_file.h"
#include "storage/backup/ob_backup_data_struct.h"
#include "storage/backup/ob_backup_utils.h"
#include "storage/blocksstable/ob_data_file_prepare.h"
#include "test_backup.h"
#include "mtlenv/mock_tenant_module_env.h"
using namespace oceanbase;
using namespace oceanbase::common;
using namespace oceanbase::share;
using namespace oceanbase::backup;
namespace oceanbase {
namespace backup {
static ObSimpleMemLimitGetter getter;
class TestBackupTmpFileQueue : public TestDataFilePrepare {
public:
TestBackupTmpFileQueue();
virtual ~TestBackupTmpFileQueue();
virtual void SetUp();
virtual void TearDown();
private:
DISALLOW_COPY_AND_ASSIGN(TestBackupTmpFileQueue);
};
TestBackupTmpFileQueue::TestBackupTmpFileQueue() : TestDataFilePrepare(&getter, "TestBackupTmpFileQueue")
{}
TestBackupTmpFileQueue::~TestBackupTmpFileQueue()
{}
void TestBackupTmpFileQueue::SetUp()
{
int ret = OB_SUCCESS;
const int64_t bucket_num = 1024;
const int64_t max_cache_size = 1024 * 1024 * 1024;
const int64_t block_size = common::OB_MALLOC_BIG_BLOCK_SIZE;
TestDataFilePrepare::SetUp();
ret = getter.add_tenant(1, 8L * 1024L * 1024L, 2L * 1024L * 1024L * 1024L);
ASSERT_EQ(OB_SUCCESS, ret);
ret = ObKVGlobalCache::get_instance().init(&getter, bucket_num, max_cache_size, block_size);
if (OB_INIT_TWICE == ret) {
ret = OB_SUCCESS;
} else {
ASSERT_EQ(OB_SUCCESS, ret);
}
// set observer memory limit
CHUNK_MGR.set_limit(8L * 1024L * 1024L * 1024L);
ASSERT_EQ(OB_SUCCESS, common::ObClockGenerator::init());
ASSERT_EQ(OB_SUCCESS, tmp_file::ObTmpBlockCache::get_instance().init("tmp_block_cache", 1));
ASSERT_EQ(OB_SUCCESS, tmp_file::ObTmpPageCache::get_instance().init("sn_tmp_page_cache", 1));
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
tmp_file::ObTenantTmpFileManager *tf_mgr = nullptr;
EXPECT_EQ(OB_SUCCESS, mtl_new_default(tf_mgr));
EXPECT_EQ(OB_SUCCESS, tmp_file::ObTenantTmpFileManager::mtl_init(tf_mgr));
tf_mgr->get_sn_file_manager().page_cache_controller_.write_buffer_pool_.default_wbp_memory_limit_ = 40*1024*1024;
EXPECT_EQ(OB_SUCCESS, tf_mgr->start());
tenant_ctx.set(tf_mgr);
ObTenantEnv::set_tenant(&tenant_ctx);
SERVER_STORAGE_META_SERVICE.is_started_ = true;
}
void TestBackupTmpFileQueue::TearDown()
{
tmp_file::ObTmpBlockCache::get_instance().destroy();
tmp_file::ObTmpPageCache::get_instance().destroy();
ObKVGlobalCache::get_instance().destroy();
TestDataFilePrepare::TearDown();
common::ObClockGenerator::destroy();
}
TEST_F(TestBackupTmpFileQueue, test_backup_tmp_file)
{
int ret = OB_SUCCESS;
ObBackupTmpFileQueue queue;
EXPECT_EQ(OB_SUCCESS, queue.init(1 /*tenant_id*/));
const int64_t count = 1000000;
ObBackupDataType backup_data_type;
backup_data_type.set_major_data_backup();
for (int64_t i = 1; OB_SUCC(ret) && i <= count; ++i) {
ObBackupProviderItem item;
OK(item.set_with_fake(ObBackupProviderItemType::PROVIDER_ITEM_TABLET_AND_SSTABLE_META, ObTabletID(i), backup_data_type));
OK(queue.put_item(item));
}
for (int64_t i = 1; OB_SUCC(ret) && i <= count; ++i) {
ObBackupProviderItem item;
OK(queue.get_item(item));
EXPECT_EQ(item.tablet_id_, ObTabletID(i));
}
}
} // namespace backup
} // namespace oceanbase
int main(int argc, char **argv)
{
system("rm -f test_backup_tmp_file_queue.log*");
ObLogger &logger = ObLogger::get_logger();
logger.set_file_name("test_backup_tmp_file_queue.log", true);
logger.set_log_level("info");
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}