[improvement](load) release load channel actively when error occurs (#14218)

Xin Liao
2022-11-13 12:31:15 +08:00
committed by GitHub
parent dd11d5c0a5
commit 33b50860c7
8 changed files with 190 additions and 191 deletions

View File

@@ -160,7 +160,7 @@ Status DeltaWriter::write(Tuple* tuple) {
if (_is_cancelled) {
// The writer may be cancelled at any time by other thread.
// just return ERROR if writer is cancelled.
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
return _cancel_status;
}
_mem_table->insert(tuple);
@@ -185,7 +185,7 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
}
if (_is_cancelled) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
return _cancel_status;
}
for (const auto& row_idx : row_idxs) {
@@ -213,7 +213,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
}
if (_is_cancelled) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
return _cancel_status;
}
_mem_table->insert(block, row_idxs);
@@ -247,7 +247,7 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
}
if (_is_cancelled) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
return _cancel_status;
}
VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
@@ -274,7 +274,7 @@ Status DeltaWriter::wait_flush() {
return Status::OK();
}
if (_is_cancelled) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
return _cancel_status;
}
RETURN_NOT_OK(_flush_token->wait());
return Status::OK();
@@ -324,7 +324,7 @@ Status DeltaWriter::close() {
}
if (_is_cancelled) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
return _cancel_status;
}
auto s = _flush_memtable_async();
@@ -343,7 +343,7 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
<< "delta writer is supposed be to initialized before close_wait() being called";
if (_is_cancelled) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
return _cancel_status;
}
// return error if previous flush failed
RETURN_NOT_OK(_flush_token->wait());
@@ -400,6 +400,10 @@ void DeltaWriter::add_finished_slave_replicas(
}
Status DeltaWriter::cancel() {
return cancel_with_status(Status::Cancelled("already cancelled"));
}
Status DeltaWriter::cancel_with_status(const Status& st) {
std::lock_guard<std::mutex> l(_lock);
if (!_is_init || _is_cancelled) {
return Status::OK();
@@ -410,6 +414,7 @@ Status DeltaWriter::cancel() {
_flush_token->cancel();
}
_is_cancelled = true;
_cancel_status = st;
return Status::OK();
}
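Taken together, the DeltaWriter changes above make the writer remember the first real failure and replay it to every later caller: write(), flush_memtable_and_wait(), close() and close_wait() now return _cancel_status instead of the generic OLAP_ERR_ALREADY_CANCELLED. A minimal self-contained sketch of the pattern (illustrative names, Status reduced to a string, not the actual Doris classes):

```cpp
#include <mutex>
#include <string>

// Sketch of the "record the first failure, return it ever after" pattern that
// cancel_with_status() introduces. All names here are illustrative.
class WriterSketch {
public:
    void cancel_with_status(const std::string& reason) {
        std::lock_guard<std::mutex> l(_lock);
        if (_is_cancelled) {
            return;                 // keep the first recorded reason
        }
        _is_cancelled = true;
        _cancel_status = reason;
    }

    std::string write() {
        std::lock_guard<std::mutex> l(_lock);
        if (_is_cancelled) {
            return _cancel_status;  // the root cause, not a generic "already cancelled"
        }
        return "OK";
    }

private:
    std::mutex _lock;
    bool _is_cancelled = false;
    std::string _cancel_status;
};
```

So when a flush fails and cancel_with_status() records that failure, a client that keeps calling the writer sees the flush error itself rather than a bare cancellation.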

View File

@@ -82,6 +82,7 @@ public:
// abandon current memtable and wait for all pending-flushing memtables to be destructed.
// mem_consumption() should be 0 after this function returns.
Status cancel();
Status cancel_with_status(const Status& st);
// submit current memtable to flush queue, and wait all memtables in flush queue
// to be flushed.
@@ -128,6 +129,7 @@ private:
bool _is_init = false;
bool _is_cancelled = false;
Status _cancel_status;
WriteRequest _req;
TabletSharedPtr _tablet;
RowsetSharedPtr _cur_rowset;

View File

@@ -121,4 +121,23 @@ Status LoadChannel::cancel() {
return Status::OK();
}
} // namespace doris
void LoadChannel::handle_mem_exceed_limit() {
bool found = false;
std::shared_ptr<TabletsChannel> channel;
{
// lock so that only one thread can check mem limit
std::lock_guard<SpinLock> l(_tablets_channels_lock);
found = _find_largest_consumption_channel(&channel);
}
// Release lock so that other threads can still call add_batch concurrently.
if (found) {
DCHECK(channel != nullptr);
channel->reduce_mem_usage();
} else {
// should not happen, add log to observe
LOG(WARNING) << "fail to find suitable tablets-channel when memory exceed. "
<< "load_id=" << _load_id;
}
}
} // namespace doris
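The helper `_find_largest_consumption_channel()` is not shown in this diff. Inferred only from the call site above (bool return value, output parameter, caller holding `_tablets_channels_lock`), a plausible shape is sketched below; the member name `_tablets_channels` and the exact loop are assumptions, not the actual implementation.

```cpp
// Hedged sketch: pick the TabletsChannel with the largest memory consumption.
// Assumes _tablets_channels maps index_id -> std::shared_ptr<TabletsChannel> and
// that the caller already holds _tablets_channels_lock.
bool LoadChannel::_find_largest_consumption_channel(std::shared_ptr<TabletsChannel>* channel) {
    int64_t max_consume = 0;
    for (const auto& it : _tablets_channels) {
        int64_t consume = it.second->mem_consumption();
        if (consume > max_consume) {
            max_consume = consume;
            *channel = it.second;
        }
    }
    return max_consume > 0;  // false only when no channel holds any memory
}
```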

View File

@@ -63,8 +63,7 @@ public:
// If yes, it will pick a tablets channel to try to reduce memory consumption.
// The method will not return until the chosen tablet channels finished memtable
// flush.
template <typename TabletWriterAddResult>
Status handle_mem_exceed_limit(TabletWriterAddResult* response);
void handle_mem_exceed_limit();
int64_t mem_consumption() {
int64_t mem_usage = 0;
@@ -183,25 +182,4 @@ inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
return os;
}
template <typename TabletWriterAddResult>
Status LoadChannel::handle_mem_exceed_limit(TabletWriterAddResult* response) {
bool found = false;
std::shared_ptr<TabletsChannel> channel;
{
// lock so that only one thread can check mem limit
std::lock_guard<SpinLock> l(_tablets_channels_lock);
found = _find_largest_consumption_channel(&channel);
}
// Release lock so that other threads can still call add_batch concurrently.
if (found) {
DCHECK(channel != nullptr);
return channel->reduce_mem_usage(response);
} else {
// should not happen, add log to observe
LOG(WARNING) << "fail to find suitable tablets-channel when memory exceed. "
<< "load_id=" << _load_id;
}
return Status::OK();
}
} // namespace doris

View File

@@ -217,4 +217,136 @@ Status LoadChannelMgr::_start_load_channels_clean() {
return Status::OK();
}
void LoadChannelMgr::_handle_mem_exceed_limit() {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
int64_t process_mem_limit = MemInfo::mem_limit() * config::soft_mem_limit_frac;
if (_mem_tracker->consumption() < _load_soft_mem_limit &&
MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
return;
}
// Indicate whether current thread is reducing mem on hard limit.
bool reducing_mem_on_hard_limit = false;
std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
{
std::unique_lock<std::mutex> l(_lock);
while (_should_wait_flush) {
LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
<< ", waiting for flush";
_wait_flush_cond.wait(l);
}
bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
// Some other thread is flushing data, and not reached hard limit now,
// we don't need to handle mem limit in current thread.
if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
return;
}
// Pick LoadChannels to reduce memory usage, if some other thread is reducing memory
// due to soft limit, and we reached hard limit now, current thread may pick some
// duplicate channels and trigger duplicate reducing memory process.
// But the load channel's reduce memory process is thread safe, only 1 thread can
// reduce memory at the same time, other threads will wait on a condition variable,
// after the reduce-memory work finished, all threads will return.
using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
std::vector<ChannelMemPair> candidate_channels;
int64_t total_consume = 0;
for (auto& kv : _load_channels) {
if (kv.second->is_high_priority()) {
// do not select high priority channel to reduce memory
// to avoid blocking them.
continue;
}
int64_t mem = kv.second->mem_consumption();
// save the mem consumption, since the calculation might be expensive.
candidate_channels.push_back(std::make_pair(kv.second, mem));
total_consume += mem;
}
if (candidate_channels.empty()) {
// should not happen, add log to observe
LOG(WARNING) << "All load channels are high priority, failed to find suitable"
<< "channels to reduce memory when total load mem limit exceed";
return;
}
// sort all load channels, try to find the largest one.
std::sort(candidate_channels.begin(), candidate_channels.end(),
[](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
return lhs.second > rhs.second;
});
int64_t mem_consumption_in_picked_channel = 0;
auto largest_channel = *candidate_channels.begin();
// If some load-channel is big enough, we can reduce it only, try our best to avoid
// reducing small load channels.
if (_load_channel_min_mem_to_reduce > 0 &&
largest_channel.second > _load_channel_min_mem_to_reduce) {
// Pick 1 load channel to reduce memory.
channels_to_reduce_mem.push_back(largest_channel.first);
mem_consumption_in_picked_channel = largest_channel.second;
} else {
// Pick multiple channels to reduce memory.
int64_t mem_to_flushed = total_consume / 3;
for (auto ch : candidate_channels) {
channels_to_reduce_mem.push_back(ch.first);
mem_consumption_in_picked_channel += ch.second;
if (mem_consumption_in_picked_channel >= mem_to_flushed) {
break;
}
}
}
std::ostringstream oss;
if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
oss << "reducing memory of " << channels_to_reduce_mem.size()
<< " load channels (total mem consumption: " << mem_consumption_in_picked_channel
<< " bytes), because total load mem consumption "
<< PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
<< " has exceeded";
if (_mem_tracker->consumption() > _load_hard_mem_limit) {
_should_wait_flush = true;
reducing_mem_on_hard_limit = true;
oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
} else {
_soft_reduce_mem_in_progress = true;
oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
}
} else {
_should_wait_flush = true;
reducing_mem_on_hard_limit = true;
oss << "reducing memory of " << channels_to_reduce_mem.size()
<< " load channels (total mem consumption: " << mem_consumption_in_picked_channel
<< " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit "
<< PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
<< " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str();
}
LOG(INFO) << oss.str();
}
for (auto ch : channels_to_reduce_mem) {
uint64_t begin = GetCurrentTimeMicros();
int64_t mem_usage = ch->mem_consumption();
ch->handle_mem_exceed_limit();
LOG(INFO) << "reduced memory of " << *ch << ", cost "
<< (GetCurrentTimeMicros() - begin) / 1000
<< " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
}
{
std::lock_guard<std::mutex> l(_lock);
// If a thread has finished the memtable flush for soft limit, and now
// the hard limit is already reached, it should not update these variables.
if (reducing_mem_on_hard_limit && _should_wait_flush) {
_should_wait_flush = false;
_wait_flush_cond.notify_all();
}
if (_soft_reduce_mem_in_progress) {
_soft_reduce_mem_in_progress = false;
}
}
return;
}
} // namespace doris
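The picking policy in the middle of `_handle_mem_exceed_limit()` can be isolated for illustration. The self-contained sketch below mirrors only the selection logic above and is not part of the patch: if the single largest channel exceeds `_load_channel_min_mem_to_reduce`, flush it alone to spare the small loads; otherwise take channels, largest first, until roughly one third of total load memory is covered.

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Illustrative sketch of the picking policy only; `Channel` stands in for
// std::shared_ptr<LoadChannel>. Input pairs are (channel, mem), sorted by mem descending.
template <typename Channel>
std::vector<Channel> pick_channels_to_reduce(
        const std::vector<std::pair<Channel, int64_t>>& sorted_candidates,
        int64_t total_consume, int64_t min_mem_to_reduce) {
    std::vector<Channel> picked;
    if (sorted_candidates.empty()) {
        return picked;
    }
    // One big channel is enough: avoid flushing many small loads.
    if (min_mem_to_reduce > 0 && sorted_candidates.front().second > min_mem_to_reduce) {
        picked.push_back(sorted_candidates.front().first);
        return picked;
    }
    // Otherwise take channels until about a third of total load memory is selected.
    int64_t picked_mem = 0;
    for (const auto& [ch, mem] : sorted_candidates) {
        picked.push_back(ch);
        picked_mem += mem;
        if (picked_mem >= total_consume / 3) {
            break;
        }
    }
    return picked;
}
```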

View File

@@ -77,8 +77,7 @@ private:
void _finish_load_channel(UniqueId load_id);
// check if the total load mem consumption exceeds limit.
// If yes, it will pick a load channel to try to reduce memory consumption.
template <typename TabletWriterAddResult>
Status _handle_mem_exceed_limit(TabletWriterAddResult* response);
void _handle_mem_exceed_limit();
Status _start_bg_worker();
@@ -154,13 +153,17 @@ Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request,
// 2. check if mem consumption exceed limit
// If this is a high priority load task, do not handle this.
// because this may block for a while, which may lead to rpc timeout.
RETURN_IF_ERROR(_handle_mem_exceed_limit(response));
_handle_mem_exceed_limit();
}
// 3. add batch to load channel
// batch may not exist in request(eg: eos request without batch),
// this case will be handled in load channel's add batch method.
RETURN_IF_ERROR(channel->add_batch(request, response));
Status st = channel->add_batch(request, response);
if (UNLIKELY(!st.ok())) {
channel->cancel();
return st;
}
// 4. handle finish
if (channel->is_finished()) {
@@ -169,138 +172,4 @@ Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request,
return Status::OK();
}
template <typename TabletWriterAddResult>
Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
int64_t process_mem_limit = MemInfo::mem_limit() * config::soft_mem_limit_frac;
if (_mem_tracker->consumption() < _load_soft_mem_limit &&
MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
return Status::OK();
}
// Indicate whether current thread is reducing mem on hard limit.
bool reducing_mem_on_hard_limit = false;
std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
{
std::unique_lock<std::mutex> l(_lock);
while (_should_wait_flush) {
LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
<< ", waiting for flush";
_wait_flush_cond.wait(l);
}
bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
// Some other thread is flushing data, and not reached hard limit now,
// we don't need to handle mem limit in current thread.
if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
return Status::OK();
}
// Pick LoadChannels to reduce memory usage, if some other thread is reducing memory
// due to soft limit, and we reached hard limit now, current thread may pick some
// duplicate channels and trigger duplicate reducing memory process.
// But the load channel's reduce memory process is thread safe, only 1 thread can
// reduce memory at the same time, other threads will wait on a condition variable,
// after the reduce-memory work finished, all threads will return.
using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
std::vector<ChannelMemPair> candidate_channels;
int64_t total_consume = 0;
for (auto& kv : _load_channels) {
if (kv.second->is_high_priority()) {
// do not select high priority channel to reduce memory
// to avoid blocking them.
continue;
}
int64_t mem = kv.second->mem_consumption();
// save the mem consumption, since the calculation might be expensive.
candidate_channels.push_back(std::make_pair(kv.second, mem));
total_consume += mem;
}
if (candidate_channels.empty()) {
// should not happen, add log to observe
LOG(WARNING) << "All load channels are high priority, failed to find suitable"
<< "channels to reduce memory when total load mem limit exceed";
return Status::OK();
}
// sort all load channels, try to find the largest one.
std::sort(candidate_channels.begin(), candidate_channels.end(),
[](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
return lhs.second > rhs.second;
});
int64_t mem_consumption_in_picked_channel = 0;
auto largest_channel = *candidate_channels.begin();
// If some load-channel is big enough, we can reduce it only, try our best to avoid
// reducing small load channels.
if (_load_channel_min_mem_to_reduce > 0 &&
largest_channel.second > _load_channel_min_mem_to_reduce) {
// Pick 1 load channel to reduce memory.
channels_to_reduce_mem.push_back(largest_channel.first);
mem_consumption_in_picked_channel = largest_channel.second;
} else {
// Pick multiple channels to reduce memory.
int64_t mem_to_flushed = total_consume / 3;
for (auto ch : candidate_channels) {
channels_to_reduce_mem.push_back(ch.first);
mem_consumption_in_picked_channel += ch.second;
if (mem_consumption_in_picked_channel >= mem_to_flushed) {
break;
}
}
}
std::ostringstream oss;
if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
oss << "reducing memory of " << channels_to_reduce_mem.size()
<< " load channels (total mem consumption: " << mem_consumption_in_picked_channel
<< " bytes), because total load mem consumption "
<< PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
<< " has exceeded";
if (_mem_tracker->consumption() > _load_hard_mem_limit) {
_should_wait_flush = true;
reducing_mem_on_hard_limit = true;
oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
} else {
_soft_reduce_mem_in_progress = true;
oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
}
} else {
_should_wait_flush = true;
reducing_mem_on_hard_limit = true;
oss << "reducing memory of " << channels_to_reduce_mem.size()
<< " load channels (total mem consumption: " << mem_consumption_in_picked_channel
<< " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit "
<< PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
<< " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str();
}
LOG(INFO) << oss.str();
}
Status st = Status::OK();
for (auto ch : channels_to_reduce_mem) {
uint64_t begin = GetCurrentTimeMicros();
int64_t mem_usage = ch->mem_consumption();
st = ch->handle_mem_exceed_limit(response);
LOG(INFO) << "reduced memory of " << *ch << ", cost "
<< (GetCurrentTimeMicros() - begin) / 1000
<< " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
}
{
std::lock_guard<std::mutex> l(_lock);
// If a thread has finished the memtable flush for soft limit, and now
// the hard limit is already reached, it should not update these variables.
if (reducing_mem_on_hard_limit && _should_wait_flush) {
_should_wait_flush = false;
_wait_flush_cond.notify_all();
}
if (_soft_reduce_mem_in_progress) {
_soft_reduce_mem_in_progress = false;
}
}
return st;
}
} // namespace doris
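The change inside `add_batch()` above is what the commit title describes: when adding a batch fails, the load channel is cancelled on the spot instead of merely returning the error, so its resources can be released promptly rather than lingering until the load is cleaned up later. A self-contained sketch of the flow (Status reduced to a string, LoadChannel to a stub; all names illustrative):

```cpp
#include <string>

// Sketch of the new failure path in add_batch(); ChannelStub stands in for LoadChannel.
struct ChannelStub {
    bool cancelled = false;
    std::string add_batch() { return "tablet writer write failed"; }  // simulated failure
    void cancel() { cancelled = true; }  // in the real code this cancels the whole load channel
};

std::string add_batch_sketch(ChannelStub& channel) {
    std::string st = channel.add_batch();
    if (st != "OK") {
        channel.cancel();  // release the load channel actively when an error occurs
        return st;         // the caller still receives the original error
    }
    return "OK";
}
```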

View File

@@ -203,14 +203,13 @@ int64_t TabletsChannel::mem_consumption() {
return mem_usage;
}
template <typename TabletWriterAddResult>
Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
void TabletsChannel::reduce_mem_usage() {
if (_try_to_wait_flushing()) {
// `_try_to_wait_flushing()` returns true means other thread already
// reduced the mem usage, and current thread do not need to reduce again.
LOG(INFO) << "Duplicate reduce mem usage on TabletsChannel, txn_id: " << _txn_id
<< ", index_id: " << _index_id;
return Status::OK();
return;
}
std::vector<DeltaWriter*> writers_to_wait_flush;
@@ -219,7 +218,9 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
if (_state == kFinished) {
// TabletsChannel is closed without LoadChannel's lock,
// therefore it's possible for reduce_mem_usage() to be called right after close()
return _close_status;
LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id
<< ", index_id: " << _index_id;
return;
}
// Sort the DeltaWriters by mem consumption in descend order.
@@ -275,8 +276,6 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
<< writers[counter - 1]->get_memtable_consumption_snapshot() << " bytes";
}
LOG(INFO) << ss.str();
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
response->mutable_tablet_errors();
// following loop flush memtable async, we'll do it with _lock
for (int i = 0; i < counter; i++) {
Status st = writers[i]->flush_memtable_and_wait(false);
@@ -287,9 +286,7 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
writers[i]->tablet_id(), _txn_id, st.code(), st.precise_code(),
st.get_error_msg());
LOG(WARNING) << err_msg;
PTabletError* error = tablet_errors->Add();
error->set_tablet_id(writers[i]->tablet_id());
error->set_msg(err_msg);
writers[i]->cancel_with_status(st);
_broken_tablets.insert(writers[i]->tablet_id());
}
}
@@ -308,14 +305,16 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
}
}
Status st = Status::OK();
for (auto writer : writers_to_wait_flush) {
st = writer->wait_flush();
Status st = writer->wait_flush();
if (!st.ok()) {
st = Status::InternalError(
"failed to reduce mem consumption by flushing memtable. err: {}",
st.to_string());
break;
auto err_msg = strings::Substitute(
"tablet writer failed to reduce mem consumption by waiting flush memtable, "
"tablet_id=$0, txn_id=$1, err=$2, errcode=$3, msg:$4",
writer->tablet_id(), _txn_id, st.code(), st.precise_code(), st.get_error_msg());
LOG(WARNING) << err_msg;
writer->cancel_with_status(st);
_broken_tablets.insert(writer->tablet_id());
}
}
@@ -325,7 +324,7 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
_reduce_memory_cond.notify_all();
}
return st;
return;
}
Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
@@ -502,8 +501,4 @@ TabletsChannel::add_batch<PTabletWriterAddBatchRequest, PTabletWriterAddBatchRes
template Status
TabletsChannel::add_batch<PTabletWriterAddBlockRequest, PTabletWriterAddBlockResult>(
PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*);
template Status TabletsChannel::reduce_mem_usage<PTabletWriterAddBatchResult>(
PTabletWriterAddBatchResult*);
template Status TabletsChannel::reduce_mem_usage<PTabletWriterAddBlockResult>(
PTabletWriterAddBlockResult*);
} // namespace doris
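The error handling inside `reduce_mem_usage()` also changes shape: a failed flush no longer aborts the call or appends a `PTabletError` to the RPC response; instead the failing writer is cancelled with the concrete status and its tablet is recorded in `_broken_tablets`, so the root cause resurfaces through the writer's `_cancel_status` on a later call (see the DeltaWriter changes above). A self-contained sketch of that per-writer loop (stubbed types, illustrative names):

```cpp
#include <cstdint>
#include <set>
#include <string>
#include <vector>

// WriterStub stands in for DeltaWriter; Status is reduced to a string.
struct WriterStub {
    int64_t tablet_id;
    std::string flush_result;         // simulated wait_flush() outcome
    std::string cancel_status = "OK";
    std::string wait_flush() { return flush_result; }
    void cancel_with_status(const std::string& st) { cancel_status = st; }
};

// A flush failure cancels only the affected writer and marks its tablet broken;
// the remaining writers still get the chance to flush and free memory.
void wait_flush_sketch(std::vector<WriterStub*>& writers_to_wait_flush,
                       std::set<int64_t>& broken_tablets) {
    for (auto* writer : writers_to_wait_flush) {
        std::string st = writer->wait_flush();
        if (st != "OK") {
            writer->cancel_with_status(st);
            broken_tablets.insert(writer->tablet_id);
        }
    }
}
```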

View File

@@ -92,8 +92,7 @@ public:
// eg. flush the largest memtable immediately.
// return Status::OK if mem is reduced.
// no-op when this channel has been closed or cancelled
template <typename TabletWriterAddResult>
Status reduce_mem_usage(TabletWriterAddResult* response);
void reduce_mem_usage();
int64_t mem_consumption();