[pipeline](bug) DCHECK may failed in pip sender queue (#19545)
DCHECK may failed in pip sender queue
This commit is contained in:
@ -62,9 +62,6 @@ class TRuntimeProfileTree;
|
||||
ScopedTimer<ThreadCpuStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
|
||||
#define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \
|
||||
ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled)
|
||||
#define CANCEL_SAFE_SCOPED_TIMER_ATOMIC(c, is_cancelled) \
|
||||
ScopedTimer<MonotonicStopWatch, std::atomic_bool> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)( \
|
||||
c, is_cancelled)
|
||||
#define SCOPED_RAW_TIMER(c) \
|
||||
doris::ScopedRawTimer<doris::MonotonicStopWatch, int64_t> MACRO_CONCAT(SCOPED_RAW_TIMER, \
|
||||
__COUNTER__)(c)
|
||||
|
||||
@ -57,7 +57,6 @@ VDataStreamRecvr::SenderQueue::~SenderQueue() {
|
||||
}
|
||||
|
||||
bool VDataStreamRecvr::SenderQueue::should_wait() {
|
||||
DCHECK(false) << "VDataStreamRecvr::SenderQueue::should_wait execute";
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
return !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0;
|
||||
}
|
||||
@ -69,16 +68,16 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
|
||||
VLOG_ROW << "wait arrival fragment_instance_id=" << _recvr->fragment_instance_id()
|
||||
<< " node=" << _recvr->dest_node_id();
|
||||
// Don't count time spent waiting on the sender as active time.
|
||||
CANCEL_SAFE_SCOPED_TIMER_ATOMIC(_recvr->_data_arrival_timer, &_is_cancelled);
|
||||
CANCEL_SAFE_SCOPED_TIMER_ATOMIC(
|
||||
CANCEL_SAFE_SCOPED_TIMER(_recvr->_data_arrival_timer, &_is_cancelled);
|
||||
CANCEL_SAFE_SCOPED_TIMER(
|
||||
_received_first_batch ? nullptr : _recvr->_first_batch_wait_total_timer,
|
||||
&_is_cancelled);
|
||||
_data_arrival_cv.wait(l);
|
||||
}
|
||||
return _inner_get_batch(block, eos);
|
||||
return _inner_get_batch_without_lock(block, eos);
|
||||
}
|
||||
|
||||
Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block* block, bool* eos) {
|
||||
Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) {
|
||||
if (_is_cancelled) {
|
||||
return Status::Cancelled("Cancelled");
|
||||
}
|
||||
@ -95,7 +94,6 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block* block, bool* eos)
|
||||
auto [next_block, block_byte_size] = std::move(_block_queue.front());
|
||||
_recvr->_blocks_memory_usage->add(-block_byte_size);
|
||||
_block_queue.pop_front();
|
||||
_update_block_queue_empty();
|
||||
|
||||
if (!_pending_closures.empty()) {
|
||||
auto closure_pair = _pending_closures.front();
|
||||
@ -133,14 +131,11 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
|
||||
auto pblock_byte_size = pblock.ByteSizeLong();
|
||||
COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size);
|
||||
|
||||
if (_num_remaining_senders <= 0) {
|
||||
DCHECK(_num_remaining_senders >= 0);
|
||||
if (_num_remaining_senders == 0) {
|
||||
DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
|
||||
return;
|
||||
}
|
||||
|
||||
if (_is_cancelled) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
BlockUPtr block = nullptr;
|
||||
@ -163,7 +158,6 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
|
||||
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
|
||||
|
||||
_block_queue.emplace_back(std::move(block), block_byte_size);
|
||||
_update_block_queue_empty();
|
||||
// if done is nullptr, this function can't delay this response
|
||||
if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
|
||||
MonotonicStopWatch monotonicStopWatch;
|
||||
@ -208,7 +202,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
|
||||
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_bytes_received);
|
||||
|
||||
_block_queue.emplace_back(std::move(nblock), block_mem_size);
|
||||
_update_block_queue_empty();
|
||||
_data_arrival_cv.notify_one();
|
||||
|
||||
if (_recvr->exceeds_limit(block_mem_size)) {
|
||||
|
||||
@ -189,21 +189,22 @@ public:
|
||||
|
||||
void close();
|
||||
|
||||
bool queue_empty() { return _block_queue_empty; }
|
||||
bool queue_empty() {
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
return _block_queue.empty();
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void _update_block_queue_empty() {}
|
||||
Status _inner_get_batch(Block* block, bool* eos);
|
||||
Status _inner_get_batch_without_lock(Block* block, bool* eos);
|
||||
|
||||
// Not managed by this class
|
||||
VDataStreamRecvr* _recvr;
|
||||
std::mutex _lock;
|
||||
std::atomic_bool _is_cancelled;
|
||||
std::atomic_int _num_remaining_senders;
|
||||
bool _is_cancelled;
|
||||
int _num_remaining_senders;
|
||||
std::condition_variable _data_arrival_cv;
|
||||
std::condition_variable _data_removal_cv;
|
||||
std::list<std::pair<BlockUPtr, size_t>> _block_queue;
|
||||
std::atomic_bool _block_queue_empty = true;
|
||||
|
||||
bool _received_first_batch;
|
||||
// sender_id
|
||||
@ -219,23 +220,22 @@ public:
|
||||
PipSenderQueue(VDataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile)
|
||||
: SenderQueue(parent_recvr, num_senders, profile) {}
|
||||
|
||||
bool should_wait() override {
|
||||
return !_is_cancelled && _block_queue_empty && _num_remaining_senders > 0;
|
||||
}
|
||||
|
||||
void _update_block_queue_empty() override { _block_queue_empty = _block_queue.empty(); }
|
||||
|
||||
Status get_batch(Block* block, bool* eos) override {
|
||||
CHECK(!should_wait()) << " _is_cancelled: " << _is_cancelled
|
||||
<< ", _block_queue_empty: " << _block_queue_empty
|
||||
<< ", _num_remaining_senders: " << _num_remaining_senders;
|
||||
std::lock_guard<std::mutex> l(_lock); // protect _block_queue
|
||||
return _inner_get_batch(block, eos);
|
||||
DCHECK(_is_cancelled || !_block_queue.empty() || _num_remaining_senders == 0)
|
||||
<< " _is_cancelled: " << _is_cancelled
|
||||
<< ", _block_queue_empty: " << _block_queue.empty()
|
||||
<< ", _num_remaining_senders: " << _num_remaining_senders;
|
||||
return _inner_get_batch_without_lock(block, eos);
|
||||
}
|
||||
|
||||
void add_block(Block* block, bool use_move) override {
|
||||
if (_is_cancelled || !block->rows()) {
|
||||
return;
|
||||
if (block->rows() == 0) return;
|
||||
{
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
if (_is_cancelled) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name());
|
||||
|
||||
@ -254,12 +254,14 @@ public:
|
||||
auto block_mem_size = nblock->allocated_bytes();
|
||||
{
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
if (_is_cancelled) {
|
||||
return;
|
||||
}
|
||||
_block_queue.emplace_back(std::move(nblock), block_mem_size);
|
||||
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
|
||||
_recvr->_blocks_memory_usage->add(block_mem_size);
|
||||
_data_arrival_cv.notify_one();
|
||||
}
|
||||
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
|
||||
_recvr->_blocks_memory_usage->add(block_mem_size);
|
||||
_update_block_queue_empty();
|
||||
_data_arrival_cv.notify_one();
|
||||
}
|
||||
};
|
||||
} // namespace vectorized
|
||||
|
||||
Reference in New Issue
Block a user