[fix](load) fix duplicate register of memtable writer in memory limiter (#23205)
This commit is contained in:
@ -93,10 +93,14 @@ DeltaWriter::~DeltaWriter() {
|
||||
}
|
||||
|
||||
Status DeltaWriter::init() {
|
||||
if (_is_init) {
|
||||
return Status::OK();
|
||||
}
|
||||
RETURN_IF_ERROR(_rowset_builder.init());
|
||||
RETURN_IF_ERROR(
|
||||
_memtable_writer->init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(),
|
||||
_rowset_builder.tablet()->enable_unique_key_merge_on_write()));
|
||||
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
|
||||
_is_init = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -112,8 +112,6 @@ public:
|
||||
// For UT
|
||||
DeleteBitmapPtr get_delete_bitmap() { return _rowset_builder.get_delete_bitmap(); }
|
||||
|
||||
std::shared_ptr<MemTableWriter> memtable_writer() { return _memtable_writer; }
|
||||
|
||||
private:
|
||||
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile,
|
||||
const UniqueId& load_id);
|
||||
|
||||
@ -49,6 +49,8 @@ public:
|
||||
|
||||
MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); }
|
||||
|
||||
int64_t mem_usage() const { return _mem_usage; }
|
||||
|
||||
private:
|
||||
void _refresh_mem_tracker_without_lock();
|
||||
|
||||
|
||||
@ -178,6 +178,11 @@ public:
|
||||
doris::vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; }
|
||||
FileMetaCache* file_meta_cache() { return _file_meta_cache; }
|
||||
MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); }
|
||||
#ifdef BE_TEST
|
||||
void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) {
|
||||
_memtable_memory_limiter.reset(limiter);
|
||||
}
|
||||
#endif
|
||||
|
||||
// only for unit test
|
||||
void set_master_info(TMasterInfo* master_info) { this->_master_info = master_info; }
|
||||
|
||||
@ -111,7 +111,6 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(channel->open(params));
|
||||
_register_channel_all_writers(channel);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -71,12 +71,6 @@ private:
|
||||
|
||||
Status _start_bg_worker();
|
||||
|
||||
void _register_channel_all_writers(std::shared_ptr<doris::LoadChannel> channel) {
|
||||
for (auto& [_, tablet_channel] : channel->get_tablets_channels()) {
|
||||
tablet_channel->register_memtable_memory_limiter();
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
// lock protect the load channel map
|
||||
std::mutex _lock;
|
||||
|
||||
@ -496,19 +496,4 @@ bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) {
|
||||
return _broken_tablets.find(tablet_id) != _broken_tablets.end();
|
||||
}
|
||||
|
||||
void TabletsChannel::register_memtable_memory_limiter() {
|
||||
auto memtable_memory_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
|
||||
_memtable_writers_foreach([memtable_memory_limiter](std::shared_ptr<MemTableWriter> writer) {
|
||||
memtable_memory_limiter->register_writer(writer);
|
||||
});
|
||||
}
|
||||
|
||||
void TabletsChannel::_memtable_writers_foreach(
|
||||
std::function<void(std::shared_ptr<MemTableWriter>)> fn) {
|
||||
std::lock_guard<SpinLock> l(_tablet_writers_lock);
|
||||
for (auto& [_, delta_writer] : _tablet_writers) {
|
||||
fn(delta_writer->memtable_writer());
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -113,8 +113,6 @@ public:
|
||||
|
||||
void refresh_profile();
|
||||
|
||||
void register_memtable_memory_limiter();
|
||||
|
||||
private:
|
||||
template <typename Request>
|
||||
Status _get_current_seq(int64_t& cur_seq, const Request& request);
|
||||
@ -133,7 +131,6 @@ private:
|
||||
int64_t tablet_id, Status error);
|
||||
bool _is_broken_tablet(int64_t tablet_id);
|
||||
void _init_profile(RuntimeProfile* profile);
|
||||
void _memtable_writers_foreach(std::function<void(std::shared_ptr<MemTableWriter>)> fn);
|
||||
|
||||
// id of this load channel
|
||||
TabletsChannelKey _key;
|
||||
|
||||
@ -84,10 +84,13 @@ static void set_up() {
|
||||
|
||||
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
|
||||
exec_env->set_storage_engine(k_engine);
|
||||
exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
|
||||
k_engine->start_bg_threads();
|
||||
}
|
||||
|
||||
static void tear_down() {
|
||||
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
|
||||
exec_env->set_memtable_memory_limiter(nullptr);
|
||||
if (k_engine != nullptr) {
|
||||
k_engine->stop();
|
||||
delete k_engine;
|
||||
|
||||
@ -82,9 +82,12 @@ static void set_up() {
|
||||
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
|
||||
exec_env->set_storage_engine(k_engine);
|
||||
k_engine->start_bg_threads();
|
||||
exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
|
||||
}
|
||||
|
||||
static void tear_down() {
|
||||
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
|
||||
exec_env->set_memtable_memory_limiter(nullptr);
|
||||
if (k_engine != nullptr) {
|
||||
k_engine->stop();
|
||||
delete k_engine;
|
||||
|
||||
@ -83,32 +83,29 @@ protected:
|
||||
std::vector<StorePath> paths;
|
||||
paths.emplace_back(config::storage_root_path, -1);
|
||||
|
||||
_mgr = new MemTableMemoryLimiter();
|
||||
doris::EngineOptions options;
|
||||
options.store_paths = paths;
|
||||
Status s = doris::StorageEngine::open(options, &_engine);
|
||||
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
|
||||
exec_env->set_storage_engine(_engine);
|
||||
_engine->start_bg_threads();
|
||||
exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
|
||||
exec_env->set_memtable_memory_limiter(nullptr);
|
||||
if (_engine != nullptr) {
|
||||
_engine->stop();
|
||||
delete _engine;
|
||||
_engine = nullptr;
|
||||
}
|
||||
if (_mgr != nullptr) {
|
||||
delete _mgr;
|
||||
_mgr = nullptr;
|
||||
}
|
||||
EXPECT_EQ(system("rm -rf ./data_test"), 0);
|
||||
io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME")) + "/" +
|
||||
UNUSED_PREFIX);
|
||||
}
|
||||
|
||||
StorageEngine* _engine = nullptr;
|
||||
MemTableMemoryLimiter* _mgr = nullptr;
|
||||
};
|
||||
|
||||
TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
|
||||
@ -143,6 +140,7 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
|
||||
profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest");
|
||||
DeltaWriter::open(&write_req, &delta_writer, profile.get(), TUniqueId());
|
||||
ASSERT_NE(delta_writer, nullptr);
|
||||
auto mem_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
|
||||
|
||||
vectorized::Block block;
|
||||
for (const auto& slot_desc : tuple_desc->slots()) {
|
||||
@ -164,15 +162,9 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
|
||||
res = delta_writer->write(&block, {0});
|
||||
ASSERT_TRUE(res.ok());
|
||||
}
|
||||
std::mutex lock;
|
||||
_mgr->init(100);
|
||||
auto memtable_writer = delta_writer->memtable_writer();
|
||||
{
|
||||
std::lock_guard<std::mutex> l(lock);
|
||||
_mgr->register_writer(memtable_writer);
|
||||
}
|
||||
_mgr->handle_memtable_flush();
|
||||
CHECK_EQ(0, memtable_writer->active_memtable_mem_consumption());
|
||||
mem_limiter->init(100);
|
||||
mem_limiter->handle_memtable_flush();
|
||||
CHECK_EQ(0, mem_limiter->mem_usage());
|
||||
|
||||
res = delta_writer->close();
|
||||
EXPECT_EQ(Status::OK(), res);
|
||||
|
||||
@ -261,9 +261,13 @@ public:
|
||||
EngineOptions options;
|
||||
options.store_paths = paths;
|
||||
doris::StorageEngine::open(options, &k_engine);
|
||||
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
|
||||
exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
|
||||
}
|
||||
|
||||
static void TearDownTestSuite() {
|
||||
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
|
||||
exec_env->set_memtable_memory_limiter(nullptr);
|
||||
if (k_engine != nullptr) {
|
||||
k_engine->stop();
|
||||
delete k_engine;
|
||||
|
||||
Reference in New Issue
Block a user