[enhancement](memory) Jemalloc performance optimization and compatibility with MemTracker #12496

This commit is contained in:
Xinyi Zou
2022-09-28 12:04:29 +08:00
committed by GitHub
parent e627d285e0
commit 16bb5cb430
8 changed files with 208 additions and 31 deletions

View File

@ -223,7 +223,10 @@ add_library(leveldb STATIC IMPORTED)
set_target_properties(leveldb PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libleveldb.a)
add_library(jemalloc STATIC IMPORTED)
set_target_properties(jemalloc PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libjemalloc.a)
set_target_properties(jemalloc PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libjemalloc_doris.a)
add_library(jemalloc_arrow STATIC IMPORTED)
set_target_properties(jemalloc_arrow PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libjemalloc.a)
add_library(brotlicommon STATIC IMPORTED)
set_target_properties(brotlicommon PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libbrotlicommon.a)
@ -681,6 +684,7 @@ set(COMMON_THIRDPARTY
roaring
fmt
jemalloc
jemalloc_arrow
brotlicommon
brotlidec
brotlienc

View File

@ -115,6 +115,12 @@ if (WITH_MYSQL)
)
endif()
if (USE_JEMALLOC)
set(RUNTIME_FILES ${RUNTIME_FILES}
memory/jemalloc_hook.cpp
)
endif()
add_library(Runtime STATIC
${RUNTIME_FILES}
)

View File

@ -208,7 +208,7 @@ Status ExecEnv::_init_mem_tracker() {
std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process");
_orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Orphan", _process_mem_tracker);
_orphan_mem_tracker_raw = _orphan_mem_tracker.get();
thread_context()->_thread_mem_tracker_mgr->init();
thread_context()->_thread_mem_tracker_mgr->init_impl();
thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
!defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)

View File

