From f9a60a7c4d96a40aa1868d4193fd0ca18529ba17 Mon Sep 17 00:00:00 2001 From: oceanoverflow Date: Wed, 6 Nov 2024 08:14:18 +0000 Subject: [PATCH] use tmp file to reduce the usage of memory when backup --- .../src/lib/wait_event/ob_wait_event.cpp | 1 + deps/oblib/src/lib/wait_event/ob_wait_event.h | 2 + src/storage/backup/ob_backup_tmp_file.cpp | 2 +- src/storage/backup/ob_backup_tmp_file.h | 3 +- src/storage/backup/ob_backup_utils.cpp | 219 +++++++++++++----- src/storage/backup/ob_backup_utils.h | 29 ++- unittest/storage/CMakeLists.txt | 1 + .../backup/test_backup_tmp_file_queue.cpp | 136 +++++++++++ 8 files changed, 328 insertions(+), 65 deletions(-) create mode 100644 unittest/storage/backup/test_backup_tmp_file_queue.cpp diff --git a/deps/oblib/src/lib/wait_event/ob_wait_event.cpp b/deps/oblib/src/lib/wait_event/ob_wait_event.cpp index 22337b90f..4c827ac05 100644 --- a/deps/oblib/src/lib/wait_event/ob_wait_event.cpp +++ b/deps/oblib/src/lib/wait_event/ob_wait_event.cpp @@ -11,6 +11,7 @@ */ #include "lib/wait_event/ob_wait_event.h" +#include "lib/stat/ob_latch_define.h" #include "lib/utility/ob_print_utils.h" namespace oceanbase diff --git a/deps/oblib/src/lib/wait_event/ob_wait_event.h b/deps/oblib/src/lib/wait_event/ob_wait_event.h index efbc978c8..c79213591 100644 --- a/deps/oblib/src/lib/wait_event/ob_wait_event.h +++ b/deps/oblib/src/lib/wait_event/ob_wait_event.h @@ -121,6 +121,8 @@ WAIT_EVENT_DEF(TABLET_LOCK_WAIT, 16016, "tablet lock wait", "", "", "", CONCURRE WAIT_EVENT_DEF(IND_NAME_CACHE_LOCK_WAIT, 16017, "latch:index name cache lock wait", "address", "number", "tries", CONCURRENCY, true, false) WAIT_EVENT_DEF(ASYNC_COMMITTING_WAIT, 16018, "async commiting wait", "", "", "", COMMIT, true, true) WAIT_EVENT_DEF(OBCDC_PART_MGR_SCHEMA_VERSION_WAIT, 18000, "oblog part mgr schema version wait", "", "", "", CONCURRENCY, true, true) +WAIT_EVENT_DEF(BACKUP_TMP_FILE_WAIT, 18001, "backup tmp file wait", "", "", "", CONCURRENCY, true, true) +WAIT_EVENT_DEF(BACKUP_TMP_FILE_QUEUE_WAIT, 18002, "backup tmp file queue wait", "", "", "", CONCURRENCY, true, true) WAIT_EVENT_DEF(SYNC_GET_GTS_WAIT, 18101, "sync get gts timestamp wait", "address", "", "", NETWORK, true, true) WAIT_EVENT_DEF(BANDWIDTH_THROTTLE_SLEEP, 20000, "sleep: bandwidth throttle sleep wait", "sleep_interval", "", "", NETWORK, true, true) diff --git a/src/storage/backup/ob_backup_tmp_file.cpp b/src/storage/backup/ob_backup_tmp_file.cpp index 22e0e0693..2991efdde 100644 --- a/src/storage/backup/ob_backup_tmp_file.cpp +++ b/src/storage/backup/ob_backup_tmp_file.cpp @@ -94,7 +94,7 @@ int ObBackupTmpFile::get_io_info_(const char *buf, const int64_t size, const int io_info.reset(); io_info.fd_ = file_fd_; io_info.dir_id_ = file_dir_; - io_info.io_desc_.set_wait_event(2); + io_info.io_desc_.set_wait_event(common::ObWaitEventIds::BACKUP_TMP_FILE_WAIT); io_info.buf_ = const_cast(buf); io_info.size_ = size; io_info.io_timeout_ms_ = timeout_ms; diff --git a/src/storage/backup/ob_backup_tmp_file.h b/src/storage/backup/ob_backup_tmp_file.h index e1f21bf66..4e3b5fe6e 100644 --- a/src/storage/backup/ob_backup_tmp_file.h +++ b/src/storage/backup/ob_backup_tmp_file.h @@ -16,6 +16,7 @@ #include "storage/backup/ob_backup_data_struct.h" #include "storage/tmp_file/ob_tmp_file_manager.h" #include "storage/blocksstable/ob_data_buffer.h" +#include "lib/wait_event/ob_wait_event.h" namespace oceanbase { namespace backup { @@ -142,7 +143,7 @@ int ObBackupIndexBufferNode::get_backup_index(T &backup_index) tmp_file::ObTmpFileIOInfo io_info; tmp_file::ObTmpFileIOHandle handle; io_info.fd_ = tmp_file_.get_fd(); - io_info.io_desc_.set_wait_event(2); + io_info.io_desc_.set_wait_event(common::ObWaitEventIds::BACKUP_TMP_FILE_WAIT); io_info.size_ = std::min(need_read_size, estimate_size_ - read_offset_); io_info.io_timeout_ms_ = timeout_ms; common::ObArenaAllocator allocator; diff --git a/src/storage/backup/ob_backup_utils.cpp b/src/storage/backup/ob_backup_utils.cpp index 7df2031ea..d3694dc4c 100644 --- a/src/storage/backup/ob_backup_utils.cpp +++ b/src/storage/backup/ob_backup_utils.cpp @@ -44,6 +44,7 @@ #include "share/backup/ob_backup_tablet_reorganize_helper.h" #include "share/ob_tablet_reorganize_history_table_operator.h" #include "storage/tablet/ob_mds_schema_helper.h" +#include "lib/wait_event/ob_wait_event.h" #include @@ -1582,6 +1583,148 @@ bool ObBackupProviderItemCompare::operator()(const ObBackupProviderItem *left, c return bret; } +/* ObBackupTmpFileQueue */ + +ObBackupTmpFileQueue::ObBackupTmpFileQueue() + : is_inited_(false), + tenant_id_(OB_INVALID_ID), + tmp_file_(), + read_offset_(0), + read_count_(0), + write_count_(0), + buffer_writer_(ObModIds::BACKUP) +{} + +ObBackupTmpFileQueue::~ObBackupTmpFileQueue() +{ + reset(); +} + +int ObBackupTmpFileQueue::init(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("init twice", K(ret)); + } else if (OB_INVALID_ID == tenant_id) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid args", K(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_.open(tenant_id))) { + LOG_WARN("failed to open tmp file", K(ret), K(tenant_id)); + } else { + tenant_id_ = tenant_id; + read_count_ = 0; + write_count_ = 0; + is_inited_ = true; + } + return ret; +} + +void ObBackupTmpFileQueue::reset() +{ + int tmp_ret = OB_SUCCESS; + if (!tmp_file_.is_opened()) { + // do nothing + } else if (OB_TMP_FAIL(tmp_file_.close())) { + LOG_ERROR_RET(tmp_ret, "failed to close tmp file", K(tmp_ret)); + } +} + +int ObBackupTmpFileQueue::put_item(const ObBackupProviderItem &item) +{ + int ret = OB_SUCCESS; + const int64_t need_write_size = item.get_serialize_size(); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("buffer node do not init", K(ret)); + } else if (OB_FAIL(buffer_writer_.write_pod(need_write_size))) { + LOG_WARN("failed to write serialize", K(ret), K(need_write_size)); + } else if (OB_FAIL(buffer_writer_.write_serialize(item))) { + LOG_WARN("failed to write serialize", K(ret), K(item)); + } else if (OB_FAIL(tmp_file_.write(buffer_writer_.data(), buffer_writer_.pos()))) { + LOG_WARN("failed to write to tmp file", K(ret), K_(buffer_writer)); + } else { + buffer_writer_.reuse(); + write_count_++; + } + return ret; +} + +int ObBackupTmpFileQueue::get_item(ObBackupProviderItem &item) +{ + int ret = OB_SUCCESS; + item.reset(); + const int64_t timeout_ms = 5000; + tmp_file::ObTmpFileIOInfo io_info; + tmp_file::ObTmpFileIOHandle handle; + io_info.fd_ = tmp_file_.get_fd(); + io_info.io_desc_.set_wait_event(common::ObWaitEventIds::BACKUP_TMP_FILE_QUEUE_WAIT); + io_info.io_timeout_ms_ = timeout_ms; + common::ObArenaAllocator allocator; + int64_t item_size = 0; + char *buf = NULL; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("buffer node do not init", K(ret)); + } else if (read_count_ == write_count_) { + ret = OB_ITER_END; + LOG_WARN("iter end", K(ret), K_(read_count), K_(write_count)); + } else if (OB_FAIL(get_next_item_size_(item_size))) { + LOG_WARN("failed to get next item size", K(ret)); + } else if (OB_ISNULL(buf = static_cast(allocator.alloc(item_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc memory", K(ret), K(item_size)); + } else if (FALSE_IT(io_info.buf_ = buf)) { + } else if (FALSE_IT(io_info.size_ = item_size)) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(MTL_ID(), io_info, read_offset_, handle))) { + LOG_WARN("failed to pread from tmp file", K(ret), K(io_info), K_(read_offset), K(item_size)); + } else { + blocksstable::ObBufferReader buffer_reader(buf, item_size); + if (OB_FAIL(buffer_reader.read_serialize(item))) { + LOG_WARN("failed to read serialize", K(ret), K_(read_offset), K_(read_count), K_(write_count)); + } else { + read_offset_ += buffer_reader.pos(); + read_count_++; + } + } + return ret; +} + +int ObBackupTmpFileQueue::get_next_item_size_(int64_t &size) +{ + int ret = OB_SUCCESS; + size = 0; + const int64_t timeout_ms = 5000; + tmp_file::ObTmpFileIOInfo io_info; + tmp_file::ObTmpFileIOHandle handle; + io_info.fd_ = tmp_file_.get_fd(); + io_info.io_desc_.set_wait_event(common::ObWaitEventIds::BACKUP_TMP_FILE_QUEUE_WAIT); + io_info.size_ = sizeof(int64_t); + io_info.io_timeout_ms_ = timeout_ms; + common::ObArenaAllocator allocator; + int64_t item_size = sizeof(int64_t); + char *buf = NULL; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("buffer node do not init", K(ret)); + } else if (OB_ISNULL(buf = static_cast(allocator.alloc(item_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc memory", K(ret)); + } else if (FALSE_IT(io_info.buf_ = buf)) { + } else if (FALSE_IT(io_info.size_ = item_size)) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(MTL_ID(), io_info, read_offset_, handle))) { + LOG_WARN("failed to pread from tmp file", K(ret), K(io_info), K_(read_offset), K(item_size)); + } else { + blocksstable::ObBufferReader buffer_reader(buf, item_size); + if (OB_FAIL(buffer_reader.read_pod(size))) { + LOG_WARN("failed to read serialize", K(ret), K_(read_offset), K_(read_count), K_(write_count)); + } else { + read_offset_ += buffer_reader.pos(); + } + } + return ret; +} + /* ObBackupTabletProvider */ ObBackupTabletProvider::ObBackupTabletProvider() @@ -1599,8 +1742,7 @@ ObBackupTabletProvider::ObBackupTabletProvider() meta_index_store_(), prev_item_(), has_prev_item_(false), - lighty_queue_(), - fifo_allocator_() + item_queue_() {} ObBackupTabletProvider::~ObBackupTabletProvider() @@ -1622,12 +1764,8 @@ int ObBackupTabletProvider::init(const ObLSBackupParam ¶m, const share::ObBa LOG_WARN("get invalid args", K(ret), K(param), K(backup_data_type)); } else if (OB_FAIL(param_.assign(param))) { LOG_WARN("failed to assign param", K(ret), K(param)); - } else if (OB_FAIL(lighty_queue_.init(QUEUE_SIZE, label, tenant_id))) { - LOG_WARN("failed to init lighty queue", K(ret)); - } else if (OB_FAIL(fifo_allocator_.init(ObMallocAllocator::get_instance(), - PAGE_SIZE, - lib::ObMemAttr(tenant_id, label)))) { - LOG_WARN("failed to init allocator", K(ret)); + } else if (OB_FAIL(item_queue_.init(tenant_id))) { + LOG_WARN("failed to init queue", K(ret)); } else { backup_data_type_ = backup_data_type; ls_backup_ctx_ = &ls_backup_ctx; @@ -1647,7 +1785,6 @@ void ObBackupTabletProvider::reset() is_inited_ = false; ls_backup_ctx_ = NULL; free_queue_item_(); - fifo_allocator_.reset(); } void ObBackupTabletProvider::reuse() @@ -1747,7 +1884,7 @@ int ObBackupTabletProvider::inner_get_batch_items_( while (OB_SUCC(ret) && items.count() < batch_size) { ObBackupProviderItem item; if (OB_FAIL(pop_item_from_queue_(item))) { - if (OB_ENTRY_NOT_EXIST == ret) { + if (OB_ITER_END == ret) { ret = OB_SUCCESS; break; } else { @@ -2642,45 +2779,21 @@ int ObBackupTabletProvider::check_tablet_replica_validity_(const uint64_t tenant int ObBackupTabletProvider::push_item_to_queue_(const ObBackupProviderItem &item) { int ret = OB_SUCCESS; - char *buf = NULL; - const int64_t size = sizeof(ObBackupProviderItem); - ObBackupProviderItem *item_ptr = NULL; - if (OB_ISNULL(buf = static_cast(fifo_allocator_.alloc(size)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to alloc memory", K(ret)); - } else if (OB_ISNULL(item_ptr = new (buf) ObBackupProviderItem)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("failed to new item", K(ret)); - fifo_allocator_.free(buf); - buf = NULL; - } else if (OB_FAIL(item_ptr->deep_copy(item))) { - LOG_WARN("failed to deep copy item", K(ret), K(item)); - } else if (OB_FAIL(lighty_queue_.push(item_ptr))) { + if (!item.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arg", K(ret), K(item)); + } else if (OB_FAIL(item_queue_.put_item(item))) { LOG_WARN("failed to push back", K(ret)); } - if (OB_FAIL(ret) && OB_NOT_NULL(item_ptr)) { - item_ptr->~ObBackupProviderItem(); - fifo_allocator_.free(buf); - buf = NULL; - } return ret; } int ObBackupTabletProvider::pop_item_from_queue_(ObBackupProviderItem &item) { int ret = OB_SUCCESS; - void *vp = NULL; - ObBackupProviderItem *item_ptr = NULL; - if (OB_FAIL(lighty_queue_.pop(vp))) { - LOG_WARN("failed to pop from queue", K(ret)); - } else { - item_ptr = static_cast(vp); - item = *item_ptr; - } - if (OB_NOT_NULL(item_ptr)) { - item_ptr->~ObBackupProviderItem(); - fifo_allocator_.free(item_ptr); - item_ptr = NULL; + item.reset(); + if (OB_FAIL(item_queue_.get_item(item))) { + LOG_WARN("failed to get item", K(ret)); } return ret; } @@ -2718,26 +2831,7 @@ int ObBackupTabletProvider::get_tablet_status_( void ObBackupTabletProvider::free_queue_item_() { - int ret = OB_SUCCESS; - while (OB_SUCC(ret) && lighty_queue_.is_inited()) { - void *vp = NULL; - ObBackupProviderItem *item_ptr = NULL; - if (OB_FAIL(lighty_queue_.pop(vp))) { - if (OB_ENTRY_NOT_EXIST == ret) { - ret = OB_SUCCESS; // try best to free - break; - } else { - LOG_ERROR("failed to pop from queue", K(ret)); - } - } else { - item_ptr = static_cast(vp); - } - if (OB_NOT_NULL(item_ptr)) { - item_ptr->~ObBackupProviderItem(); - fifo_allocator_.free(item_ptr); - item_ptr = NULL; - } - } + item_queue_.reset(); } int ObBackupTabletProvider::check_need_reuse_across_sstable_(const common::ObTabletID &tablet_id, const storage::ObITable::TableKey &table_key, @@ -3070,5 +3164,8 @@ bool ObBackupMacroBlockTaskMgr::all_item_is_reused( return all_no_need_copy; } +OB_SERIALIZE_MEMBER(ObBackupProviderItem, item_type_, backup_data_type_, logic_id_, macro_block_id_, table_key_, tablet_id_, + nested_offset_, nested_size_, timestamp_, need_copy_, macro_index_, absolute_row_offset_, need_reuse_across_sstable_); + } // namespace backup } // namespace oceanbase diff --git a/src/storage/backup/ob_backup_utils.h b/src/storage/backup/ob_backup_utils.h index b7bf0c983..4be112df3 100644 --- a/src/storage/backup/ob_backup_utils.h +++ b/src/storage/backup/ob_backup_utils.h @@ -230,6 +230,7 @@ enum ObBackupProviderItemType { class ObBackupProviderItem { friend class ObBackupTabletStat; friend class ObBackupTabletProvider; + OB_UNIS_VERSION(1); public: ObBackupProviderItem(); virtual ~ObBackupProviderItem(); @@ -317,6 +318,31 @@ public: class ObBackupTabletIndexBlockBuilderMgr; +class ObBackupTmpFileQueue final { +public: + ObBackupTmpFileQueue(); + ~ObBackupTmpFileQueue(); + int init(const uint64_t tenant_id); + int put_item(const ObBackupProviderItem &item); + int get_item(ObBackupProviderItem &item); + void reset(); + + TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(read_count), K_(write_count)); + +private: + int get_next_item_size_(int64_t &size); + +private: + bool is_inited_; + uint64_t tenant_id_; + ObBackupTmpFile tmp_file_; + int64_t read_offset_; + int64_t read_count_; + int64_t write_count_; + blocksstable::ObSelfBufferWriter buffer_writer_; + DISALLOW_COPY_AND_ASSIGN(ObBackupTmpFileQueue); +}; + class ObBackupTabletProvider : public ObIBackupTabletProvider { public: ObBackupTabletProvider(); @@ -415,8 +441,7 @@ private: ObBackupMetaIndexStore meta_index_store_; ObBackupProviderItem prev_item_; bool has_prev_item_; - common::ObLightyQueue lighty_queue_; - common::ObFIFOAllocator fifo_allocator_; + ObBackupTmpFileQueue item_queue_; DISALLOW_COPY_AND_ASSIGN(ObBackupTabletProvider); }; diff --git a/unittest/storage/CMakeLists.txt b/unittest/storage/CMakeLists.txt index 7591dc2f1..7a2c73768 100644 --- a/unittest/storage/CMakeLists.txt +++ b/unittest/storage/CMakeLists.txt @@ -115,6 +115,7 @@ storage_unittest(test_backup_device_wrapper backup/test_backup_device_wrapper.cp storage_unittest(test_backup_sync_io_mock_async_io backup/test_backup_sync_io_mock_async_io.cpp) storage_unittest(test_backup_device_macro_block_id backup/test_backup_device_macro_block_id.cpp) storage_unittest(test_backup_compatible backup/test_backup_compatible.cpp) +storage_unittest(test_backup_tmp_file_queue backup/test_backup_tmp_file_queue.cpp) #storage_unittest(test_create_tablet_clog tx_storage/test_create_tablet_clog.cpp) storage_unittest(test_safe_destroy_handler tx_storage/test_safe_destroy_handler.cpp) diff --git a/unittest/storage/backup/test_backup_tmp_file_queue.cpp b/unittest/storage/backup/test_backup_tmp_file_queue.cpp new file mode 100644 index 000000000..3c3b51f5d --- /dev/null +++ b/unittest/storage/backup/test_backup_tmp_file_queue.cpp @@ -0,0 +1,136 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX STORAGE +#include "gtest/gtest.h" +#define private public +#define protected public + +#define OK(ass) ASSERT_EQ(OB_SUCCESS, (ass)) + +#include "lib/ob_errno.h" +#include "storage/backup/ob_backup_tmp_file.h" +#include "storage/backup/ob_backup_data_struct.h" +#include "storage/backup/ob_backup_utils.h" +#include "storage/blocksstable/ob_data_file_prepare.h" +#include "test_backup.h" +#include "mtlenv/mock_tenant_module_env.h" + +using namespace oceanbase; +using namespace oceanbase::common; +using namespace oceanbase::share; +using namespace oceanbase::backup; + +namespace oceanbase { +namespace backup { + +static ObSimpleMemLimitGetter getter; + +class TestBackupTmpFileQueue : public TestDataFilePrepare { +public: + TestBackupTmpFileQueue(); + virtual ~TestBackupTmpFileQueue(); + virtual void SetUp(); + virtual void TearDown(); + +private: + DISALLOW_COPY_AND_ASSIGN(TestBackupTmpFileQueue); +}; + +TestBackupTmpFileQueue::TestBackupTmpFileQueue() : TestDataFilePrepare(&getter, "TestBackupTmpFileQueue") +{} + +TestBackupTmpFileQueue::~TestBackupTmpFileQueue() +{} + +void TestBackupTmpFileQueue::SetUp() +{ + int ret = OB_SUCCESS; + const int64_t bucket_num = 1024; + const int64_t max_cache_size = 1024 * 1024 * 1024; + const int64_t block_size = common::OB_MALLOC_BIG_BLOCK_SIZE; + TestDataFilePrepare::SetUp(); + + ret = getter.add_tenant(1, 8L * 1024L * 1024L, 2L * 1024L * 1024L * 1024L); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObKVGlobalCache::get_instance().init(&getter, bucket_num, max_cache_size, block_size); + if (OB_INIT_TWICE == ret) { + ret = OB_SUCCESS; + } else { + ASSERT_EQ(OB_SUCCESS, ret); + } + // set observer memory limit + CHUNK_MGR.set_limit(8L * 1024L * 1024L * 1024L); + ASSERT_EQ(OB_SUCCESS, common::ObClockGenerator::init()); + ASSERT_EQ(OB_SUCCESS, tmp_file::ObTmpBlockCache::get_instance().init("tmp_block_cache", 1)); + ASSERT_EQ(OB_SUCCESS, tmp_file::ObTmpPageCache::get_instance().init("sn_tmp_page_cache", 1)); + + static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID); + ObTenantEnv::set_tenant(&tenant_ctx); + ObTenantIOManager *io_service = nullptr; + EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service)); + EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service)); + EXPECT_EQ(OB_SUCCESS, io_service->start()); + tenant_ctx.set(io_service); + + tmp_file::ObTenantTmpFileManager *tf_mgr = nullptr; + EXPECT_EQ(OB_SUCCESS, mtl_new_default(tf_mgr)); + EXPECT_EQ(OB_SUCCESS, tmp_file::ObTenantTmpFileManager::mtl_init(tf_mgr)); + tf_mgr->get_sn_file_manager().page_cache_controller_.write_buffer_pool_.default_wbp_memory_limit_ = 40*1024*1024; + EXPECT_EQ(OB_SUCCESS, tf_mgr->start()); + tenant_ctx.set(tf_mgr); + + ObTenantEnv::set_tenant(&tenant_ctx); + SERVER_STORAGE_META_SERVICE.is_started_ = true; +} + +void TestBackupTmpFileQueue::TearDown() +{ + tmp_file::ObTmpBlockCache::get_instance().destroy(); + tmp_file::ObTmpPageCache::get_instance().destroy(); + ObKVGlobalCache::get_instance().destroy(); + TestDataFilePrepare::TearDown(); + common::ObClockGenerator::destroy(); +} + +TEST_F(TestBackupTmpFileQueue, test_backup_tmp_file) +{ + int ret = OB_SUCCESS; + ObBackupTmpFileQueue queue; + EXPECT_EQ(OB_SUCCESS, queue.init(1 /*tenant_id*/)); + const int64_t count = 1000000; + ObBackupDataType backup_data_type; + backup_data_type.set_major_data_backup(); + for (int64_t i = 1; OB_SUCC(ret) && i <= count; ++i) { + ObBackupProviderItem item; + OK(item.set_with_fake(ObBackupProviderItemType::PROVIDER_ITEM_TABLET_AND_SSTABLE_META, ObTabletID(i), backup_data_type)); + OK(queue.put_item(item)); + } + for (int64_t i = 1; OB_SUCC(ret) && i <= count; ++i) { + ObBackupProviderItem item; + OK(queue.get_item(item)); + EXPECT_EQ(item.tablet_id_, ObTabletID(i)); + } +} + +} // namespace backup +} // namespace oceanbase + +int main(int argc, char **argv) +{ + system("rm -f test_backup_tmp_file_queue.log*"); + ObLogger &logger = ObLogger::get_logger(); + logger.set_file_name("test_backup_tmp_file_queue.log", true); + logger.set_log_level("info"); + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}