[branch-2.1](memory) Add ThreadMemTrackerMgr BE UT (#37654)

## Proposed changes

pick #35518
This commit is contained in:
Xinyi Zou
2024-07-11 21:03:49 +08:00
committed by GitHub
parent fed632bf4a
commit 62e0230523
6 changed files with 467 additions and 10 deletions

View File

@ -263,7 +263,9 @@ public:
this->_dummy_lru_cache = dummy_lru_cache;
}
void set_write_cooldown_meta_executors();
static void set_tracking_memory(bool tracking_memory) {
_s_tracking_memory.store(tracking_memory, std::memory_order_acquire);
}
#endif
LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); }

View File

@ -125,6 +125,9 @@ public:
fmt::to_string(consumer_tracker_buf));
}
int64_t untracked_mem() const { return _untracked_mem; }
int64_t reserved_mem() const { return _reserved_mem; }
private:
// is false: ExecEnv::ready() = false when thread local is initialized
bool _init = false;
@ -190,7 +193,7 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) {
if (_reserved_mem != 0) {
if (_reserved_mem >= size) {
if (_reserved_mem > size) {
// only need to subtract _reserved_mem, no need to consume MemTracker,
// every time _reserved_mem is minus the sum of size >= SYNC_PROC_RESERVED_INTERVAL_BYTES,
// subtract size from process global reserved memory,
@ -208,7 +211,8 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
}
return;
} else {
// reserved memory is insufficient, the remaining _reserved_mem is subtracted from this memory consumed,
// _reserved_mem <= size, reserved memory used done,
// the remaining _reserved_mem is subtracted from this memory consumed,
// and reset _reserved_mem to 0, and subtract the remaining _reserved_mem from
// process global reserved memory, this means that all reserved memory has been used by BE process.
size -= _reserved_mem;

View File

@ -156,14 +156,12 @@ public:
void attach_task(const TUniqueId& task_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
#ifndef BE_TEST
// will only attach_task at the beginning of the thread function, there should be no duplicate attach_task.
DCHECK(mem_tracker);
// Orphan is thread default tracker.
DCHECK(thread_mem_tracker()->label() == "Orphan")
<< ", thread mem tracker label: " << thread_mem_tracker()->label()
<< ", attach mem tracker label: " << mem_tracker->label();
#endif
_task_id = task_id;
thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
thread_mem_tracker_mgr->set_query_id(_task_id);
@ -374,9 +372,7 @@ public:
class SwitchThreadMemTrackerLimiter {
public:
explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
#ifndef BE_TEST
DCHECK(mem_tracker);
#endif
ThreadLocalHandle::create_thread_local_if_not_exits();
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
@ -385,9 +381,7 @@ public:
explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) {
ThreadLocalHandle::create_thread_local_if_not_exits();
DCHECK(thread_context()->task_id() == query_thread_context.query_id);
#ifndef BE_TEST
DCHECK(query_thread_context.query_mem_tracker);
#endif
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);

View File

@ -38,7 +38,7 @@ TEST(MemTrackerTest, SingleTrackerNoLimit) {
t->release(5);
}
TEST(MemTestTest, SingleTrackerWithLimit) {
TEST(MemTrackerTest, SingleTrackerWithLimit) {
auto t = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "limit tracker",
11);
EXPECT_TRUE(t->has_limit());

View File

@ -0,0 +1,455 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/memory/thread_mem_tracker_mgr.h"
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include "gtest/gtest_pred_impl.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
namespace doris {
TEST(ThreadMemTrackerMgrTest, ConsumeMemory) {
std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
std::shared_ptr<MemTrackerLimiter> t =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ConsumeMemory");
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
thread_context->attach_task(TUniqueId(), t);
thread_context->consume_memory(size1);
// size1 < config::mem_tracker_consume_min_size_bytes, not consume mem tracker.
EXPECT_EQ(t->consumption(), 0);
thread_context->consume_memory(size2);
// size1 + size2 > onfig::mem_tracker_consume_min_size_bytes, consume mem tracker.
EXPECT_EQ(t->consumption(), size1 + size2);
thread_context->consume_memory(-size1);
// std::abs(-size1) < config::mem_tracker_consume_min_size_bytes, not consume mem tracker.
EXPECT_EQ(t->consumption(), size1 + size2);
thread_context->thread_mem_tracker_mgr->flush_untracked_mem();
EXPECT_EQ(t->consumption(), size2);
thread_context->consume_memory(-size2);
// std::abs(-size2) > onfig::mem_tracker_consume_min_size_bytes, consume mem tracker.
EXPECT_EQ(t->consumption(), 0);
thread_context->consume_memory(-size2);
EXPECT_EQ(t->consumption(), -size2);
thread_context->consume_memory(-size1);
EXPECT_EQ(t->consumption(), -size2);
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
thread_context->consume_memory(size2 * 2);
thread_context->consume_memory(size2 * 10);
thread_context->consume_memory(size2 * 100);
thread_context->consume_memory(size2 * 1000);
thread_context->consume_memory(size2 * 10000);
thread_context->consume_memory(-size2 * 2);
thread_context->consume_memory(-size2 * 10);
thread_context->consume_memory(-size2 * 100);
thread_context->consume_memory(-size2 * 1000);
thread_context->consume_memory(-size2 * 10000);
thread_context->detach_task();
EXPECT_EQ(t->consumption(), 0); // detach automatic call flush_untracked_mem.
}
TEST(ThreadMemTrackerMgrTest, Boundary) {
// TODO, Boundary check may not be necessary, add some `IF` maybe increase cost time.
}
TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) {
std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
std::shared_ptr<MemTrackerLimiter> t1 = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker1");
std::shared_ptr<MemTrackerLimiter> t2 = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker2");
std::shared_ptr<MemTrackerLimiter> t3 = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker3");
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
thread_context->attach_task(TUniqueId(), t1);
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
EXPECT_EQ(t1->consumption(), size1 + size2);
thread_context->consume_memory(size1);
thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
EXPECT_EQ(t1->consumption(),
size1 + size2 + size1); // attach automatic call flush_untracked_mem.
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
thread_context->consume_memory(size1);
EXPECT_EQ(t1->consumption(), size1 + size2 + size1); // not changed, now consume t2
EXPECT_EQ(t2->consumption(), size1 + size2);
thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); // detach
EXPECT_EQ(t2->consumption(),
size1 + size2 + size1); // detach automatic call flush_untracked_mem.
thread_context->consume_memory(size2);
thread_context->consume_memory(size2);
EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
EXPECT_EQ(t2->consumption(), size1 + size2 + size1); // not changed, now consume t1
thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
thread_context->consume_memory(-size1);
thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
thread_context->consume_memory(size1);
EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
EXPECT_EQ(t2->consumption(), size1 + size2); // attach automatic call flush_untracked_mem.
EXPECT_EQ(t3->consumption(), size1 + size2);
thread_context->consume_memory(-size1);
thread_context->consume_memory(-size2);
thread_context->consume_memory(-size1);
EXPECT_EQ(t3->consumption(), size1);
thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t2); // detach
EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
EXPECT_EQ(t2->consumption(), size1 + size2);
EXPECT_EQ(t3->consumption(), 0);
thread_context->consume_memory(-size1);
thread_context->consume_memory(-size2);
thread_context->consume_memory(-size1);
EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
EXPECT_EQ(t2->consumption(), 0);
thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); // detach
EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
EXPECT_EQ(t2->consumption(), -size1);
thread_context->consume_memory(-t1->consumption());
thread_context->detach_task(); // detach t1
EXPECT_EQ(t1->consumption(), 0);
}
TEST(ThreadMemTrackerMgrTest, MultiMemTracker) {
std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
std::shared_ptr<MemTrackerLimiter> t1 =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-MultiMemTracker1");
std::shared_ptr<MemTracker> t2 = std::make_shared<MemTracker>("UT-MultiMemTracker2", t1.get());
std::shared_ptr<MemTracker> t3 = std::make_shared<MemTracker>("UT-MultiMemTracker3", t1.get());
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
thread_context->attach_task(TUniqueId(), t1);
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
thread_context->consume_memory(size1);
EXPECT_EQ(t1->consumption(), size1 + size2);
bool rt = thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t2.get());
EXPECT_EQ(rt, true);
EXPECT_EQ(t1->consumption(), size1 + size2);
EXPECT_EQ(t2->consumption(), -size1); // _untracked_mem = size1
thread_context->consume_memory(size2);
EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2);
EXPECT_EQ(t2->consumption(), size2);
rt = thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t2.get());
EXPECT_EQ(rt, false);
thread_context->consume_memory(size2);
EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
EXPECT_EQ(t2->consumption(), size2 + size2);
rt = thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t3.get());
EXPECT_EQ(rt, true);
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
thread_context->consume_memory(-size1); // _untracked_mem = -size1
EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1 + size2);
EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2);
EXPECT_EQ(t3->consumption(), size1 + size2);
thread_context->thread_mem_tracker_mgr->pop_consumer_tracker();
EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1 + size2 - size1);
EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1);
EXPECT_EQ(t3->consumption(), size1 + size2 - size1);
thread_context->consume_memory(-size2);
thread_context->consume_memory(size2);
thread_context->consume_memory(-size2);
thread_context->thread_mem_tracker_mgr->pop_consumer_tracker();
EXPECT_EQ(t1->consumption(),
size1 + size2 + size1 + size2 + size2 + size1 + size2 - size1 - size2);
EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1 - size2);
EXPECT_EQ(t3->consumption(), size1 + size2 - size1);
thread_context->consume_memory(-t1->consumption());
thread_context->detach_task(); // detach t1
EXPECT_EQ(t1->consumption(), 0);
EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1 - size2);
EXPECT_EQ(t3->consumption(), size1 + size2 - size1);
}
TEST(ThreadMemTrackerMgrTest, ScopedCount) {
std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
std::shared_ptr<MemTrackerLimiter> t1 =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ScopedCount");
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
thread_context->attach_task(TUniqueId(), t1);
thread_context->thread_mem_tracker_mgr->start_count_scope_mem();
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
thread_context->consume_memory(size1);
int64_t scope_mem = thread_context->thread_mem_tracker_mgr->stop_count_scope_mem();
EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size1);
EXPECT_EQ(t1->consumption(), scope_mem);
thread_context->consume_memory(-size2);
thread_context->consume_memory(-size1);
thread_context->consume_memory(-size2);
EXPECT_EQ(t1->consumption(), size1 + size1);
EXPECT_EQ(scope_mem, size1 + size2 + size1 + size2 + size1);
}
TEST(ThreadMemTrackerMgrTest, ReserveMemory) {
std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
std::shared_ptr<MemTrackerLimiter> t =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ReserveMemory");
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
int64_t size3 = size2 * 1024;
thread_context->attach_task(TUniqueId(), t);
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
EXPECT_EQ(t->consumption(), size1 + size2);
thread_context->try_reserve_memory(size3);
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
thread_context->consume_memory(size2);
thread_context->consume_memory(-size2);
thread_context->consume_memory(size2);
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->consume_memory(-size1);
thread_context->consume_memory(-size1);
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
// std::abs(-size1 - size1) < SYNC_PROC_RESERVED_INTERVAL_BYTES, not update process_reserved_memory.
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->consume_memory(size2 * 1023);
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 + size1);
std::cout << "11111 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
<< thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
thread_context->consume_memory(size1);
thread_context->consume_memory(size1);
std::cout << "2222 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
<< thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
std::cout << "3333 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
<< thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
// reserved memory used done
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
// no reserved memory, normal memory consumption
EXPECT_EQ(t->consumption(), size1 + size2 + size3 + size1 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
thread_context->consume_memory(-size3);
thread_context->consume_memory(-size1);
thread_context->consume_memory(-size2);
EXPECT_EQ(t->consumption(), size1 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
thread_context->try_reserve_memory(size3);
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
thread_context->consume_memory(-size1);
// ThreadMemTrackerMgr _reserved_mem = size3 + size1
// ThreadMemTrackerMgr _untracked_mem = -size1
thread_context->consume_memory(size3);
// ThreadMemTrackerMgr _reserved_mem = size1
// ThreadMemTrackerMgr _untracked_mem = -size1 + size3
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
size1); // size3 + size1 - size3
thread_context->consume_memory(-size3);
// ThreadMemTrackerMgr _reserved_mem = size1 + size3
// ThreadMemTrackerMgr _untracked_mem = 0, std::abs(-size3) > SYNC_PROC_RESERVED_INTERVAL_BYTES,
// so update process_reserved_memory.
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 + size3);
thread_context->consume_memory(size1);
thread_context->consume_memory(size2);
thread_context->consume_memory(size1);
// ThreadMemTrackerMgr _reserved_mem = size1 + size3 - size1 - size2 - size1 = size3 - size2 - size1
// ThreadMemTrackerMgr _untracked_mem = size1
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
// size1 + size3 - (size1 + size2)
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->release_reserved_memory();
// size1 + size2 + size3 - _reserved_mem, size1 + size2 + size3 - (size3 - size2 - size1)
EXPECT_EQ(t->consumption(), size1 + size2 + size1 + size2);
// size3 - size2 - (_reserved_mem + _untracked_mem) = 0, size3 - size2 - ((size3 - size2 - size1) + (size1)) = 0
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
thread_context->detach_task();
EXPECT_EQ(t->consumption(), size1 + size2 + size1 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
}
TEST(ThreadMemTrackerMgrTest, NestedReserveMemory) {
std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
std::shared_ptr<MemTrackerLimiter> t = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER, "UT-NestedReserveMemory");
int64_t size2 = 4 * 1024 * 1024;
int64_t size3 = size2 * 1024;
thread_context->attach_task(TUniqueId(), t);
thread_context->try_reserve_memory(size3);
EXPECT_EQ(t->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
thread_context->consume_memory(size2);
// ThreadMemTrackerMgr _reserved_mem = size3 - size2
// ThreadMemTrackerMgr _untracked_mem = 0, size2 > SYNC_PROC_RESERVED_INTERVAL_BYTES,
// update process_reserved_memory.
EXPECT_EQ(t->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->try_reserve_memory(size2);
// ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2
// ThreadMemTrackerMgr _untracked_mem = 0
EXPECT_EQ(t->consumption(), size3 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
size3); // size3 - size2 + size2
thread_context->try_reserve_memory(size3);
thread_context->try_reserve_memory(size3);
thread_context->consume_memory(size3);
thread_context->consume_memory(size2);
thread_context->consume_memory(size3);
// ThreadMemTrackerMgr _reserved_mem = size3 - size2
// ThreadMemTrackerMgr _untracked_mem = 0
EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->release_reserved_memory();
// size3 + size2 + size3 + size3 - _reserved_mem, size3 + size2 + size3 + size3 - (size3 - size2)
EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size2);
// size3 - size2 - (_reserved_mem + _untracked_mem) = 0, size3 - size2 - ((size3 - size2 - size1) + (size1)) = 0
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
thread_context->detach_task();
EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
}
TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
std::shared_ptr<MemTrackerLimiter> t1 = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTrackerReserveMemory1");
std::shared_ptr<MemTrackerLimiter> t2 = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTrackerReserveMemory2");
std::shared_ptr<MemTrackerLimiter> t3 = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTrackerReserveMemory3");
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
int64_t size3 = size2 * 1024;
thread_context->attach_task(TUniqueId(), t1);
thread_context->try_reserve_memory(size3);
thread_context->consume_memory(size2);
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
thread_context->try_reserve_memory(size3);
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(t2->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2 + size3);
thread_context->consume_memory(size2 + size3); // reserved memory used done
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(t2->consumption(), size3 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
thread_context->try_reserve_memory(size3);
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(t2->consumption(), size3 + size2);
EXPECT_EQ(t3->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2 + size3);
thread_context->consume_memory(-size2);
thread_context->consume_memory(-size1);
// ThreadMemTrackerMgr _reserved_mem = size3 + size2 + size1
// ThreadMemTrackerMgr _untracked_mem = -size1
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(t2->consumption(), size3 + size2);
EXPECT_EQ(t3->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
size3 - size2 + size3 + size2);
thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t2); // detach
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(t2->consumption(), size3 + size2);
EXPECT_EQ(t3->consumption(), -size1 - size2); // size3 - _reserved_mem
// size3 - size2 + size3 + size2 - (_reserved_mem + _untracked_mem)
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); // detach
EXPECT_EQ(t1->consumption(), size3);
// not changed, reserved memory used done.
EXPECT_EQ(t2->consumption(), size3 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->detach_task();
EXPECT_EQ(t1->consumption(), size2); // size3 - _reserved_mem
// size3 - size2 - (_reserved_mem + _untracked_mem)
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
}
} // end namespace doris

View File

@ -69,6 +69,8 @@ int main(int argc, char** argv) {
static_cast<void>(service->start());
doris::global_test_http_host = "http://127.0.0.1:" + std::to_string(service->get_real_port());
doris::ExecEnv::GetInstance()->set_tracking_memory(false);
int res = RUN_ALL_TESTS();
return res;
}