[improvement](load) reduce memory in batch for small load channels (#14214)

This commit is contained in:
zhannngchen
2022-11-12 22:14:01 +08:00
committed by GitHub
parent beaf2fcaf6
commit 7682c08af0
2 changed files with 80 additions and 29 deletions

View File

@ -70,6 +70,12 @@ LoadChannelMgr::~LoadChannelMgr() {
Status LoadChannelMgr::init(int64_t process_mem_limit) {
_load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
_load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100;
// If a load channel's memory consumption is no more than 10% of the hard limit, it's not
// worth to reduce memory on it. Since we only reduce 1/3 memory for one load channel,
// for a channel consume 10% of hard limit, we can only release about 3% memory each time,
// it's not quite helpfull to reduce memory pressure.
// In this case we need to pick multiple load channels to reduce memory more effectively.
_load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1;
_mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
_mem_tracker_set = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
"LoadChannelMgrTrackerSet");

View File

@ -87,7 +87,6 @@ protected:
std::mutex _lock;
// load id -> load channel
std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
std::shared_ptr<LoadChannel> _reduce_memory_channel = nullptr;
Cache* _last_success_channel = nullptr;
// check the total load channel mem consumption of this Backend
@ -96,6 +95,14 @@ protected:
std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
int64_t _load_hard_mem_limit = -1;
int64_t _load_soft_mem_limit = -1;
// By default, we try to reduce memory on the load channel with largest mem consumption,
// but if there are lots of small load channel, even the largest one consumes very
// small memory, in this case we need to pick multiple load channels to reduce memory
// more effectively.
// `_load_channel_min_mem_to_reduce` is used to determine whether the largest load channel's
// memory consumption is big enough.
int64_t _load_channel_min_mem_to_reduce = -1;
bool _soft_reduce_mem_in_progress = false;
// If hard limit reached, one thread will trigger load channel flush,
// other threads should wait on the condition variable.
@ -171,10 +178,9 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
return Status::OK();
}
// Pick load channel to reduce memory.
std::shared_ptr<LoadChannel> channel;
// Indicate whether current thread is reducing mem on hard limit.
bool reducing_mem_on_hard_limit = false;
std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
{
std::unique_lock<std::mutex> l(_lock);
while (_should_wait_flush) {
@ -182,41 +188,74 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
<< ", waiting for flush";
_wait_flush_cond.wait(l);
}
bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
// Some other thread is flushing data, and not reached hard limit now,
// we don't need to handle mem limit in current thread.
if (_reduce_memory_channel != nullptr &&
_mem_tracker->consumption() < _load_hard_mem_limit &&
MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
return Status::OK();
}
// We need to pick a LoadChannel to reduce memory usage.
// If `_reduce_memory_channel` is not null, it means the hard limit is
// exceed now, we still need to pick a load channel again. Because
// `_reduce_memory_channel` might not be the largest consumer now.
int64_t max_consume = 0;
// Pick LoadChannels to reduce memory usage, if some other thread is reducing memory
// due to soft limit, and we reached hard limit now, current thread may pick some
// duplicate channels and trigger duplicate reducing memory process.
// But the load channel's reduce memory process is thread safe, only 1 thread can
// reduce memory at the same time, other threads will wait on a condition variable,
// after the reduce-memory work finished, all threads will return.
using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
std::vector<ChannelMemPair> candidate_channels;
int64_t total_consume = 0;
for (auto& kv : _load_channels) {
if (kv.second->is_high_priority()) {
// do not select high priority channel to reduce memory
// to avoid blocking them.
continue;
}
if (kv.second->mem_consumption() > max_consume) {
max_consume = kv.second->mem_consumption();
channel = kv.second;
}
int64_t mem = kv.second->mem_consumption();
// save the mem consumption, since the calculation might be expensive.
candidate_channels.push_back(std::make_pair(kv.second, mem));
total_consume += mem;
}
if (max_consume == 0) {
if (candidate_channels.empty()) {
// should not happen, add log to observe
LOG(WARNING) << "failed to find suitable load channel when total load mem limit exceed";
LOG(WARNING) << "All load channels are high priority, failed to find suitable"
<< "channels to reduce memory when total load mem limit exceed";
return Status::OK();
}
DCHECK(channel.get() != nullptr);
_reduce_memory_channel = channel;
// sort all load channels, try to find the largest one.
std::sort(candidate_channels.begin(), candidate_channels.end(),
[](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
return lhs.second > rhs.second;
});
int64_t mem_consumption_in_picked_channel = 0;
auto largest_channel = *candidate_channels.begin();
// If some load-channel is big enough, we can reduce it only, try our best to avoid
// reducing small load channels.
if (_load_channel_min_mem_to_reduce > 0 &&
largest_channel.second > _load_channel_min_mem_to_reduce) {
// Pick 1 load channel to reduce memory.
channels_to_reduce_mem.push_back(largest_channel.first);
mem_consumption_in_picked_channel = largest_channel.second;
} else {
// Pick multiple channels to reduce memory.
int64_t mem_to_flushed = total_consume / 3;
for (auto ch : candidate_channels) {
channels_to_reduce_mem.push_back(ch.first);
mem_consumption_in_picked_channel += ch.second;
if (mem_consumption_in_picked_channel >= mem_to_flushed) {
break;
}
}
}
std::ostringstream oss;
if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
oss << "reducing memory of " << *channel << " because total load mem consumption "
oss << "reducing memory of " << channels_to_reduce_mem.size()
<< " load channels (total mem consumption: " << mem_consumption_in_picked_channel
<< " bytes), because total load mem consumption "
<< PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
<< " has exceeded";
if (_mem_tracker->consumption() > _load_hard_mem_limit) {
@ -224,24 +263,30 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
reducing_mem_on_hard_limit = true;
oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
} else {
_soft_reduce_mem_in_progress = true;
oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
}
} else {
_should_wait_flush = true;
reducing_mem_on_hard_limit = true;
oss << "reducing memory of " << *channel << " because process memory used "
<< PerfCounters::get_vm_rss_str() << " has exceeded limit "
oss << "reducing memory of " << channels_to_reduce_mem.size()
<< " load channels (total mem consumption: " << mem_consumption_in_picked_channel
<< " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit "
<< PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
<< " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str();
}
LOG(INFO) << oss.str();
}
// No matter soft limit or hard limit reached, only 1 thread will wait here,
// if hard limit reached, other threads will pend at the beginning of this
// method.
Status st = channel->handle_mem_exceed_limit(response);
LOG(INFO) << "reduce memory of " << *channel << " finished";
Status st = Status::OK();
for (auto ch : channels_to_reduce_mem) {
uint64_t begin = GetCurrentTimeMicros();
int64_t mem_usage = ch->mem_consumption();
st = ch->handle_mem_exceed_limit(response);
LOG(INFO) << "reduced memory of " << *ch << ", cost "
<< (GetCurrentTimeMicros() - begin) / 1000
<< " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
}
{
std::lock_guard<std::mutex> l(_lock);
@ -251,8 +296,8 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
_should_wait_flush = false;
_wait_flush_cond.notify_all();
}
if (_reduce_memory_channel == channel) {
_reduce_memory_channel = nullptr;
if (_soft_reduce_mem_in_progress) {
_soft_reduce_mem_in_progress = false;
}
}
return st;