From 0d7a61ae8c7ea723b3498bf2c19d37129f16737d Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 22 Aug 2023 10:05:17 +0800 Subject: [PATCH] [fix](load) fix duplicate register of memtable writer in memory limiter (#23205) --- be/src/olap/delta_writer.cpp | 4 ++++ be/src/olap/delta_writer.h | 2 -- be/src/olap/memtable_memory_limiter.h | 2 ++ be/src/runtime/exec_env.h | 5 +++++ be/src/runtime/load_channel_mgr.cpp | 1 - be/src/runtime/load_channel_mgr.h | 6 ----- be/src/runtime/tablets_channel.cpp | 15 ------------- be/src/runtime/tablets_channel.h | 3 --- be/test/olap/delta_writer_test.cpp | 3 +++ .../engine_storage_migration_task_test.cpp | 3 +++ be/test/olap/memtable_memory_limiter_test.cpp | 22 ++++++------------- be/test/olap/tablet_cooldown_test.cpp | 4 ++++ 12 files changed, 28 insertions(+), 42 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index f8f73c4801..ed65f1b001 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -93,10 +93,14 @@ DeltaWriter::~DeltaWriter() { } Status DeltaWriter::init() { + if (_is_init) { + return Status::OK(); + } RETURN_IF_ERROR(_rowset_builder.init()); RETURN_IF_ERROR( _memtable_writer->init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(), _rowset_builder.tablet()->enable_unique_key_merge_on_write())); + ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; return Status::OK(); } diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 764b23d2aa..b9b20a8705 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -112,8 +112,6 @@ public: // For UT DeleteBitmapPtr get_delete_bitmap() { return _rowset_builder.get_delete_bitmap(); } - std::shared_ptr memtable_writer() { return _memtable_writer; } - private: DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile, const UniqueId& load_id); diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h index ea66ce62e0..492a266b76 100644 --- a/be/src/olap/memtable_memory_limiter.h +++ b/be/src/olap/memtable_memory_limiter.h @@ -49,6 +49,8 @@ public: MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); } + int64_t mem_usage() const { return _mem_usage; } + private: void _refresh_mem_tracker_without_lock(); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 67d804da96..89162e5eaa 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -178,6 +178,11 @@ public: doris::vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } FileMetaCache* file_meta_cache() { return _file_meta_cache; } MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); } +#ifdef BE_TEST + void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) { + _memtable_memory_limiter.reset(limiter); + } +#endif // only for unit test void set_master_info(TMasterInfo* master_info) { this->_master_info = master_info; } diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index e41c1f01a1..9987a34d5b 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -111,7 +111,6 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { } RETURN_IF_ERROR(channel->open(params)); - _register_channel_all_writers(channel); return Status::OK(); } diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 81991eee8c..3c094a4251 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -71,12 +71,6 @@ private: Status _start_bg_worker(); - void _register_channel_all_writers(std::shared_ptr channel) { - for (auto& [_, tablet_channel] : channel->get_tablets_channels()) { - tablet_channel->register_memtable_memory_limiter(); - } - } - protected: // lock protect the load channel map std::mutex _lock; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 9d8346601b..300a7a40e9 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -496,19 +496,4 @@ bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) { return _broken_tablets.find(tablet_id) != _broken_tablets.end(); } -void TabletsChannel::register_memtable_memory_limiter() { - auto memtable_memory_limiter = ExecEnv::GetInstance()->memtable_memory_limiter(); - _memtable_writers_foreach([memtable_memory_limiter](std::shared_ptr writer) { - memtable_memory_limiter->register_writer(writer); - }); -} - -void TabletsChannel::_memtable_writers_foreach( - std::function)> fn) { - std::lock_guard l(_tablet_writers_lock); - for (auto& [_, delta_writer] : _tablet_writers) { - fn(delta_writer->memtable_writer()); - } -} - } // namespace doris diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index ea8beed799..e3d8d87ec3 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -113,8 +113,6 @@ public: void refresh_profile(); - void register_memtable_memory_limiter(); - private: template Status _get_current_seq(int64_t& cur_seq, const Request& request); @@ -133,7 +131,6 @@ private: int64_t tablet_id, Status error); bool _is_broken_tablet(int64_t tablet_id); void _init_profile(RuntimeProfile* profile); - void _memtable_writers_foreach(std::function)> fn); // id of this load channel TabletsChannelKey _key; diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 51ee5b2236..511904debb 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -84,10 +84,13 @@ static void set_up() { ExecEnv* exec_env = doris::ExecEnv::GetInstance(); exec_env->set_storage_engine(k_engine); + exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter()); k_engine->start_bg_threads(); } static void tear_down() { + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_memtable_memory_limiter(nullptr); if (k_engine != nullptr) { k_engine->stop(); delete k_engine; diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index 37afc0ea18..2dca954321 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -82,9 +82,12 @@ static void set_up() { ExecEnv* exec_env = doris::ExecEnv::GetInstance(); exec_env->set_storage_engine(k_engine); k_engine->start_bg_threads(); + exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter()); } static void tear_down() { + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_memtable_memory_limiter(nullptr); if (k_engine != nullptr) { k_engine->stop(); delete k_engine; diff --git a/be/test/olap/memtable_memory_limiter_test.cpp b/be/test/olap/memtable_memory_limiter_test.cpp index 404a4fb61a..dca8544af3 100644 --- a/be/test/olap/memtable_memory_limiter_test.cpp +++ b/be/test/olap/memtable_memory_limiter_test.cpp @@ -83,32 +83,29 @@ protected: std::vector paths; paths.emplace_back(config::storage_root_path, -1); - _mgr = new MemTableMemoryLimiter(); doris::EngineOptions options; options.store_paths = paths; Status s = doris::StorageEngine::open(options, &_engine); ExecEnv* exec_env = doris::ExecEnv::GetInstance(); exec_env->set_storage_engine(_engine); _engine->start_bg_threads(); + exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter()); } void TearDown() override { + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_memtable_memory_limiter(nullptr); if (_engine != nullptr) { _engine->stop(); delete _engine; _engine = nullptr; } - if (_mgr != nullptr) { - delete _mgr; - _mgr = nullptr; - } EXPECT_EQ(system("rm -rf ./data_test"), 0); io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME")) + "/" + UNUSED_PREFIX); } StorageEngine* _engine = nullptr; - MemTableMemoryLimiter* _mgr = nullptr; }; TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) { @@ -143,6 +140,7 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) { profile = std::make_unique("MemTableMemoryLimiterTest"); DeltaWriter::open(&write_req, &delta_writer, profile.get(), TUniqueId()); ASSERT_NE(delta_writer, nullptr); + auto mem_limiter = ExecEnv::GetInstance()->memtable_memory_limiter(); vectorized::Block block; for (const auto& slot_desc : tuple_desc->slots()) { @@ -164,15 +162,9 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) { res = delta_writer->write(&block, {0}); ASSERT_TRUE(res.ok()); } - std::mutex lock; - _mgr->init(100); - auto memtable_writer = delta_writer->memtable_writer(); - { - std::lock_guard l(lock); - _mgr->register_writer(memtable_writer); - } - _mgr->handle_memtable_flush(); - CHECK_EQ(0, memtable_writer->active_memtable_mem_consumption()); + mem_limiter->init(100); + mem_limiter->handle_memtable_flush(); + CHECK_EQ(0, mem_limiter->mem_usage()); res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index ac25b378c7..15e78ecc4e 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -261,9 +261,13 @@ public: EngineOptions options; options.store_paths = paths; doris::StorageEngine::open(options, &k_engine); + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter()); } static void TearDownTestSuite() { + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_memtable_memory_limiter(nullptr); if (k_engine != nullptr) { k_engine->stop(); delete k_engine;