[Bug](exchange) fix dcheck fail when VDataStreamRecvr input empty block (#22992)
fix dcheck fail when VDataStreamRecvr input empty block
This commit is contained in:
@ -57,8 +57,7 @@ bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragmen
|
||||
|
||||
SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) {
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
auto it = _shared_contexts.find(my_node_id);
|
||||
if (it == _shared_contexts.cend()) {
|
||||
if (!_shared_contexts.count(my_node_id)) {
|
||||
_shared_contexts.insert({my_node_id, std::make_shared<SharedHashTableContext>()});
|
||||
}
|
||||
return _shared_contexts[my_node_id];
|
||||
|
||||
@ -159,7 +159,11 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
|
||||
COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
|
||||
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
|
||||
|
||||
_block_queue.emplace_back(std::move(block), block_byte_size);
|
||||
bool empty = !block->rows();
|
||||
|
||||
if (!empty) {
|
||||
_block_queue.emplace_back(std::move(block), block_byte_size);
|
||||
}
|
||||
// if done is nullptr, this function can't delay this response
|
||||
if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
|
||||
MonotonicStopWatch monotonicStopWatch;
|
||||
@ -169,7 +173,9 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
|
||||
*done = nullptr;
|
||||
}
|
||||
_recvr->_blocks_memory_usage->add(block_byte_size);
|
||||
_data_arrival_cv.notify_one();
|
||||
if (!empty) {
|
||||
_data_arrival_cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
|
||||
@ -205,8 +211,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
|
||||
COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
|
||||
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
|
||||
|
||||
_block_queue.emplace_back(std::move(nblock), block_mem_size);
|
||||
_data_arrival_cv.notify_one();
|
||||
bool empty = !nblock->rows();
|
||||
|
||||
if (!empty) {
|
||||
_block_queue.emplace_back(std::move(nblock), block_mem_size);
|
||||
_data_arrival_cv.notify_one();
|
||||
}
|
||||
|
||||
if (_recvr->exceeds_limit(block_mem_size)) {
|
||||
// yiguolei
|
||||
@ -384,8 +394,8 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
|
||||
}
|
||||
|
||||
bool VDataStreamRecvr::ready_to_read() {
|
||||
for (size_t i = 0; i < _sender_queues.size(); i++) {
|
||||
if (_sender_queues[i]->should_wait()) {
|
||||
for (const auto& queue : _sender_queues) {
|
||||
if (queue->should_wait()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -237,7 +237,9 @@ public:
|
||||
}
|
||||
|
||||
void add_block(Block* block, bool use_move) override {
|
||||
if (block->rows() == 0) return;
|
||||
if (block->rows() == 0) {
|
||||
return;
|
||||
}
|
||||
{
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
if (_is_cancelled) {
|
||||
|
||||
Reference in New Issue
Block a user