[fix] (mem tracker) Fix inaccurate mem tracker that leads to load OOM (#10409)

* fix load tracker

* fix comment
Kidd
2022-06-25 14:13:02 +08:00
committed by GitHub
parent 8abd00dcd5
commit eb25df5a2c
9 changed files with 53 additions and 53 deletions
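
Reading the diff as a whole: the per-channel MemTracker is no longer created inside LoadChannel's constructor; LoadChannelMgr now creates it as a child of its own (now virtual) tracker and injects it through the constructor. The TCMalloc new/delete hooks and ThreadContext fall back to the process tracker when no thread context is attached, the brpc entry points switch bthread-local context instead of attaching the manager's tracker, and cache_consume gains a _stop_consume reentrancy guard. A minimal sketch of the injected-tracker shape (LoadChannelSketch and the bare MemTracker forward declaration are illustrative stand-ins, not Doris code):

    #include <memory>

    struct MemTracker;  // stand-in for doris::MemTracker

    // After this patch the channel no longer builds its own tracker; the
    // manager owns the tracker hierarchy and hands a ready-made child in.
    class LoadChannelSketch {
    public:
        explicit LoadChannelSketch(std::shared_ptr<MemTracker> tracker)
                : _mem_tracker(std::move(tracker)) {}

    private:
        std::shared_ptr<MemTracker> _mem_tracker;  // child of LoadChannelMgr's tracker
    };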

View File

@@ -25,19 +25,15 @@
 namespace doris {
-LoadChannel::LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t channel_mem_limit,
+LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTracker>& mem_tracker,
                          int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
                          bool is_vec)
         : _load_id(load_id),
+          _mem_tracker(mem_tracker),
           _timeout_s(timeout_s),
           _is_high_priority(is_high_priority),
           _sender_ip(sender_ip),
           _is_vec(is_vec) {
-    _mem_tracker = MemTracker::create_tracker(
-            channel_mem_limit, "LoadChannel#senderIp=" + sender_ip,
-            ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->register_load_mem_tracker(
-                    _load_id.to_string(), load_mem_limit),
-            MemTrackerLevel::TASK);
     // _last_updated_time should be set before being inserted to
     // _load_channels in load_channel_mgr, or it may be erased
     // immediately by gc thread.
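
The comment above encodes an ordering requirement: _last_updated_time must be stamped before the manager publishes the channel into _load_channels, or the gc thread can observe a stale timestamp and erase the entry immediately. A hedged sketch of the publish-after-init ordering (the lock and map names follow the diff; the surrounding code is illustrative):

    // Construct first (the ctor stamps _last_updated_time), then publish
    // under the lock; gc can only see the channel after this insert.
    auto channel = std::make_shared<LoadChannel>(load_id, channel_mem_tracker, channel_timeout_s,
                                                 is_high_priority, sender_ip, is_vec);
    {
        std::lock_guard<std::mutex> l(_lock);
        _load_channels.insert({load_id, channel});
    }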
@@ -52,7 +48,6 @@ LoadChannel::~LoadChannel() {
 }
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t index_id = params.index_id();
     std::shared_ptr<TabletsChannel> channel;
     {
@@ -138,7 +133,6 @@ bool LoadChannel::is_finished() {
 }
 Status LoadChannel::cancel() {
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::lock_guard<std::mutex> l(_lock);
     for (auto& it : _tablets_channels) {
         it.second->cancel();

View File

@@ -39,9 +39,8 @@ class Cache;
 // corresponding to a certain load job
 class LoadChannel {
 public:
-    LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t channel_mem_limit,
-                int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
-                bool is_vec);
+    LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTracker>& mem_tracker,
+                int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, bool is_vec);
     ~LoadChannel();
     // open a new load channel if not exist
@@ -129,7 +128,6 @@ private:
 template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
 Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
                               TabletWriterAddResult* response) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t index_id = request.index_id();
     // 1. get tablets channel
     std::shared_ptr<TabletsChannel> channel;

View File

@@ -84,10 +84,9 @@ LoadChannelMgr::~LoadChannelMgr() {
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
     int64_t load_mgr_mem_limit = calc_process_max_load_memory(process_mem_limit);
-    _mem_tracker = MemTracker::create_tracker(load_mgr_mem_limit, "LoadChannelMgr",
-                                              MemTracker::get_process_tracker(),
-                                              MemTrackerLevel::OVERVIEW);
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+    _mem_tracker = MemTracker::create_virtual_tracker(load_mgr_mem_limit, "LoadChannelMgr",
+                                                      MemTracker::get_process_tracker(),
+                                                      MemTrackerLevel::OVERVIEW);
     REGISTER_HOOK_METRIC(load_channel_mem_consumption,
                          [this]() { return _mem_tracker->consumption(); });
     _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
@@ -95,16 +94,7 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
     return Status::OK();
 }
-LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64_t load_mem_limit,
-                                                  int64_t channel_mem_limit, int64_t timeout_s,
-                                                  bool is_high_priority,
-                                                  const std::string& sender_ip, bool is_vec) {
-    return new LoadChannel(load_id, load_mem_limit, channel_mem_limit, timeout_s, is_high_priority,
-                           sender_ip, is_vec);
-}
 Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     UniqueId load_id(params.id());
     std::shared_ptr<LoadChannel> channel;
     {
@@ -114,18 +104,31 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
             channel = it->second;
         } else {
             // create a new load channel
-            int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1;
-            int64_t channel_mem_limit =
-                    calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit());
             int64_t timeout_in_req_s =
                     params.has_load_channel_timeout_s() ? params.load_channel_timeout_s() : -1;
             int64_t channel_timeout_s = calc_channel_timeout_s(timeout_in_req_s);
             bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority());
-            channel.reset(_create_load_channel(load_id, load_mem_limit, channel_mem_limit,
-                                               channel_timeout_s, is_high_priority,
-                                               params.sender_ip(), params.is_vectorized()));
+            int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1;
+            int64_t channel_mem_limit =
+                    calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit());
+            auto channel_mem_tracker =
+                    MemTracker::create_tracker(channel_mem_limit,
+                                               fmt::format("LoadChannel#senderIp={}#loadID={}",
+                                                           params.sender_ip(), load_id.to_string()),
+                                               _mem_tracker);
+            // TODO
+            // auto channel_mem_tracker_job = std::make_shared<MemTracker>(
+            //         -1,
+            //         fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(),
+            //                     load_id.to_string()),
+            //         ExecEnv::GetInstance()
+            //                 ->task_pool_mem_tracker_registry()
+            //                 ->register_load_mem_tracker(load_id.to_string(), load_mem_limit),
+            //         MemTrackerLevel::TASK);
+            channel.reset(new LoadChannel(load_id, channel_mem_tracker, channel_timeout_s,
+                                          is_high_priority, params.sender_ip(),
+                                          params.is_vectorized()));
             _load_channels.insert({load_id, channel});
         }
     }
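
The heart of the fix is visible here: the channel tracker is created as a child of the manager's _mem_tracker, so whatever a LoadChannel consumes rolls up into the manager's total and the load limit is enforced against real consumption. A generic sketch of that parent/child roll-up, using a hypothetical Tracker type rather than Doris's MemTracker:

    #include <atomic>
    #include <cstdint>

    // Hypothetical two-level tracker: a child's consume() also charges its
    // parent, so the manager-level tracker always sees the true total.
    struct Tracker {
        Tracker(int64_t limit, Tracker* parent = nullptr) : _limit(limit), _parent(parent) {}
        void consume(int64_t bytes) {
            _consumption.fetch_add(bytes, std::memory_order_relaxed);
            if (_parent != nullptr) _parent->consume(bytes);  // roll up
        }
        void release(int64_t bytes) { consume(-bytes); }
        bool limit_exceeded() const { return _consumption.load() > _limit; }

        std::atomic<int64_t> _consumption{0};
        const int64_t _limit;
        Tracker* const _parent;
    };

Usage mirroring the diff: Tracker mgr(mgr_limit); Tracker channel(channel_limit, &mgr); — every channel.consume() is then visible at mgr.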
@@ -181,7 +184,6 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
 }
 Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     UniqueId load_id(params.id());
     std::shared_ptr<LoadChannel> cancelled_channel;
     {

View File

@@ -61,11 +61,6 @@ public:
     std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
 private:
-    static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t load_mem_limit,
-                                             int64_t channel_mem_limit, int64_t timeout_s,
-                                             bool is_high_priority, const std::string& sender_ip,
-                                             bool is_vec);
     template <typename Request>
     Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof,
                              const UniqueId& load_id, const Request& request);
@@ -84,7 +79,8 @@ protected:
     std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
     Cache* _last_success_channel = nullptr;
-    // check the total load mem consumption of this Backend
+    // check the total load channel mem consumption of this Backend
+    // TODO: not used, refactor soon
     std::shared_ptr<MemTracker> _mem_tracker;
     CountDownLatch _stop_background_threads_latch;

View File

@@ -53,7 +53,6 @@ TabletsChannel::~TabletsChannel() {
 }
 Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kOpened) {
         // Normal case, already open by other sender
@@ -253,7 +252,6 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
 }
 Status TabletsChannel::cancel() {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
         return _close_status;

View File

@@ -21,6 +21,7 @@
 #include <gperftools/nallocx.h>
 #include <gperftools/tcmalloc.h>
+#include "runtime/mem_tracker.h"
 #include "runtime/thread_context.h"
 // Notice: modifying the code in the New/Delete hooks must be done carefully!
@@ -38,12 +39,16 @@
 void new_hook(const void* ptr, size_t size) {
     if (doris::tls_ctx()) {
         doris::tls_ctx()->consume_mem(tc_nallocx(size, 0));
+    } else if (doris::ExecEnv::GetInstance()->initialized()) {
+        doris::MemTracker::get_process_tracker()->consume(tc_nallocx(size, 0));
     }
 }
 void delete_hook(const void* ptr) {
     if (doris::tls_ctx()) {
         doris::tls_ctx()->release_mem(tc_malloc_size(const_cast<void*>(ptr)));
+    } else if (doris::ExecEnv::GetInstance()->initialized()) {
+        doris::MemTracker::get_process_tracker()->release(tc_malloc_size(const_cast<void*>(ptr)));
     }
 }
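
These hooks only take effect once registered with tcmalloc. A hedged sketch of the registration via gperftools' MallocHook API (the actual init function in this file is not shown in the diff, so the name here is assumed):

    #include <gperftools/malloc_hook.h>

    // Assumed registration helper: gperftools then calls new_hook after every
    // allocation and delete_hook before every deallocation, process-wide.
    void init_hook() {
        MallocHook::AddNewHook(&new_hook);
        MallocHook::AddDeleteHook(&delete_hook);
    }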

View File

@@ -157,12 +157,16 @@ public:
     void consume_mem(int64_t size) {
         if (start_thread_mem_tracker) {
             _thread_mem_tracker_mgr->cache_consume(size);
+        } else {
+            MemTracker::get_process_tracker()->consume(size);
         }
     }
     void release_mem(int64_t size) {
         if (start_thread_mem_tracker) {
             _thread_mem_tracker_mgr->cache_consume(-size);
+        } else {
+            MemTracker::get_process_tracker()->release(size);
         }
     }

View File

@@ -177,6 +177,7 @@ private:
     phmap::flat_hash_map<int64_t, std::string> _mem_tracker_labels;
     // If true, call memtracker try_consume, otherwise call consume.
     bool _check_limit;
+    bool _stop_consume = false;
     int64_t _tracker_id;
     // Avoid memory allocation in functions.
@@ -256,25 +257,27 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
     // When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes`
     // and some threads `_untracked_mem <= -config::mem_tracker_consume_min_size_bytes` trigger consumption(),
     // it will cause tracker->consumption to be temporarily less than 0.
-    if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
-        _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
+    //
+    // Temporary memory may be allocated during the consumption of the mem tracker (in the processing logic of
+    // the exceeded limit), which will lead to entering the TCMalloc Hook again, so suspend consumption to avoid
+    // falling into an infinite loop.
+    if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
+         _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
+        !_stop_consume) {
+        _stop_consume = true;
         DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string();
         // Untracked memory remaining from the last switch to the current tracker.
         if (_untracked_mems[_tracker_id] != 0) {
             _untracked_mem += _untracked_mems[_tracker_id];
             _untracked_mems[_tracker_id] = 0;
         }
-        // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again,
-        // will enter infinite recursion. So the temporary memory allocated in mem_tracker.try_consume
-        // and mem_limit_exceeded will directly call consume.
         if (_check_limit) {
             _check_limit = false;
             noncache_try_consume(_untracked_mem);
             _check_limit = true;
         } else {
             mem_tracker()->consume(_untracked_mem);
         }
         _untracked_mem = 0;
+        _stop_consume = false;
     }
 }
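
_stop_consume is a reentrancy guard: flushing _untracked_mem into the tracker can itself allocate (for example while building the exceeded-limit message), which re-enters the TCMalloc hook and, without the flag, would recurse until stack overflow. A self-contained sketch of the pattern with illustrative names, independent of Doris's types:

    #include <cstddef>
    #include <cstdio>

    // The hook may trigger work that itself allocates; a thread-local flag
    // turns the nested entry into a no-op instead of infinite recursion.
    thread_local bool g_in_hook = false;

    void do_accounting(std::size_t size) {
        std::printf("tracked %zu bytes\n", size);  // pretend this allocates
    }

    void allocation_hook(std::size_t size) {
        if (g_in_hook) return;  // nested call from do_accounting(): skip
        g_in_hook = true;
        do_accounting(size);
        g_in_hook = false;
    }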

View File

@@ -206,6 +206,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
                                                    const PTabletWriterAddBlockRequest* request,
                                                    PTabletWriterAddBlockResult* response,
                                                    google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     // TODO(zxy) delete in 1.2 version
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
@@ -216,6 +217,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
 void PInternalServiceImpl::tablet_writer_add_block_by_http(
         google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
         PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     PTabletWriterAddBlockRequest* request_raw = new PTabletWriterAddBlockRequest();
     google::protobuf::Closure* done_raw =
             new NewHttpClosure<PTabletWriterAddBlockRequest>(request_raw, done);
@@ -243,8 +245,6 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
     int64_t execution_time_ns = 0;
     {
         SCOPED_RAW_TIMER(&execution_time_ns);
-        SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD,
-                                  _exec_env->load_channel_mgr()->mem_tracker());
         auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
         if (!st.ok()) {
@@ -264,12 +264,14 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll
                                                    const PTabletWriterAddBatchRequest* request,
                                                    PTabletWriterAddBatchResult* response,
                                                    google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     _tablet_writer_add_batch(cntl_base, request, response, done);
 }
 void PInternalServiceImpl::tablet_writer_add_batch_by_http(
         google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
         PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest();
     google::protobuf::Closure* done_raw =
             new NewHttpClosure<PTabletWriterAddBatchRequest>(request_raw, done);
@@ -300,8 +302,6 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
     int64_t execution_time_ns = 0;
     {
         SCOPED_RAW_TIMER(&execution_time_ns);
-        SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD,
-                                  _exec_env->load_channel_mgr()->mem_tracker());
         // TODO(zxy) delete in 1.2 version
         brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
         attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl);
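
The SCOPED_SWITCH_BTHREAD additions replace the removed SCOPED_ATTACH_TASK_THREAD at these brpc entry points; the idiom is an RAII scope switch that installs the right thread-local context on entry and restores the previous one on exit. A generic sketch of that idiom (not the actual SCOPED_SWITCH_BTHREAD implementation, which this diff does not show):

    // Illustrative RAII switcher: save the current thread-local tracker,
    // install a new one, restore the old one when the scope ends.
    struct Tracker;
    thread_local Tracker* tls_tracker = nullptr;

    class ScopedSwitchTracker {
    public:
        explicit ScopedSwitchTracker(Tracker* t) : _saved(tls_tracker) { tls_tracker = t; }
        ~ScopedSwitchTracker() { tls_tracker = _saved; }  // restore on exit
        ScopedSwitchTracker(const ScopedSwitchTracker&) = delete;
        ScopedSwitchTracker& operator=(const ScopedSwitchTracker&) = delete;

    private:
        Tracker* _saved;
    };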