[pipelineX](fix) Fix broadcast dependency hanging (#24740)

This commit is contained in:
Gabriel
2023-09-22 12:24:32 +08:00
committed by GitHub
parent d82b603b53
commit 034582bb64
62 changed files with 107 additions and 64 deletions

View File

@ -49,11 +49,12 @@ namespace vectorized {
void BroadcastPBlockHolder::unref() noexcept {
DCHECK_GT(_ref_count._value, 0);
_ref_count._value.fetch_sub(1);
if (_dep && _ref_count._value == 0) {
auto old_value = _ref_count._value.fetch_sub(1);
if (_dep && old_value == 1) {
_dep->return_available_block();
}
}
} // namespace vectorized
namespace pipeline {
@ -169,7 +170,8 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
}
template <typename Parent>
Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& request) {
Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& request,
[[maybe_unused]] bool* sent) {
if (_is_finishing) {
return Status::OK();
}
@ -178,6 +180,9 @@ Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
if (sent) {
*sent = true;
}
request.block_holder->ref();
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);