From 33b50860c7a32e34c10ff805a8a885897de9e5bb Mon Sep 17 00:00:00 2001
From: Xin Liao
Date: Sun, 13 Nov 2022 12:31:15 +0800
Subject: [PATCH] [improvement](load) release load channel actively when error occurs (#14218)

---
 be/src/olap/delta_writer.cpp        |  19 ++--
 be/src/olap/delta_writer.h          |   2 +
 be/src/runtime/load_channel.cpp     |  21 +++-
 be/src/runtime/load_channel.h       |  24 +----
 be/src/runtime/load_channel_mgr.cpp | 132 +++++++++++++++++++++++++
 be/src/runtime/load_channel_mgr.h   | 145 ++--------------------------
 be/src/runtime/tablets_channel.cpp  |  35 +++----
 be/src/runtime/tablets_channel.h    |   3 +-
 8 files changed, 190 insertions(+), 191 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index c9a4890fbb..c1bb34f1d6 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -160,7 +160,7 @@ Status DeltaWriter::write(Tuple* tuple) {
     if (_is_cancelled) {
         // The writer may be cancelled at any time by other thread.
         // just return ERROR if writer is cancelled.
-        return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
+        return _cancel_status;
     }
 
     _mem_table->insert(tuple);
@@ -185,7 +185,7 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
     }
 
     if (_is_cancelled) {
-        return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
+        return _cancel_status;
     }
 
     for (const auto& row_idx : row_idxs) {
@@ -213,7 +213,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
     }
 
     if (_is_cancelled) {
-        return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
+        return _cancel_status;
     }
 
     _mem_table->insert(block, row_idxs);
@@ -247,7 +247,7 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
     }
 
     if (_is_cancelled) {
-        return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
+        return _cancel_status;
     }
 
     VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
@@ -274,7 +274,7 @@ Status DeltaWriter::wait_flush() {
         return Status::OK();
     }
     if (_is_cancelled) {
-        return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
+        return _cancel_status;
     }
     RETURN_NOT_OK(_flush_token->wait());
     return Status::OK();
@@ -324,7 +324,7 @@ Status DeltaWriter::close() {
     }
 
     if (_is_cancelled) {
-        return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
+        return _cancel_status;
     }
 
     auto s = _flush_memtable_async();
@@ -343,7 +343,7 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
            << "delta writer is supposed be to initialized before close_wait() being called";
 
     if (_is_cancelled) {
-        return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
+        return _cancel_status;
     }
 
     // return error if previous flush failed
     RETURN_NOT_OK(_flush_token->wait());
@@ -400,6 +400,10 @@ void DeltaWriter::add_finished_slave_replicas(
 }
 
 Status DeltaWriter::cancel() {
+    return cancel_with_status(Status::Cancelled("already cancelled"));
+}
+
+Status DeltaWriter::cancel_with_status(const Status& st) {
     std::lock_guard<std::mutex> l(_lock);
     if (!_is_init || _is_cancelled) {
         return Status::OK();
@@ -410,6 +414,7 @@ Status DeltaWriter::cancel() {
         _flush_token->cancel();
     }
     _is_cancelled = true;
+    _cancel_status = st;
     return Status::OK();
 }
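The delta_writer changes replace the generic OLAP_ERR_ALREADY_CANCELLED error with a stored _cancel_status, so whatever status caused the first cancellation is what every later call reports. For reference, a minimal self-contained sketch of this "first error wins" pattern; the simplified Status and Writer types below are illustrative only, not the Doris classes:

// Sketch of the cancellation pattern introduced above: the first
// cancel_with_status() call records the root-cause Status under the lock,
// and all subsequent operations surface that same Status instead of a
// generic "already cancelled" error.
#include <iostream>
#include <mutex>
#include <string>
#include <utility>

struct Status {
    bool ok_flag = true;
    std::string msg;
    static Status OK() { return {true, ""}; }
    static Status Cancelled(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_flag; }
};

class Writer {
public:
    Status write() {
        std::lock_guard<std::mutex> l(_lock);
        if (_is_cancelled) return _cancel_status; // surface the root cause
        return Status::OK();
    }
    Status cancel_with_status(const Status& st) {
        std::lock_guard<std::mutex> l(_lock);
        if (_is_cancelled) return Status::OK(); // keep the first reason
        _is_cancelled = true;
        _cancel_status = st;
        return Status::OK();
    }

private:
    std::mutex _lock;
    bool _is_cancelled = false;
    Status _cancel_status;
};

int main() {
    Writer w;
    w.cancel_with_status(Status::Cancelled("flush failed: disk full"));
    std::cout << w.write().msg << std::endl; // prints the root cause, not "already cancelled"
}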
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 444533fecc..5484c7c7db 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -82,6 +82,7 @@ public:
     // abandon current memtable and wait for all pending-flushing memtables to be destructed.
     // mem_consumption() should be 0 after this function returns.
     Status cancel();
+    Status cancel_with_status(const Status& st);
 
     // submit current memtable to flush queue, and wait all memtables in flush queue
     // to be flushed.
@@ -128,6 +129,7 @@ private:
     bool _is_init = false;
     bool _is_cancelled = false;
+    Status _cancel_status;
     WriteRequest _req;
     TabletSharedPtr _tablet;
     RowsetSharedPtr _cur_rowset;
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index ba15c6c7b4..10a49bdc54 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -121,4 +121,23 @@ Status LoadChannel::cancel() {
     return Status::OK();
 }
 
-} // namespace doris
\ No newline at end of file
+void LoadChannel::handle_mem_exceed_limit() {
+    bool found = false;
+    std::shared_ptr<TabletsChannel> channel;
+    {
+        // lock so that only one thread can check mem limit
+        std::lock_guard<std::mutex> l(_tablets_channels_lock);
+        found = _find_largest_consumption_channel(&channel);
+    }
+    // Release lock so that other threads can still call add_batch concurrently.
+    if (found) {
+        DCHECK(channel != nullptr);
+        channel->reduce_mem_usage();
+    } else {
+        // should not happen, add log to observe
+        LOG(WARNING) << "failed to find a suitable tablets-channel when memory limit exceeded. "
+                     << "load_id=" << _load_id;
+    }
+}
+
+} // namespace doris
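handle_mem_exceed_limit() delegates to _find_largest_consumption_channel(), whose body is outside this patch. Under the assumption that the LoadChannel keeps its TabletsChannels in a map guarded by _tablets_channels_lock (as the locking comment above implies), the helper plausibly looks like the following sketch, written as a free function with assumed types so it stands alone:

// Hedged sketch only; member and type names are assumptions, not the actual
// implementation. Picks the TabletsChannel with the largest mem_consumption().
// The caller is expected to hold the channels lock, per the diff above.
#include <cstdint>
#include <memory>
#include <unordered_map>

template <typename TabletsChannelT>
bool find_largest_consumption_channel(
        const std::unordered_map<int64_t, std::shared_ptr<TabletsChannelT>>& channels,
        std::shared_ptr<TabletsChannelT>* out) {
    int64_t max_consume = 0;
    for (const auto& it : channels) {
        int64_t mem = it.second->mem_consumption();
        if (mem > max_consume) {
            max_consume = mem;
            *out = it.second;
        }
    }
    // "found" only if at least one channel is actually consuming memory.
    return max_consume > 0;
}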
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 5af420a970..9d8e5f2f33 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -63,8 +63,7 @@ public:
     // If yes, it will pick a tablets channel to try to reduce memory consumption.
     // The method will not return until the chosen tablet channels finished memtable
     // flush.
-    template <typename TabletWriterAddResult>
-    Status handle_mem_exceed_limit(TabletWriterAddResult* response);
+    void handle_mem_exceed_limit();
 
     int64_t mem_consumption() {
         int64_t mem_usage = 0;
@@ -183,25 +182,4 @@ inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
     return os;
 }
 
-template <typename TabletWriterAddResult>
-Status LoadChannel::handle_mem_exceed_limit(TabletWriterAddResult* response) {
-    bool found = false;
-    std::shared_ptr<TabletsChannel> channel;
-    {
-        // lock so that only one thread can check mem limit
-        std::lock_guard<std::mutex> l(_tablets_channels_lock);
-        found = _find_largest_consumption_channel(&channel);
-    }
-    // Release lock so that other threads can still call add_batch concurrently.
-    if (found) {
-        DCHECK(channel != nullptr);
-        return channel->reduce_mem_usage(response);
-    } else {
-        // should not happen, add log to observe
-        LOG(WARNING) << "fail to find suitable tablets-channel when memory exceed. "
-                     << "load_id=" << _load_id;
-    }
-    return Status::OK();
-}
-
 } // namespace doris
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index c81ace2487..45c80c6169 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -217,4 +217,136 @@ Status LoadChannelMgr::_start_load_channels_clean() {
     return Status::OK();
 }
 
+void LoadChannelMgr::_handle_mem_exceed_limit() {
+    // Check the soft limit.
+    DCHECK(_load_soft_mem_limit > 0);
+    int64_t process_mem_limit = MemInfo::mem_limit() * config::soft_mem_limit_frac;
+    if (_mem_tracker->consumption() < _load_soft_mem_limit &&
+        MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
+        return;
+    }
+    // Indicate whether current thread is reducing mem on hard limit.
+    bool reducing_mem_on_hard_limit = false;
+    std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
+    {
+        std::unique_lock<std::mutex> l(_lock);
+        while (_should_wait_flush) {
+            LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
+                      << ", waiting for flush";
+            _wait_flush_cond.wait(l);
+        }
+        bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
+                                  MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
+        // Some other thread is flushing data and the hard limit is not reached now,
+        // so we don't need to handle the mem limit in the current thread.
+        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
+            return;
+        }
+
+        // Pick LoadChannels to reduce memory usage. If some other thread is reducing
+        // memory due to the soft limit and we have now reached the hard limit, the
+        // current thread may pick duplicate channels and trigger a duplicate reducing
+        // process. But the load channel's reduce-memory process is thread safe: only
+        // one thread can reduce memory at a time; other threads wait on a condition
+        // variable and return once the reduce-memory work has finished.
+        using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
+        std::vector<ChannelMemPair> candidate_channels;
+        int64_t total_consume = 0;
+        for (auto& kv : _load_channels) {
+            if (kv.second->is_high_priority()) {
+                // do not select high priority channel to reduce memory
+                // to avoid blocking them.
+                continue;
+            }
+            int64_t mem = kv.second->mem_consumption();
+            // save the mem consumption, since the calculation might be expensive.
+            candidate_channels.push_back(std::make_pair(kv.second, mem));
+            total_consume += mem;
+        }
+
+        if (candidate_channels.empty()) {
+            // should not happen, add log to observe
+            LOG(WARNING) << "All load channels are high priority, failed to find suitable "
+                         << "channels to reduce memory when total load mem limit exceeded";
+            return;
+        }
+
+        // sort all load channels, try to find the largest one.
+        std::sort(candidate_channels.begin(), candidate_channels.end(),
+                  [](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
+                      return lhs.second > rhs.second;
+                  });
+
+        int64_t mem_consumption_in_picked_channel = 0;
+        auto largest_channel = *candidate_channels.begin();
+        // If some load-channel is big enough, we can reduce just that one, trying
+        // our best to avoid reducing small load channels.
+        if (_load_channel_min_mem_to_reduce > 0 &&
+            largest_channel.second > _load_channel_min_mem_to_reduce) {
+            // Pick 1 load channel to reduce memory.
+            channels_to_reduce_mem.push_back(largest_channel.first);
+            mem_consumption_in_picked_channel = largest_channel.second;
+        } else {
+            // Pick multiple channels to reduce memory.
+            int64_t mem_to_flushed = total_consume / 3;
+            for (auto ch : candidate_channels) {
+                channels_to_reduce_mem.push_back(ch.first);
+                mem_consumption_in_picked_channel += ch.second;
+                if (mem_consumption_in_picked_channel >= mem_to_flushed) {
+                    break;
+                }
+            }
+        }
+
+        std::ostringstream oss;
+        if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
+            oss << "reducing memory of " << channels_to_reduce_mem.size()
+                << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
+                << " bytes), because total load mem consumption "
+                << PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
+                << " has exceeded";
+            if (_mem_tracker->consumption() > _load_hard_mem_limit) {
+                _should_wait_flush = true;
+                reducing_mem_on_hard_limit = true;
+                oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
+            } else {
+                _soft_reduce_mem_in_progress = true;
+                oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
+            }
+        } else {
+            _should_wait_flush = true;
+            reducing_mem_on_hard_limit = true;
+            oss << "reducing memory of " << channels_to_reduce_mem.size()
+                << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
+                << " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit "
+                << PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
+                << " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str();
+        }
+        LOG(INFO) << oss.str();
+    }
+
+    for (auto ch : channels_to_reduce_mem) {
+        uint64_t begin = GetCurrentTimeMicros();
+        int64_t mem_usage = ch->mem_consumption();
+        ch->handle_mem_exceed_limit();
+        LOG(INFO) << "reduced memory of " << *ch << ", cost "
+                  << (GetCurrentTimeMicros() - begin) / 1000
+                  << " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
+    }
+
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        // If a thread has finished the memtable flush for the soft limit, and now
+        // the hard limit is already reached, it should not update these variables.
+        if (reducing_mem_on_hard_limit && _should_wait_flush) {
+            _should_wait_flush = false;
+            _wait_flush_cond.notify_all();
+        }
+        if (_soft_reduce_mem_in_progress) {
+            _soft_reduce_mem_in_progress = false;
+        }
+    }
+    return;
+}
+
 } // namespace doris
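The selection logic above flushes either the single largest channel (when it exceeds _load_channel_min_mem_to_reduce) or, otherwise, the biggest channels until roughly one third of total load memory is being flushed. The same heuristic restated as a standalone function over plain integers, purely illustrative; the function itself is not part of Doris:

// Re-statement of the channel-picking heuristic with plain int64_t values
// instead of LoadChannel pointers. Constants mirror the diff above.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

std::vector<int64_t> pick_channels(std::vector<int64_t> mem, int64_t min_mem_to_reduce) {
    std::sort(mem.begin(), mem.end(), std::greater<int64_t>());
    int64_t total = 0;
    for (int64_t m : mem) total += m;
    std::vector<int64_t> picked;
    if (min_mem_to_reduce > 0 && mem.front() > min_mem_to_reduce) {
        picked.push_back(mem.front()); // one big channel is enough
        return picked;
    }
    int64_t acc = 0;
    for (int64_t m : mem) { // otherwise accumulate until ~1/3 of total is covered
        picked.push_back(m);
        acc += m;
        if (acc >= total / 3) break;
    }
    return picked;
}

int main() {
    // Three mid-sized channels, none above the (illustrative) 64 MB threshold:
    // total = 90 MB, so channels are picked until >= 30 MB is being flushed,
    // which the 40 MB channel alone already satisfies.
    auto p = pick_channels({40 << 20, 30 << 20, 20 << 20}, 64LL << 20);
    std::cout << p.size() << " channel(s) picked\n"; // prints "1 channel(s) picked"
}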
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index c1c86a7890..2dcc79fab8 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -77,8 +77,7 @@ private:
     void _finish_load_channel(UniqueId load_id);
     // check if the total load mem consumption exceeds limit.
    // If yes, it will pick a load channel to try to reduce memory consumption.
-    template <typename TabletWriterAddResult>
-    Status _handle_mem_exceed_limit(TabletWriterAddResult* response);
+    void _handle_mem_exceed_limit();
 
     Status _start_bg_worker();
 
@@ -154,13 +153,17 @@ Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request,
         // 2. check if mem consumption exceed limit
         // If this is a high priority load task, do not handle this.
         // because this may block for a while, which may lead to rpc timeout.
-        RETURN_IF_ERROR(_handle_mem_exceed_limit(response));
+        _handle_mem_exceed_limit();
     }
 
     // 3. add batch to load channel
     // batch may not exist in request(eg: eos request without batch),
     // this case will be handled in load channel's add batch method.
-    RETURN_IF_ERROR(channel->add_batch(request, response));
+    Status st = channel->add_batch(request, response);
+    if (UNLIKELY(!st.ok())) {
+        channel->cancel();
+        return st;
+    }
 
     // 4. handle finish
     if (channel->is_finished()) {
@@ -169,138 +172,4 @@ Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request,
     return Status::OK();
 }
 
-template <typename TabletWriterAddResult>
-Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) {
-    // Check the soft limit.
-    DCHECK(_load_soft_mem_limit > 0);
-    int64_t process_mem_limit = MemInfo::mem_limit() * config::soft_mem_limit_frac;
-    if (_mem_tracker->consumption() < _load_soft_mem_limit &&
-        MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
-        return Status::OK();
-    }
-    // Indicate whether current thread is reducing mem on hard limit.
-    bool reducing_mem_on_hard_limit = false;
-    std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
-    {
-        std::unique_lock<std::mutex> l(_lock);
-        while (_should_wait_flush) {
-            LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
-                      << ", waiting for flush";
-            _wait_flush_cond.wait(l);
-        }
-        bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
-                                  MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
-        // Some other thread is flushing data, and not reached hard limit now,
-        // we don't need to handle mem limit in current thread.
-        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
-            return Status::OK();
-        }
-
-        // Pick LoadChannels to reduce memory usage, if some other thread is reducing memory
-        // due to soft limit, and we reached hard limit now, current thread may pick some
-        // duplicate channels and trigger duplicate reducing memory process.
-        // But the load channel's reduce memory process is thread safe, only 1 thread can
-        // reduce memory at the same time, other threads will wait on a condition variable,
-        // after the reduce-memory work finished, all threads will return.
-        using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
-        std::vector<ChannelMemPair> candidate_channels;
-        int64_t total_consume = 0;
-        for (auto& kv : _load_channels) {
-            if (kv.second->is_high_priority()) {
-                // do not select high priority channel to reduce memory
-                // to avoid blocking them.
-                continue;
-            }
-            int64_t mem = kv.second->mem_consumption();
-            // save the mem consumption, since the calculation might be expensive.
-            candidate_channels.push_back(std::make_pair(kv.second, mem));
-            total_consume += mem;
-        }
-
-        if (candidate_channels.empty()) {
-            // should not happen, add log to observe
-            LOG(WARNING) << "All load channels are high priority, failed to find suitable"
-                         << "channels to reduce memory when total load mem limit exceed";
-            return Status::OK();
-        }
-
-        // sort all load channels, try to find the largest one.
-        std::sort(candidate_channels.begin(), candidate_channels.end(),
-                  [](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
-                      return lhs.second > rhs.second;
-                  });
-
-        int64_t mem_consumption_in_picked_channel = 0;
-        auto largest_channel = *candidate_channels.begin();
-        // If some load-channel is big enough, we can reduce it only, try our best to avoid
-        // reducing small load channels.
-        if (_load_channel_min_mem_to_reduce > 0 &&
-            largest_channel.second > _load_channel_min_mem_to_reduce) {
-            // Pick 1 load channel to reduce memory.
-            channels_to_reduce_mem.push_back(largest_channel.first);
-            mem_consumption_in_picked_channel = largest_channel.second;
-        } else {
-            // Pick multiple channels to reduce memory.
-            int64_t mem_to_flushed = total_consume / 3;
-            for (auto ch : candidate_channels) {
-                channels_to_reduce_mem.push_back(ch.first);
-                mem_consumption_in_picked_channel += ch.second;
-                if (mem_consumption_in_picked_channel >= mem_to_flushed) {
-                    break;
-                }
-            }
-        }
-
-        std::ostringstream oss;
-        if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
-            oss << "reducing memory of " << channels_to_reduce_mem.size()
-                << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
-                << " bytes), because total load mem consumption "
-                << PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
-                << " has exceeded";
-            if (_mem_tracker->consumption() > _load_hard_mem_limit) {
-                _should_wait_flush = true;
-                reducing_mem_on_hard_limit = true;
-                oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
-            } else {
-                _soft_reduce_mem_in_progress = true;
-                oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
-            }
-        } else {
-            _should_wait_flush = true;
-            reducing_mem_on_hard_limit = true;
-            oss << "reducing memory of " << channels_to_reduce_mem.size()
-                << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
-                << " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit "
-                << PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
-                << " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str();
-        }
-        LOG(INFO) << oss.str();
-    }
-
-    Status st = Status::OK();
-    for (auto ch : channels_to_reduce_mem) {
-        uint64_t begin = GetCurrentTimeMicros();
-        int64_t mem_usage = ch->mem_consumption();
-        st = ch->handle_mem_exceed_limit(response);
-        LOG(INFO) << "reduced memory of " << *ch << ", cost "
-                  << (GetCurrentTimeMicros() - begin) / 1000
-                  << " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
-    }
-
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        // If a thread have finished the memtable flush for soft limit, and now
-        // the hard limit is already reached, it should not update these variables.
-        if (reducing_mem_on_hard_limit && _should_wait_flush) {
-            _should_wait_flush = false;
-            _wait_flush_cond.notify_all();
-        }
-        if (_soft_reduce_mem_in_progress) {
-            _soft_reduce_mem_in_progress = false;
-        }
-    }
-    return st;
-}
-
 } // namespace doris
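Taken together, the add_batch changes make the manager the place where a failed channel is released: the mem-limit check no longer produces a per-request error, and any add_batch failure cancels the whole channel up front instead of leaving it for the timeout-based background cleaner. A condensed, self-contained restatement of that error path, with hypothetical simplified types:

// Illustrative only; Channel and Status below are stand-ins, not Doris types.
#include <iostream>
#include <memory>

struct Status {
    bool ok_flag = true;
    bool ok() const { return ok_flag; }
};

struct Channel {
    bool cancelled = false;
    Status add_batch() { return Status{false}; } // simulate a failing batch
    void cancel() { cancelled = true; }          // releases writers and memtables
};

Status add_batch(const std::shared_ptr<Channel>& channel) {
    Status st = channel->add_batch();
    if (!st.ok()) {
        channel->cancel(); // proactive release: the key behavioral change
        return st;
    }
    return Status{};
}

int main() {
    auto ch = std::make_shared<Channel>();
    add_batch(ch);
    std::cout << "channel cancelled: " << std::boolalpha << ch->cancelled << "\n"; // true
}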
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 584e101199..85ab150203 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -203,14 +203,13 @@ int64_t TabletsChannel::mem_consumption() {
     return mem_usage;
 }
 
-template <typename TabletWriterAddResult>
-Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
+void TabletsChannel::reduce_mem_usage() {
     if (_try_to_wait_flushing()) {
         // `_try_to_wait_flushing()` returns true means other thread already
         // reduced the mem usage, and current thread do not need to reduce again.
         LOG(INFO) << "Duplicate reduce mem usage on TabletsChannel, txn_id: " << _txn_id
                   << ", index_id: " << _index_id;
-        return Status::OK();
+        return;
     }
 
     std::vector<DeltaWriter*> writers_to_wait_flush;
     {
@@ -219,7 +218,9 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
         if (_state == kFinished) {
             // TabletsChannel is closed without LoadChannel's lock,
             // therefore it's possible for reduce_mem_usage() to be called right after close()
-            return _close_status;
+            LOG(INFO) << "TabletsChannel is closed while reducing mem usage, txn_id: " << _txn_id
+                      << ", index_id: " << _index_id;
+            return;
         }
 
         // Sort the DeltaWriters by mem consumption in descend order.
@@ -275,8 +276,6 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
            << writers[counter - 1]->get_memtable_consumption_snapshot() << " bytes";
     }
     LOG(INFO) << ss.str();
-    google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
-            response->mutable_tablet_errors();
     // following loop flush memtable async, we'll do it with _lock
     for (int i = 0; i < counter; i++) {
         Status st = writers[i]->flush_memtable_and_wait(false);
@@ -287,9 +286,7 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
                     writers[i]->tablet_id(), _txn_id, st.code(), st.precise_code(),
                     st.get_error_msg());
             LOG(WARNING) << err_msg;
-            PTabletError* error = tablet_errors->Add();
-            error->set_tablet_id(writers[i]->tablet_id());
-            error->set_msg(err_msg);
+            writers[i]->cancel_with_status(st);
             _broken_tablets.insert(writers[i]->tablet_id());
         }
     }
@@ -308,14 +305,16 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
         }
     }
 
-    Status st = Status::OK();
     for (auto writer : writers_to_wait_flush) {
-        st = writer->wait_flush();
+        Status st = writer->wait_flush();
         if (!st.ok()) {
-            st = Status::InternalError(
-                    "failed to reduce mem consumption by flushing memtable. err: {}",
-                    st.to_string());
-            break;
+            auto err_msg = strings::Substitute(
+                    "tablet writer failed to reduce mem consumption while waiting for memtable "
+                    "flush, tablet_id=$0, txn_id=$1, err=$2, errcode=$3, msg:$4",
+                    writer->tablet_id(), _txn_id, st.code(), st.precise_code(),
+                    st.get_error_msg());
+            LOG(WARNING) << err_msg;
+            writer->cancel_with_status(st);
+            _broken_tablets.insert(writer->tablet_id());
         }
     }
 
@@ -325,7 +324,7 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
         _reduce_memory_cond.notify_all();
     }
 
-    return st;
+    return;
 }
 
 Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
@@ -502,8 +501,4 @@ TabletsChannel::add_batch(
         PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*);
 
-template Status TabletsChannel::reduce_mem_usage(
-        PTabletWriterAddBatchResult*);
-template Status TabletsChannel::reduce_mem_usage(
-        PTabletWriterAddBlockResult*);
 } // namespace doris
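reduce_mem_usage() still serializes concurrent reducers through _try_to_wait_flushing() and _reduce_memory_cond, whose bodies are not in these hunks. A hedged sketch of the usual single-reducer handshake that behavior implies; the _reducing_mem_usage flag and the class layout are assumptions, and the real Doris implementation may differ in detail:

// The first caller becomes the reducer and does the flush work; later callers
// block until it finishes, then skip the work entirely.
#include <condition_variable>
#include <mutex>

class ReduceGate {
public:
    // Returns true if another thread already reduced memory (caller can skip);
    // returns false if the caller has become the reducer and must do the work.
    bool try_to_wait_flushing() {
        std::unique_lock<std::mutex> l(_lock);
        if (!_reducing_mem_usage) {
            _reducing_mem_usage = true;
            return false;
        }
        while (_reducing_mem_usage) {
            _reduce_memory_cond.wait(l);
        }
        return true;
    }

    // Mirrors the tail of reduce_mem_usage(): clear the flag and wake waiters.
    void finish_reducing() {
        std::lock_guard<std::mutex> l(_lock);
        _reducing_mem_usage = false;
        _reduce_memory_cond.notify_all();
    }

private:
    std::mutex _lock;
    std::condition_variable _reduce_memory_cond;
    bool _reducing_mem_usage = false;
};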
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index bfdbf0e8f7..524a66fb70 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -92,8 +92,7 @@ public:
     // eg. flush the largest memtable immediately.
     // return Status::OK if mem is reduced.
     // no-op when this channel has been closed or cancelled
-    template <typename TabletWriterAddResult>
-    Status reduce_mem_usage(TabletWriterAddResult* response);
+    void reduce_mem_usage();
 
     int64_t mem_consumption();