@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "jemalloc/jemalloc.h"
#include "runtime/thread_context.h"
extern "C" {
void* doris_malloc(size_t size) __THROW {
MEM_MALLOC_HOOK(je_nallocx(size, 0));
void* ptr = je_malloc(size);
if (UNLIKELY(ptr == nullptr)) {
MEM_FREE_HOOK(je_nallocx(size, 0));
}
return ptr;
}
void doris_free(void* p) __THROW {
MEM_FREE_HOOK(je_malloc_usable_size(p));
je_free(p);
}
void* doris_realloc(void* p, size_t size) __THROW {
if (UNLIKELY(size == 0)) {
return nullptr;
}
int64_t old_size = je_malloc_usable_size(p);
MEM_MALLOC_HOOK(je_nallocx(size, 0) - old_size);
void* ptr = je_realloc(p, size);
if (UNLIKELY(ptr == nullptr)) {
MEM_FREE_HOOK(je_nallocx(size, 0) - old_size);
}
return ptr;
}
void* doris_calloc(size_t n, size_t size) __THROW {
if (UNLIKELY(size == 0)) {
return nullptr;
}
MEM_MALLOC_HOOK(n * size);
void* ptr = je_calloc(n, size);
if (UNLIKELY(ptr == nullptr)) {
MEM_FREE_HOOK(n * size);
} else {
MEM_FREE_HOOK(je_malloc_usable_size(ptr) - n * size);
}
return ptr;
}
void doris_cfree(void* ptr) __THROW {
MEM_FREE_HOOK(je_malloc_usable_size(ptr));
je_free(ptr);
}
void* doris_memalign(size_t align, size_t size) __THROW {
MEM_MALLOC_HOOK(size);
void* ptr = je_aligned_alloc(align, size);
if (UNLIKELY(ptr == nullptr)) {
MEM_FREE_HOOK(size);
} else {
MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size);
}
return ptr;
}
void* doris_aligned_alloc(size_t align, size_t size) __THROW {
MEM_MALLOC_HOOK(size);
void* ptr = je_aligned_alloc(align, size);
if (UNLIKELY(ptr == nullptr)) {
MEM_FREE_HOOK(size);
} else {
MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size);
}
return ptr;
}
void* doris_valloc(size_t size) __THROW {
MEM_MALLOC_HOOK(size);
void* ptr = je_valloc(size);
if (UNLIKELY(ptr == nullptr)) {
MEM_FREE_HOOK(size);
} else {
MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size);
}
return ptr;
}
void* doris_pvalloc(size_t size) __THROW {
MEM_MALLOC_HOOK(size);
void* ptr = je_valloc(size);
if (UNLIKELY(ptr == nullptr)) {
MEM_FREE_HOOK(size);
} else {
MEM_MALLOC_HOOK(je_malloc_usable_size(ptr) - size);
}
return ptr;
}
int doris_posix_memalign(void** r, size_t align, size_t size) __THROW {
MEM_MALLOC_HOOK(size);
int ret = je_posix_memalign(r, align, size);
if (UNLIKELY(ret != 0)) {
MEM_FREE_HOOK(size);
} else {
MEM_MALLOC_HOOK(je_malloc_usable_size(*r) - size);
}
return ret;
}
size_t doris_malloc_usable_size(void* ptr) __THROW {
size_t ret = je_malloc_usable_size(ptr);
return ret;
}
#define ALIAS(doris_fn) __attribute__((alias(#doris_fn), used))
void* malloc(size_t size) __THROW ALIAS(doris_malloc);
void free(void* p) __THROW ALIAS(doris_free);
void* realloc(void* p, size_t size) __THROW ALIAS(doris_realloc);
void* calloc(size_t n, size_t size) __THROW ALIAS(doris_calloc);
void cfree(void* ptr) __THROW ALIAS(doris_cfree);
void* memalign(size_t align, size_t size) __THROW ALIAS(doris_memalign);
void* aligned_alloc(size_t align, size_t size) __THROW ALIAS(doris_aligned_alloc);
void* valloc(size_t size) __THROW ALIAS(doris_valloc);
void* pvalloc(size_t size) __THROW ALIAS(doris_pvalloc);
int posix_memalign(void** r, size_t a, size_t s) __THROW ALIAS(doris_posix_memalign);
size_t malloc_usable_size(void* ptr) __THROW ALIAS(doris_malloc_usable_size);
}

View File

