[fix](load) fix a bug that reduce memory work on hard limit might be triggered twice (#13967)
When the load mem hard limit reached, all load channel should wait on the lock of LoadChannelMgr, util current reduce mem work finished. In current implementation, there's a bug might cause some threads be woke up before reduce mem work finished: thread A found that soft limit reached, picked a load channel and waiting for reduce memory work finish. The memory keep increasing thread B found that hard limit reached (either the load mem hard limit, or process soft limit), it picked a load channel to reduce memory and set the variable _should_wait_flush to true thread C found that _should_wait_flush is true, waiting on _wait_flush_cond thread A finished it's reduce memory work, found that _should_wait_flush is true, set it to false, and notify all threads. thread C is woke up and pick a load channel to do the reduce memory work, and now thread B's work is not finished. We can see 2 threads doing reduce memory work when hard limit reached, it's quite confusing.
This commit is contained in:
@ -170,6 +170,8 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
|
||||
}
|
||||
// Pick load channel to reduce memory.
|
||||
std::shared_ptr<LoadChannel> channel;
|
||||
// Indicate whether current thread is reducing mem on hard limit.
|
||||
bool reducing_mem_on_hard_limit = false;
|
||||
{
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
while (_should_wait_flush) {
|
||||
@ -216,12 +218,14 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
|
||||
<< " has exceeded";
|
||||
if (_mem_tracker->consumption() > _load_hard_mem_limit) {
|
||||
_should_wait_flush = true;
|
||||
reducing_mem_on_hard_limit = true;
|
||||
oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
|
||||
} else {
|
||||
oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
|
||||
}
|
||||
} else {
|
||||
_should_wait_flush = true;
|
||||
reducing_mem_on_hard_limit = true;
|
||||
oss << "reducing memory of " << *channel << " because process memory used "
|
||||
<< PerfCounters::get_vm_rss_str() << " has exceeded limit "
|
||||
<< PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
|
||||
@ -238,7 +242,9 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
if (_should_wait_flush) {
|
||||
// If a thread have finished the memtable flush for soft limit, and now
|
||||
// the hard limit is already reached, it should not update these variables.
|
||||
if (reducing_mem_on_hard_limit && _should_wait_flush) {
|
||||
_should_wait_flush = false;
|
||||
_wait_flush_cond.notify_all();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user