From cb943ae7ca03384062243cccae2380a7baea98e4 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Fri, 12 May 2023 20:39:18 +0800 Subject: [PATCH] [pipeline](bug) DCHECK may failed in pip sender queue (#19545) DCHECK may failed in pip sender queue --- be/src/util/runtime_profile.h | 3 -- be/src/vec/runtime/vdata_stream_recvr.cpp | 19 +++------- be/src/vec/runtime/vdata_stream_recvr.h | 46 ++++++++++++----------- 3 files changed, 30 insertions(+), 38 deletions(-) diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 74f8352a1b..e0a8ec50b3 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -62,9 +62,6 @@ class TRuntimeProfileTree; ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \ ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled) -#define CANCEL_SAFE_SCOPED_TIMER_ATOMIC(c, is_cancelled) \ - ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)( \ - c, is_cancelled) #define SCOPED_RAW_TIMER(c) \ doris::ScopedRawTimer MACRO_CONCAT(SCOPED_RAW_TIMER, \ __COUNTER__)(c) diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index c8d95a5053..1c965a9d3d 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -57,7 +57,6 @@ VDataStreamRecvr::SenderQueue::~SenderQueue() { } bool VDataStreamRecvr::SenderQueue::should_wait() { - DCHECK(false) << "VDataStreamRecvr::SenderQueue::should_wait execute"; std::unique_lock l(_lock); return !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0; } @@ -69,16 +68,16 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { VLOG_ROW << "wait arrival fragment_instance_id=" << _recvr->fragment_instance_id() << " node=" << _recvr->dest_node_id(); // Don't count time spent waiting on the sender as active time. - CANCEL_SAFE_SCOPED_TIMER_ATOMIC(_recvr->_data_arrival_timer, &_is_cancelled); - CANCEL_SAFE_SCOPED_TIMER_ATOMIC( + CANCEL_SAFE_SCOPED_TIMER(_recvr->_data_arrival_timer, &_is_cancelled); + CANCEL_SAFE_SCOPED_TIMER( _received_first_batch ? nullptr : _recvr->_first_batch_wait_total_timer, &_is_cancelled); _data_arrival_cv.wait(l); } - return _inner_get_batch(block, eos); + return _inner_get_batch_without_lock(block, eos); } -Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block* block, bool* eos) { +Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) { if (_is_cancelled) { return Status::Cancelled("Cancelled"); } @@ -95,7 +94,6 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block* block, bool* eos) auto [next_block, block_byte_size] = std::move(_block_queue.front()); _recvr->_blocks_memory_usage->add(-block_byte_size); _block_queue.pop_front(); - _update_block_queue_empty(); if (!_pending_closures.empty()) { auto closure_pair = _pending_closures.front(); @@ -133,14 +131,11 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe auto pblock_byte_size = pblock.ByteSizeLong(); COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size); - if (_num_remaining_senders <= 0) { + DCHECK(_num_remaining_senders >= 0); + if (_num_remaining_senders == 0) { DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number)); return; } - - if (_is_cancelled) { - return; - } } BlockUPtr block = nullptr; @@ -163,7 +158,6 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); _block_queue.emplace_back(std::move(block), block_byte_size); - _update_block_queue_empty(); // if done is nullptr, this function can't delay this response if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) { MonotonicStopWatch monotonicStopWatch; @@ -208,7 +202,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_bytes_received); _block_queue.emplace_back(std::move(nblock), block_mem_size); - _update_block_queue_empty(); _data_arrival_cv.notify_one(); if (_recvr->exceeds_limit(block_mem_size)) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index ae3ebf7acd..7478dc1eb9 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -189,21 +189,22 @@ public: void close(); - bool queue_empty() { return _block_queue_empty; } + bool queue_empty() { + std::unique_lock l(_lock); + return _block_queue.empty(); + } protected: - virtual void _update_block_queue_empty() {} - Status _inner_get_batch(Block* block, bool* eos); + Status _inner_get_batch_without_lock(Block* block, bool* eos); // Not managed by this class VDataStreamRecvr* _recvr; std::mutex _lock; - std::atomic_bool _is_cancelled; - std::atomic_int _num_remaining_senders; + bool _is_cancelled; + int _num_remaining_senders; std::condition_variable _data_arrival_cv; std::condition_variable _data_removal_cv; std::list> _block_queue; - std::atomic_bool _block_queue_empty = true; bool _received_first_batch; // sender_id @@ -219,23 +220,22 @@ public: PipSenderQueue(VDataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile) : SenderQueue(parent_recvr, num_senders, profile) {} - bool should_wait() override { - return !_is_cancelled && _block_queue_empty && _num_remaining_senders > 0; - } - - void _update_block_queue_empty() override { _block_queue_empty = _block_queue.empty(); } - Status get_batch(Block* block, bool* eos) override { - CHECK(!should_wait()) << " _is_cancelled: " << _is_cancelled - << ", _block_queue_empty: " << _block_queue_empty - << ", _num_remaining_senders: " << _num_remaining_senders; std::lock_guard l(_lock); // protect _block_queue - return _inner_get_batch(block, eos); + DCHECK(_is_cancelled || !_block_queue.empty() || _num_remaining_senders == 0) + << " _is_cancelled: " << _is_cancelled + << ", _block_queue_empty: " << _block_queue.empty() + << ", _num_remaining_senders: " << _num_remaining_senders; + return _inner_get_batch_without_lock(block, eos); } void add_block(Block* block, bool use_move) override { - if (_is_cancelled || !block->rows()) { - return; + if (block->rows() == 0) return; + { + std::unique_lock l(_lock); + if (_is_cancelled) { + return; + } } BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name()); @@ -254,12 +254,14 @@ public: auto block_mem_size = nblock->allocated_bytes(); { std::unique_lock l(_lock); + if (_is_cancelled) { + return; + } _block_queue.emplace_back(std::move(nblock), block_mem_size); + COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); + _recvr->_blocks_memory_usage->add(block_mem_size); + _data_arrival_cv.notify_one(); } - COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); - _recvr->_blocks_memory_usage->add(block_mem_size); - _update_block_queue_empty(); - _data_arrival_cv.notify_one(); } }; } // namespace vectorized