@ -36,28 +36,11 @@
// destructor to control the behavior of consume can lead to unexpected behavior,
// like this: if (LIKELY(doris::start_thread_mem_tracker)) {
void new_hook(const void* ptr, size_t size) {
if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) {
// Currently in bthread, consume thread context mem tracker in bthread tls.
doris::update_bthread_context();
doris::bthread_context->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0));
} else if (doris::thread_context_ptr._init) {
doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0));
} else {
doris::ThreadMemTrackerMgr::consume_no_attach(tc_nallocx(size, 0));
}
MEM_MALLOC_HOOK(tc_nallocx(size, 0));
}
void delete_hook(const void* ptr) {
if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) {
doris::update_bthread_context();
doris::bthread_context->_thread_mem_tracker_mgr->consume(
-tc_malloc_size(const_cast<void*>(ptr)));
} else if (doris::thread_context_ptr._init) {
doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(
-tc_malloc_size(const_cast<void*>(ptr)));
} else {
doris::ThreadMemTrackerMgr::consume_no_attach(-tc_malloc_size(const_cast<void*>(ptr)));
}
MEM_FREE_HOOK(tc_malloc_size(const_cast<void*>(ptr)));
}
void init_hook() {

View File

@ -51,12 +51,15 @@ public:
// only for tcmalloc hook
static void consume_no_attach(int64_t size) {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
if (ExecEnv::GetInstance()->initialized()) {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
}
}
// After thread initialization, calling `init` again must call `clear_untracked_mems` first
// to avoid memory tracking loss.
void init();
void init_impl();
// After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker
void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id,
@ -85,9 +88,13 @@ public:
bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); }
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
if (_limiter_tracker_raw == nullptr) init_impl();
return _limiter_tracker_stack.back();
}
MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; }
MemTrackerLimiter* limiter_mem_tracker_raw() {
if (_limiter_tracker_raw == nullptr) init_impl();
return _limiter_tracker_raw;
}
void set_check_limit(bool check_limit) { _check_limit = check_limit; }
void set_check_attach(bool check_attach) { _check_attach = check_attach; }
@ -120,7 +127,7 @@ private:
// _limiter_tracker_stack[0] = orphan_mem_tracker
std::vector<std::shared_ptr<MemTrackerLimiter>> _limiter_tracker_stack;
MemTrackerLimiter* _limiter_tracker_raw;
MemTrackerLimiter* _limiter_tracker_raw = nullptr;
std::vector<MemTracker*> _consumer_tracker_stack;
// If true, call memtracker try_consume, otherwise call consume.
@ -138,12 +145,18 @@ inline void ThreadMemTrackerMgr::init() {
// _limiter_tracker_stack[0] = orphan_mem_tracker
DCHECK(_limiter_tracker_stack.size() <= 1)
<< "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
if (_limiter_tracker_stack.size() == 0) {
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
_limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
_task_id_stack.push_back("");
_fragment_instance_id_stack.push_back(TUniqueId());
if (_limiter_tracker_raw == nullptr && ExecEnv::GetInstance()->initialized()) {
init_impl();
}
}
inline void ThreadMemTrackerMgr::init_impl() {
DCHECK(_limiter_tracker_stack.size() == 0);
DCHECK(_limiter_tracker_raw == nullptr);
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
_limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
_task_id_stack.push_back("");
_fragment_instance_id_stack.push_back(TUniqueId());
_check_limit = true;
}
@ -166,9 +179,10 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) {
// When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes`
// and some threads `_untracked_mem <= -config::mem_tracker_consume_min_size_bytes` trigger consumption(),
// it will cause tracker->consumption to be temporarily less than 0.
// After the jemalloc hook is loaded, before ExecEnv init, _limiter_tracker=nullptr.
if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
_untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
!_stop_consume) {
!_stop_consume && ExecEnv::GetInstance()->initialized()) {
if (_check_limit) {
flush_untracked_mem<true>();
} else {
@ -182,8 +196,9 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
// the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop.
_stop_consume = true;
old_untracked_mem = _untracked_mem;
if (_limiter_tracker_raw == nullptr) init_impl();
DCHECK(_limiter_tracker_raw);
old_untracked_mem = _untracked_mem;
if (CheckLimit) {
#ifndef BE_TEST
// When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker.

View File

@ -268,4 +268,30 @@ public:
->_thread_mem_tracker_mgr->last_consumer_tracker(), \
msg), \
##__VA_ARGS__);
// Mem Hook to consume thread mem tracker
#define MEM_MALLOC_HOOK(size) \
do { \
if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { \
doris::update_bthread_context(); \
doris::bthread_context->_thread_mem_tracker_mgr->consume(size); \
} else if (LIKELY(doris::thread_context_ptr._init)) { \
doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(size); \
} else { \
doris::ThreadMemTrackerMgr::consume_no_attach(size); \
} \
} while (0)
#define MEM_FREE_HOOK(size) \
do { \
if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { \
doris::update_bthread_context(); \
doris::bthread_context->_thread_mem_tracker_mgr->consume(-size); \
} else if (doris::thread_context_ptr._init) { \
doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(-size); \
} else { \
doris::ThreadMemTrackerMgr::consume_no_attach(-size); \
} \
} while (0)
} // namespace doris

View File

@ -207,6 +207,8 @@ export UBSAN_OPTIONS=print_stacktrace=1
## set hdfs conf
export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml"
export MALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16"
if [[ "${RUN_DAEMON}" -eq 1 ]]; then
nohup ${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/doris_be" "$@" >>"${LOG_DIR}/be.out" 2>&1 </dev/null &
else