[fix](sink) fix pipeline load stuck #21636

This commit is contained in:
Xinyi Zou
2023-07-09 16:27:11 +08:00
committed by GitHub
parent 015426b2b4
commit bf61d2cfc0

View File

@ -109,6 +109,7 @@ public:
void Run() override {
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
DCHECK(packet_in_flight);
auto open_partition_failed = [this]() {
std::stringstream ss;
ss << "failed to open partition, error=" << berror(this->cntl.ErrorCode())
@ -128,7 +129,7 @@ public:
open_partition_failed();
}
}
done = true;
packet_in_flight = false;
}
void join() { brpc::Join(cntl.call_id()); }
@ -138,7 +139,7 @@ public:
VNodeChannel* vnode_channel;
IndexChannel* index_channel;
int64_t partition_id;
std::atomic<bool> done {false};
std::atomic<bool> packet_in_flight {false};
};
IndexChannel::~IndexChannel() {}
@ -549,6 +550,7 @@ void VNodeChannel::open_partition(int64_t partition_id) {
remain_ms = config::min_load_rpc_timeout_ms;
}
open_partition_closure->cntl.set_timeout_ms(remain_ms);
open_partition_closure->packet_in_flight = true;
_stub->open_partition(&open_partition_closure.get()->cntl, &request,
&open_partition_closure.get()->result, open_partition_closure.get());
@ -565,7 +567,7 @@ void VNodeChannel::open_partition_wait() {
bool VNodeChannel::open_partition_finished() const {
for (auto& open_partition_closure : _open_partition_closures) {
if (!open_partition_closure->done) {
if (open_partition_closure->packet_in_flight) {
return false;
}
}
@ -914,12 +916,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
}
bool VNodeChannel::is_send_data_rpc_done() const {
if (_add_block_closure != nullptr) {
return _add_batches_finished || (_cancelled && !_add_block_closure->is_packet_in_flight());
} else {
// such as, canceled before open_wait new closure.
return _add_batches_finished || _cancelled;
}
return _add_batches_finished || _cancelled;
}
Status VNodeChannel::close_wait(RuntimeState* state) {
@ -1498,8 +1495,9 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
}
bool VOlapTableSink::is_close_done() {
if (config::enable_lazy_open_partition && !_open_partition_done) {
return false;
// Only after try_close, need to wait rpc end.
if (!_try_close) {
return true;
}
bool close_done = true;
for (const auto& index_channel : _channels) {