[fix](memory tracker) Fix lru cache, compaction tracker, add USE_MEM_TRACKER compile (#9661)
1. Fix Lru Cache MemTracker consumption value is negative. 2. Fix compaction Cache MemTracker has no track. 3. Add USE_MEM_TRACKER compile option. 4. Make sure the malloc/free hook is not stopped at any time.
This commit is contained in:
@ -438,6 +438,14 @@ if (WITH_LZO)
|
||||
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DDORIS_WITH_LZO")
|
||||
endif()
|
||||
|
||||
# Enable memory tracker, which allows BE to limit the memory of tasks such as query, load,
|
||||
# and compaction,and observe the memory of BE through be_ip:http_port/MemTracker.
|
||||
# Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn off the memory tracker,
|
||||
# which will bring about a 2% performance improvement, which may be useful in performance POC.
|
||||
if (USE_MEM_TRACKER)
|
||||
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DUSE_MEM_TRACKER")
|
||||
endif()
|
||||
|
||||
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
|
||||
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -faligned-new")
|
||||
endif()
|
||||
|
||||
@ -1609,6 +1609,8 @@ void TaskWorkerPool::_random_sleep(int second) {
|
||||
}
|
||||
|
||||
void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
|
||||
SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::COMPACTION,
|
||||
StorageEngine::instance()->compaction_mem_tracker());
|
||||
while (_is_work) {
|
||||
TAgentTaskRequest agent_task_req;
|
||||
TCompactionReq compaction_req;
|
||||
|
||||
@ -64,6 +64,10 @@ public:
|
||||
// Get next tuple
|
||||
Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override;
|
||||
|
||||
Status get_next(vectorized::Block* block, bool* eof) override {
|
||||
return Status::NotSupported("Not Implemented get block");
|
||||
}
|
||||
|
||||
// Close this scanner
|
||||
void close() override;
|
||||
|
||||
|
||||
@ -1442,7 +1442,6 @@ char* SimpleItoaWithCommas(__int128_t i, char* buffer, int32_t buffer_size) {
|
||||
return p;
|
||||
}
|
||||
|
||||
|
||||
// ----------------------------------------------------------------------
|
||||
// ItoaKMGT()
|
||||
// Description: converts an integer to a string
|
||||
@ -1480,7 +1479,7 @@ string ItoaKMGT(int64 i) {
|
||||
}
|
||||
|
||||
string AccurateItoaKMGT(int64 i) {
|
||||
const char *sign = "";
|
||||
const char* sign = "";
|
||||
if (i < 0) {
|
||||
// We lose some accuracy if the caller passes LONG_LONG_MIN, but
|
||||
// that's OK as this function is only for human readability
|
||||
@ -1489,31 +1488,39 @@ string AccurateItoaKMGT(int64 i) {
|
||||
i = -i;
|
||||
}
|
||||
|
||||
string ret = std::to_string(i) + " : " + StringPrintf("%s", sign);
|
||||
string ret = std::to_string(i) + " = " + StringPrintf("%s", sign);
|
||||
int64 val;
|
||||
if ((val = (i >> 40)) > 1) {
|
||||
ret += StringPrintf("%" PRId64 "%s", val, "T");
|
||||
ret += StringPrintf("%" PRId64
|
||||
"%s"
|
||||
" + ",
|
||||
val, "T");
|
||||
i = i - (val << 40);
|
||||
}
|
||||
if ((val = (i >> 30)) > 1) {
|
||||
ret += StringPrintf(" %" PRId64 "%s", val, "G");
|
||||
ret += StringPrintf("%" PRId64
|
||||
"%s"
|
||||
" + ",
|
||||
val, "G");
|
||||
i = i - (val << 30);
|
||||
}
|
||||
if ((val = (i >> 20)) > 1) {
|
||||
ret += StringPrintf(" %" PRId64 "%s", val, "M");
|
||||
ret += StringPrintf("%" PRId64
|
||||
"%s"
|
||||
" + ",
|
||||
val, "M");
|
||||
i = i - (val << 20);
|
||||
}
|
||||
if ((val = (i >> 10)) > 1) {
|
||||
ret += StringPrintf(" %" PRId64 "%s", val, "K");
|
||||
ret += StringPrintf("%" PRId64 "%s", val, "K");
|
||||
i = i - (val << 10);
|
||||
} else {
|
||||
ret += StringPrintf(" %" PRId64 "%s", i, "K");
|
||||
ret += StringPrintf("%" PRId64 "%s", i, "K");
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
// DEPRECATED(wadetregaskis).
|
||||
// These are non-inline because some BUILD files turn on -Wformat-non-literal.
|
||||
|
||||
|
||||
@ -26,11 +26,18 @@ using std::vector;
|
||||
namespace doris {
|
||||
|
||||
Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
|
||||
: _mem_tracker(MemTracker::create_tracker(-1, label, nullptr, MemTrackerLevel::INSTANCE)),
|
||||
_tablet(tablet),
|
||||
: _tablet(tablet),
|
||||
_input_rowsets_size(0),
|
||||
_input_row_num(0),
|
||||
_state(CompactionState::INITED) {}
|
||||
_state(CompactionState::INITED) {
|
||||
#ifndef BE_TEST
|
||||
_mem_tracker = MemTracker::create_tracker(-1, label,
|
||||
StorageEngine::instance()->compaction_mem_tracker(),
|
||||
MemTrackerLevel::INSTANCE);
|
||||
#else
|
||||
_mem_tracker = MemTracker::get_process_tracker();
|
||||
#endif
|
||||
}
|
||||
|
||||
Compaction::~Compaction() {}
|
||||
|
||||
|
||||
@ -53,6 +53,8 @@ public:
|
||||
Status execute_compact();
|
||||
virtual Status execute_compact_impl() = 0;
|
||||
|
||||
std::shared_ptr<MemTracker>& get_mem_tracker() { return _mem_tracker; }
|
||||
|
||||
protected:
|
||||
virtual Status pick_rowsets_to_compact() = 0;
|
||||
virtual std::string compaction_name() const = 0;
|
||||
|
||||
@ -283,7 +283,7 @@ void LRUCache::_evict_one_entry(LRUHandle* e) {
|
||||
|
||||
Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
|
||||
void (*deleter)(const CacheKey& key, void* value),
|
||||
CachePriority priority) {
|
||||
CachePriority priority, MemTracker* tracker) {
|
||||
size_t handle_size = sizeof(LRUHandle) - 1 + key.size();
|
||||
LRUHandle* e = reinterpret_cast<LRUHandle*>(malloc(handle_size));
|
||||
e->value = value;
|
||||
@ -296,7 +296,12 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value,
|
||||
e->next = e->prev = nullptr;
|
||||
e->in_cache = true;
|
||||
e->priority = priority;
|
||||
e->mem_tracker = tracker;
|
||||
memcpy(e->key_data, key.data(), key.size());
|
||||
// The memory of the parameter value should be recorded in the tls mem tracker,
|
||||
// transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
|
||||
if (tracker)
|
||||
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(tracker, e->total_size);
|
||||
LRUHandle* to_remove_head = nullptr;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_mutex);
|
||||
@ -433,7 +438,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
|
||||
: _name(name),
|
||||
_last_id(1),
|
||||
_mem_tracker(MemTracker::create_tracker(-1, name, nullptr, MemTrackerLevel::OVERVIEW)) {
|
||||
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(_mem_tracker);
|
||||
const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
|
||||
for (int s = 0; s < kNumShards; s++) {
|
||||
_shards[s] = new LRUCache(type);
|
||||
@ -452,7 +456,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
|
||||
}
|
||||
|
||||
ShardedLRUCache::~ShardedLRUCache() {
|
||||
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
|
||||
for (int s = 0; s < kNumShards; s++) {
|
||||
delete _shards[s];
|
||||
}
|
||||
@ -463,12 +466,9 @@ ShardedLRUCache::~ShardedLRUCache() {
|
||||
Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge,
|
||||
void (*deleter)(const CacheKey& key, void* value),
|
||||
CachePriority priority) {
|
||||
// The memory of the parameter value should be recorded in the tls mem tracker,
|
||||
// transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
|
||||
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), charge);
|
||||
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
|
||||
const uint32_t hash = _hash_slice(key);
|
||||
return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority);
|
||||
return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority,
|
||||
_mem_tracker.get());
|
||||
}
|
||||
|
||||
Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
|
||||
@ -477,13 +477,11 @@ Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
|
||||
}
|
||||
|
||||
void ShardedLRUCache::release(Handle* handle) {
|
||||
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
|
||||
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
|
||||
_shards[_shard(h->hash)]->release(handle);
|
||||
}
|
||||
|
||||
void ShardedLRUCache::erase(const CacheKey& key) {
|
||||
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
|
||||
const uint32_t hash = _hash_slice(key);
|
||||
_shards[_shard(hash)]->erase(key, hash);
|
||||
}
|
||||
@ -502,7 +500,6 @@ uint64_t ShardedLRUCache::new_id() {
|
||||
}
|
||||
|
||||
int64_t ShardedLRUCache::prune() {
|
||||
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
|
||||
int64_t num_prune = 0;
|
||||
for (int s = 0; s < kNumShards; s++) {
|
||||
num_prune += _shards[s]->prune();
|
||||
@ -511,7 +508,6 @@ int64_t ShardedLRUCache::prune() {
|
||||
}
|
||||
|
||||
int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
|
||||
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
|
||||
int64_t num_prune = 0;
|
||||
for (int s = 0; s < kNumShards; s++) {
|
||||
num_prune += _shards[s]->prune_if(pred);
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
|
||||
#include "olap/olap_common.h"
|
||||
#include "runtime/mem_tracker.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "util/metrics.h"
|
||||
#include "util/slice.h"
|
||||
|
||||
@ -236,6 +237,7 @@ typedef struct LRUHandle {
|
||||
uint32_t refs;
|
||||
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
|
||||
CachePriority priority = CachePriority::NORMAL;
|
||||
MemTracker* mem_tracker;
|
||||
char key_data[1]; // Beginning of key
|
||||
|
||||
CacheKey key() const {
|
||||
@ -250,6 +252,9 @@ typedef struct LRUHandle {
|
||||
|
||||
void free() {
|
||||
(*deleter)(key(), value);
|
||||
if (mem_tracker)
|
||||
mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(),
|
||||
total_size);
|
||||
::free(this);
|
||||
}
|
||||
|
||||
@ -308,7 +313,8 @@ public:
|
||||
// Like Cache methods, but with an extra "hash" parameter.
|
||||
Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
|
||||
void (*deleter)(const CacheKey& key, void* value),
|
||||
CachePriority priority = CachePriority::NORMAL);
|
||||
CachePriority priority = CachePriority::NORMAL,
|
||||
MemTracker* tracker = nullptr);
|
||||
Cache::Handle* lookup(const CacheKey& key, uint32_t hash);
|
||||
void release(Cache::Handle* handle);
|
||||
void erase(const CacheKey& key, uint32_t hash);
|
||||
|
||||
@ -564,6 +564,8 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
|
||||
Status st = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, &permits);
|
||||
if (st.ok() && permits > 0 && _permit_limiter.request(permits)) {
|
||||
auto st = _compaction_thread_pool->submit_func([=]() {
|
||||
SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::COMPACTION,
|
||||
tablet->get_compaction_mem_tracker(compaction_type));
|
||||
CgroupsMgr::apply_system_cgroup();
|
||||
tablet->execute_compaction(compaction_type);
|
||||
_permit_limiter.release(permits);
|
||||
|
||||
@ -1504,6 +1504,7 @@ Status Tablet::create_rowset_writer(const int64_t& txn_id, const PUniqueId& load
|
||||
void Tablet::_init_context_common_fields(RowsetWriterContext& context) {
|
||||
context.rowset_id = StorageEngine::instance()->next_rowset_id();
|
||||
context.tablet_uid = tablet_uid();
|
||||
|
||||
context.tablet_id = tablet_id();
|
||||
context.partition_id = partition_id();
|
||||
context.tablet_schema_hash = schema_hash();
|
||||
@ -1522,4 +1523,12 @@ Status Tablet::create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* r
|
||||
return RowsetFactory::create_rowset(&tablet_schema(), tablet_path_desc(), rowset_meta, rowset);
|
||||
}
|
||||
|
||||
std::shared_ptr<MemTracker>& Tablet::get_compaction_mem_tracker(CompactionType compaction_type) {
|
||||
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
|
||||
return _cumulative_compaction->get_mem_tracker();
|
||||
} else {
|
||||
return _base_compaction->get_mem_tracker();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -253,6 +253,8 @@ public:
|
||||
return _cumulative_compaction_policy;
|
||||
}
|
||||
|
||||
std::shared_ptr<MemTracker>& get_compaction_mem_tracker(CompactionType compaction_type);
|
||||
|
||||
inline bool all_beta() const {
|
||||
std::shared_lock rdlock(_meta_lock);
|
||||
return _tablet_meta->all_beta();
|
||||
|
||||
@ -137,7 +137,7 @@ bool LoadChannel::is_finished() {
|
||||
}
|
||||
|
||||
Status LoadChannel::cancel() {
|
||||
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
|
||||
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
for (auto& it : _tablets_channels) {
|
||||
it.second->cancel();
|
||||
|
||||
@ -178,7 +178,7 @@ void MemTracker::init_virtual() {
|
||||
MemTracker::~MemTracker() {
|
||||
consume(_untracked_mem.exchange(0)); // before memory_leak_check
|
||||
// TCMalloc hook will be triggered during destructor memtracker, may cause crash.
|
||||
if (_label == "Process") GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER();
|
||||
if (_label == "Process") STOP_THREAD_LOCAL_MEM_TRACKER(false);
|
||||
if (!_virtual && config::memory_leak_detection) MemTracker::memory_leak_check(this);
|
||||
if (!_virtual && parent()) {
|
||||
// Do not call release on the parent tracker to avoid repeated releases.
|
||||
|
||||
@ -122,6 +122,7 @@ public:
|
||||
|
||||
// Increases consumption of this tracker and its ancestors by 'bytes'.
|
||||
void consume(int64_t bytes) {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
if (bytes <= 0) {
|
||||
release(-bytes);
|
||||
return;
|
||||
@ -129,6 +130,7 @@ public:
|
||||
for (auto& tracker : _all_trackers) {
|
||||
tracker->_consumption->add(bytes);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// Increases consumption of this tracker and its ancestors by 'bytes' only if
|
||||
@ -136,6 +138,7 @@ public:
|
||||
// no MemTrackers are updated. Returns true if the consumption was successfully updated.
|
||||
WARN_UNUSED_RESULT
|
||||
Status try_consume(int64_t bytes) {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
if (bytes <= 0) {
|
||||
release(-bytes);
|
||||
return Status::OK();
|
||||
@ -166,11 +169,13 @@ public:
|
||||
}
|
||||
// Everyone succeeded, return.
|
||||
DCHECK_EQ(i, -1);
|
||||
#endif
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Decreases consumption of this tracker and its ancestors by 'bytes'.
|
||||
void release(int64_t bytes) {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
if (bytes < 0) {
|
||||
consume(-bytes);
|
||||
return;
|
||||
@ -181,6 +186,7 @@ public:
|
||||
for (auto& tracker : _all_trackers) {
|
||||
tracker->_consumption->add(-bytes);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static void batch_consume(int64_t bytes,
|
||||
@ -247,22 +253,26 @@ public:
|
||||
// ancestor. This happens when we want to update tracking on a particular mem tracker but the consumption
|
||||
// against the limit recorded in one of its ancestors already happened.
|
||||
void consume_local(int64_t bytes, MemTracker* end_tracker) {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
DCHECK(end_tracker);
|
||||
if (bytes == 0) return;
|
||||
for (auto& tracker : _all_trackers) {
|
||||
if (tracker == end_tracker) return;
|
||||
tracker->_consumption->add(bytes);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// up to (but not including) end_tracker.
|
||||
void release_local(int64_t bytes, MemTracker* end_tracker) {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
DCHECK(end_tracker);
|
||||
if (bytes == 0) return;
|
||||
for (auto& tracker : _all_trackers) {
|
||||
if (tracker == end_tracker) return;
|
||||
tracker->_consumption->add(-bytes);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// Transfer 'bytes' of consumption from this tracker to 'dst'.
|
||||
@ -273,6 +283,7 @@ public:
|
||||
|
||||
WARN_UNUSED_RESULT
|
||||
Status try_transfer_to(MemTracker* dst, int64_t bytes) {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
if (id() == dst->id()) return Status::OK();
|
||||
// Must release first, then consume
|
||||
release_cache(bytes);
|
||||
@ -281,14 +292,17 @@ public:
|
||||
consume_cache(bytes);
|
||||
return st;
|
||||
}
|
||||
#endif
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Forced transfer, 'dst' may limit exceed, and more ancestor trackers will be updated.
|
||||
void transfer_to(MemTracker* dst, int64_t bytes) {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
if (id() == dst->id()) return;
|
||||
release_cache(bytes);
|
||||
dst->consume_cache(bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
// Returns true if a valid limit of this tracker or one of its ancestors is exceeded.
|
||||
|
||||
@ -88,10 +88,9 @@ Status ResultSink::open(RuntimeState* state) {
|
||||
}
|
||||
|
||||
Status ResultSink::send(RuntimeState* state, RowBatch* batch) {
|
||||
// The memory consumption in the process of sending the results is not recorded in the query memory.
|
||||
// 1. Avoid the query being cancelled when the memory limit is reached after the query result comes out.
|
||||
// 2. If record this memory, also need to record on the receiving end, need to consider the life cycle of MemTracker.
|
||||
SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER();
|
||||
// The memory consumption in the process of sending the results is not check query memory limit.
|
||||
// Avoid the query being cancelled when the memory limit is reached after the query result comes out.
|
||||
STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
|
||||
return _writer->append_row_batch(batch);
|
||||
}
|
||||
|
||||
|
||||
@ -36,7 +36,9 @@ AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, const st
|
||||
const TUniqueId& fragment_instance_id,
|
||||
const std::shared_ptr<doris::MemTracker>& mem_tracker) {
|
||||
DCHECK(task_id != "");
|
||||
#ifdef USE_MEM_TRACKER
|
||||
tls_ctx()->attach(type, task_id, fragment_instance_id, mem_tracker);
|
||||
#endif
|
||||
}
|
||||
|
||||
AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type,
|
||||
@ -44,7 +46,9 @@ AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type,
|
||||
#ifndef BE_TEST
|
||||
DCHECK(mem_tracker);
|
||||
#endif
|
||||
#ifdef USE_MEM_TRACKER
|
||||
tls_ctx()->attach(type, "", TUniqueId(), mem_tracker);
|
||||
#endif
|
||||
}
|
||||
|
||||
AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
|
||||
@ -52,7 +56,9 @@ AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
|
||||
#ifndef BE_TEST
|
||||
DCHECK(mem_tracker);
|
||||
#endif
|
||||
#ifdef USE_MEM_TRACKER
|
||||
tls_ctx()->attach(query_to_task_type(query_type), "", TUniqueId(), mem_tracker);
|
||||
#endif
|
||||
}
|
||||
|
||||
AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
|
||||
@ -64,7 +70,9 @@ AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
|
||||
DCHECK(fragment_instance_id != TUniqueId());
|
||||
DCHECK(mem_tracker);
|
||||
#endif
|
||||
#ifdef USE_MEM_TRACKER
|
||||
tls_ctx()->attach(query_to_task_type(query_type), task_id, fragment_instance_id, mem_tracker);
|
||||
#endif
|
||||
}
|
||||
|
||||
AttachTaskThread::AttachTaskThread(const RuntimeState* runtime_state,
|
||||
@ -74,19 +82,24 @@ AttachTaskThread::AttachTaskThread(const RuntimeState* runtime_state,
|
||||
DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
|
||||
DCHECK(mem_tracker);
|
||||
#endif
|
||||
#ifdef USE_MEM_TRACKER
|
||||
tls_ctx()->attach(query_to_task_type(runtime_state->query_type()),
|
||||
print_id(runtime_state->query_id()), runtime_state->fragment_instance_id(),
|
||||
mem_tracker);
|
||||
#endif
|
||||
}
|
||||
|
||||
AttachTaskThread::~AttachTaskThread() {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
tls_ctx()->detach();
|
||||
DorisMetrics::instance()->attach_task_thread_count->increment(1);
|
||||
#endif
|
||||
}
|
||||
|
||||
template <bool Existed>
|
||||
SwitchThreadMemTracker<Existed>::SwitchThreadMemTracker(
|
||||
const std::shared_ptr<doris::MemTracker>& mem_tracker, bool in_task) {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
if (config::memory_verbose_track) {
|
||||
#ifndef BE_TEST
|
||||
DCHECK(mem_tracker);
|
||||
@ -100,41 +113,49 @@ SwitchThreadMemTracker<Existed>::SwitchThreadMemTracker(
|
||||
_old_tracker_id =
|
||||
tls_ctx()->_thread_mem_tracker_mgr->update_tracker<false>(mem_tracker);
|
||||
}
|
||||
#endif
|
||||
#endif // BE_TEST
|
||||
#ifndef NDEBUG
|
||||
tls_ctx()->_thread_mem_tracker_mgr->switch_count += 1;
|
||||
#endif
|
||||
#endif // NDEBUG
|
||||
}
|
||||
#endif // USE_MEM_TRACKER
|
||||
}
|
||||
|
||||
template <bool Existed>
|
||||
SwitchThreadMemTracker<Existed>::~SwitchThreadMemTracker() {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
if (config::memory_verbose_track) {
|
||||
#ifndef NDEBUG
|
||||
tls_ctx()->_thread_mem_tracker_mgr->switch_count -= 1;
|
||||
DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
|
||||
#endif
|
||||
#endif // NDEBUG
|
||||
#ifndef BE_TEST
|
||||
tls_ctx()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
|
||||
#endif
|
||||
#endif // BE_TEST
|
||||
}
|
||||
#endif // USE_MEM_TRACKER
|
||||
}
|
||||
|
||||
SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack(
|
||||
const std::string& action_type, bool cancel_work, ERRCALLBACK err_call_back_func) {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
DCHECK(action_type != std::string());
|
||||
_old_tracker_cb = tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(
|
||||
action_type, cancel_work, err_call_back_func);
|
||||
#endif
|
||||
}
|
||||
|
||||
SwitchThreadMemTrackerErrCallBack::~SwitchThreadMemTrackerErrCallBack() {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb);
|
||||
#ifndef NDEBUG
|
||||
DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1);
|
||||
#endif
|
||||
#endif // USE_MEM_TRACKER
|
||||
}
|
||||
|
||||
SwitchBthread::SwitchBthread() {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
tls = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
|
||||
// First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
|
||||
if (tls == nullptr) {
|
||||
@ -148,16 +169,19 @@ SwitchBthread::SwitchBthread() {
|
||||
}
|
||||
tls->_thread_mem_tracker_mgr->init();
|
||||
tls->set_type(ThreadContext::TaskType::BRPC);
|
||||
#endif
|
||||
}
|
||||
|
||||
SwitchBthread::~SwitchBthread() {
|
||||
#ifdef USE_MEM_TRACKER
|
||||
DCHECK(tls != nullptr);
|
||||
tls->_thread_mem_tracker_mgr->clear_untracked_mems();
|
||||
tls->_thread_mem_tracker_mgr->init();
|
||||
tls->set_type(ThreadContext::TaskType::UNKNOWN);
|
||||
#ifndef NDEBUG
|
||||
DorisMetrics::instance()->switch_bthread_count->increment(1);
|
||||
#endif
|
||||
#endif // NDEBUG
|
||||
#endif // USE_MEM_TRACKER
|
||||
}
|
||||
|
||||
template class SwitchThreadMemTracker<true>;
|
||||
|
||||
@ -35,10 +35,8 @@
|
||||
// Be careful to stop the thread mem tracker, because the actual order of malloc and free memory
|
||||
// may be different from the order of execution of instructions, which will cause the position of
|
||||
// the memory track to be unexpected.
|
||||
#define SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER() \
|
||||
auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(true)
|
||||
#define GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER() \
|
||||
auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(false)
|
||||
#define STOP_THREAD_LOCAL_MEM_TRACKER(scope) \
|
||||
auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(scope)
|
||||
// Switch thread mem tracker during task execution.
|
||||
// After the non-query thread switches the mem tracker, if the thread will not switch the mem
|
||||
// tracker again in the short term, can consider manually clear_untracked_mems.
|
||||
@ -80,9 +78,11 @@
|
||||
#define ADD_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
|
||||
doris::tls_ctx()->_thread_mem_tracker_mgr->add_tracker(mem_tracker)
|
||||
#define CONSUME_THREAD_LOCAL_MEM_TRACKER(size) \
|
||||
doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(size)
|
||||
doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_try_consume(size)
|
||||
#define RELEASE_THREAD_LOCAL_MEM_TRACKER(size) \
|
||||
doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(-size)
|
||||
doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_try_consume(-size)
|
||||
#define STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER() \
|
||||
auto VARNAME_LINENUM(switch_bthread) = StopCheckLimitThreadMemTracker()
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -281,7 +281,9 @@ public:
|
||||
~SwitchThreadMemTracker();
|
||||
|
||||
protected:
|
||||
#ifdef USE_MEM_TRACKER
|
||||
int64_t _old_tracker_id = 0;
|
||||
#endif
|
||||
};
|
||||
|
||||
class SwitchThreadMemTrackerEndClear : public SwitchThreadMemTracker<false> {
|
||||
@ -303,7 +305,9 @@ public:
|
||||
~SwitchThreadMemTrackerErrCallBack();
|
||||
|
||||
private:
|
||||
#ifdef USE_MEM_TRACKER
|
||||
ConsumeErrCallBackInfo _old_tracker_cb;
|
||||
#endif
|
||||
};
|
||||
|
||||
class SwitchBthread {
|
||||
@ -313,7 +317,20 @@ public:
|
||||
~SwitchBthread();
|
||||
|
||||
private:
|
||||
#ifdef USE_MEM_TRACKER
|
||||
ThreadContext* tls;
|
||||
#endif
|
||||
};
|
||||
|
||||
class StopCheckLimitThreadMemTracker {
|
||||
public:
|
||||
explicit StopCheckLimitThreadMemTracker() {
|
||||
tls_ctx()->_thread_mem_tracker_mgr->update_check_limit(false);
|
||||
}
|
||||
|
||||
~StopCheckLimitThreadMemTracker() {
|
||||
tls_ctx()->_thread_mem_tracker_mgr->update_check_limit(true);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -63,6 +63,10 @@ public:
|
||||
|
||||
~ThreadMemTrackerMgr() {
|
||||
clear_untracked_mems();
|
||||
_consume_err_cb.init();
|
||||
_mem_trackers.clear();
|
||||
_untracked_mems.clear();
|
||||
_mem_tracker_labels.clear();
|
||||
start_thread_mem_tracker = false;
|
||||
}
|
||||
|
||||
@ -107,12 +111,14 @@ public:
|
||||
// must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
|
||||
void cache_consume(int64_t size);
|
||||
|
||||
void noncache_consume(int64_t size);
|
||||
void noncache_try_consume(int64_t size);
|
||||
|
||||
bool is_attach_task() { return _task_id != ""; }
|
||||
|
||||
std::shared_ptr<MemTracker> mem_tracker();
|
||||
|
||||
void update_check_limit(bool check_limit) { _check_limit = check_limit; }
|
||||
|
||||
int64_t switch_count = 0;
|
||||
|
||||
std::string print_debug_string() {
|
||||
@ -163,6 +169,8 @@ private:
|
||||
// we can confirm the tracker label that was added through _mem_tracker_labels.
|
||||
// Because for performance, all map keys are tracker id.
|
||||
phmap::flat_hash_map<int64_t, std::string> _mem_tracker_labels;
|
||||
// If true, call memtracker try_consume, otherwise call consume.
|
||||
bool _check_limit;
|
||||
|
||||
int64_t _tracker_id;
|
||||
// Avoid memory allocation in functions.
|
||||
@ -184,6 +192,7 @@ inline void ThreadMemTrackerMgr::init() {
|
||||
_untracked_mems[0] = 0;
|
||||
_mem_tracker_labels.clear();
|
||||
_mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
|
||||
_check_limit = true;
|
||||
}
|
||||
|
||||
inline void ThreadMemTrackerMgr::clear_untracked_mems() {
|
||||
@ -244,21 +253,26 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
|
||||
if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
|
||||
_untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
|
||||
DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string();
|
||||
// Allocating memory in the Hook command causes the TCMalloc Hook to be entered again, infinite recursion.
|
||||
// Needs to ensure that all memory allocated in mem_tracker.consume/try_consume is freed in time to avoid tracking misses.
|
||||
start_thread_mem_tracker = false;
|
||||
// When switching to the current tracker last time, the remaining untracked memory.
|
||||
if (_untracked_mems[_tracker_id] != 0) {
|
||||
_untracked_mem += _untracked_mems[_tracker_id];
|
||||
_untracked_mems[_tracker_id] = 0;
|
||||
}
|
||||
noncache_consume(_untracked_mem);
|
||||
// Allocating memory in the Hook command causes the TCMalloc Hook to be entered again,
|
||||
// will enter infinite recursion. So the temporary memory allocated in mem_tracker.try_consume
|
||||
// and mem_limit_exceeded will directly call consume.
|
||||
if (_check_limit) {
|
||||
_check_limit = false;
|
||||
noncache_try_consume(_untracked_mem);
|
||||
_check_limit = true;
|
||||
} else {
|
||||
mem_tracker()->consume(_untracked_mem);
|
||||
}
|
||||
_untracked_mem = 0;
|
||||
start_thread_mem_tracker = true;
|
||||
}
|
||||
}
|
||||
|
||||
inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) {
|
||||
inline void ThreadMemTrackerMgr::noncache_try_consume(int64_t size) {
|
||||
Status st = mem_tracker()->try_consume(size);
|
||||
if (!st) {
|
||||
// The memory has been allocated, so when TryConsume fails, need to continue to complete
|
||||
|
||||
@ -332,9 +332,11 @@ int main(int argc, char** argv) {
|
||||
fprintf(stderr, "Failed to change TCMalloc total thread cache size.\n");
|
||||
return -1;
|
||||
}
|
||||
#ifdef USE_MEM_TRACKER
|
||||
if (doris::config::track_new_delete) {
|
||||
init_hook();
|
||||
}
|
||||
#endif // USE_MEM_TRACKER
|
||||
#endif
|
||||
|
||||
std::vector<doris::StorePath> paths;
|
||||
|
||||
@ -87,6 +87,9 @@ Status VResultSink::send(RuntimeState* state, RowBatch* batch) {
|
||||
}
|
||||
|
||||
Status VResultSink::send(RuntimeState* state, Block* block) {
|
||||
// The memory consumption in the process of sending the results is not check query memory limit.
|
||||
// Avoid the query being cancelled when the memory limit is reached after the query result comes out.
|
||||
STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
|
||||
return _writer->append_block(*block);
|
||||
}
|
||||
|
||||
|
||||
@ -226,45 +226,44 @@ static void insert_LRUCache(LRUCache& cache, const CacheKey& key, int value,
|
||||
|
||||
TEST_F(CacheTest, Usage) {
|
||||
LRUCache cache(LRUCacheType::SIZE);
|
||||
cache.set_capacity(1050);
|
||||
cache.set_capacity(1040);
|
||||
|
||||
// The lru usage is handle_size + charge.
|
||||
// handle_size = sizeof(handle) - 1 + key size = 88 - 1 + 3 = 90
|
||||
// handle_size = sizeof(handle) - 1 + key size = 96 - 1 + 3 = 98
|
||||
CacheKey key1("100");
|
||||
insert_LRUCache(cache, key1, 100, CachePriority::NORMAL);
|
||||
ASSERT_EQ(190, cache.get_usage()); // 100 + 90
|
||||
ASSERT_EQ(198, cache.get_usage()); // 100 + 98
|
||||
|
||||
CacheKey key2("200");
|
||||
insert_LRUCache(cache, key2, 200, CachePriority::DURABLE);
|
||||
ASSERT_EQ(480, cache.get_usage()); // 190 + 290(d)
|
||||
ASSERT_EQ(496, cache.get_usage()); // 198 + 298(d), d = DURABLE
|
||||
|
||||
CacheKey key3("300");
|
||||
insert_LRUCache(cache, key3, 300, CachePriority::NORMAL);
|
||||
ASSERT_EQ(870, cache.get_usage()); // 190 + 290(d) + 390
|
||||
ASSERT_EQ(894, cache.get_usage()); // 198 + 298(d) + 398
|
||||
|
||||
CacheKey key4("400");
|
||||
insert_LRUCache(cache, key4, 400, CachePriority::NORMAL);
|
||||
ASSERT_EQ(780, cache.get_usage()); // 290(d) + 490
|
||||
ASSERT_EQ(796, cache.get_usage()); // 298(d) + 498, evict 198 398
|
||||
|
||||
CacheKey key5("500");
|
||||
insert_LRUCache(cache, key5, 500, CachePriority::NORMAL);
|
||||
ASSERT_EQ(880, cache.get_usage()); // 290(d) + 590
|
||||
ASSERT_EQ(896, cache.get_usage()); // 298(d) + 598, evict 498
|
||||
|
||||
CacheKey key6("600");
|
||||
insert_LRUCache(cache, key6, 600, CachePriority::NORMAL);
|
||||
ASSERT_EQ(980, cache.get_usage()); // 290(d) + 690
|
||||
ASSERT_EQ(996, cache.get_usage()); // 298(d) + 698, evict 498
|
||||
|
||||
CacheKey key7("950");
|
||||
insert_LRUCache(cache, key7, 950, CachePriority::DURABLE);
|
||||
ASSERT_EQ(1040, cache.get_usage()); // 1040(d)
|
||||
ASSERT_EQ(0, cache.get_usage()); // evict 298 698, because 950 + 98 > 1040, so insert failed
|
||||
}
|
||||
|
||||
TEST_F(CacheTest, Prune) {
|
||||
LRUCache cache(LRUCacheType::NUMBER);
|
||||
cache.set_capacity(5);
|
||||
|
||||
// The lru usage is handle_size + charge = 96 - 1 = 95
|
||||
// 95 + 3 means handle_size + key size
|
||||
// The lru usage is 1, add one entry
|
||||
CacheKey key1("100");
|
||||
insert_LRUCache(cache, key1, 100, CachePriority::NORMAL);
|
||||
EXPECT_EQ(1, cache.get_usage());
|
||||
|
||||
5
build.sh
5
build.sh
@ -214,6 +214,9 @@ fi
|
||||
if [[ -z ${STRIP_DEBUG_INFO} ]]; then
|
||||
STRIP_DEBUG_INFO=OFF
|
||||
fi
|
||||
if [[ -z ${USE_MEM_TRACKER} ]]; then
|
||||
USE_MEM_TRACKER=ON
|
||||
fi
|
||||
|
||||
if [[ -z ${USE_DWARF} ]]; then
|
||||
USE_DWARF=OFF
|
||||
@ -238,6 +241,7 @@ echo "Get params:
|
||||
USE_LLD -- $USE_LLD
|
||||
USE_DWARF -- $USE_DWARF
|
||||
STRIP_DEBUG_INFO -- $STRIP_DEBUG_INFO
|
||||
USE_MEM_TRACKER -- $USE_MEM_TRACKER
|
||||
"
|
||||
|
||||
# Clean and build generated code
|
||||
@ -300,6 +304,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
|
||||
-DBUILD_JAVA_UDF=${BUILD_JAVA_UDF} \
|
||||
-DSTRIP_DEBUG_INFO=${STRIP_DEBUG_INFO} \
|
||||
-DUSE_DWARF=${USE_DWARF} \
|
||||
-DUSE_MEM_TRACKER=${USE_MEM_TRACKER} \
|
||||
-DUSE_AVX2=${USE_AVX2} \
|
||||
-DGLIBC_COMPATIBILITY=${GLIBC_COMPATIBILITY} ../
|
||||
${BUILD_SYSTEM} -j ${PARALLEL}
|
||||
|
||||
@ -138,6 +138,7 @@ ${CMAKE_CMD} -G "${GENERATOR}" \
|
||||
-DWITH_MYSQL=OFF \
|
||||
-DWITH_KERBEROS=OFF \
|
||||
-DUSE_DWARF=${USE_DWARF} \
|
||||
-DUSE_MEM_TRACKER=ON \
|
||||
${CMAKE_USE_CCACHE} ../
|
||||
${BUILD_SYSTEM} -j ${PARALLEL}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user