cherry pick from #38031
This commit is contained in:
@ -878,11 +878,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
|
||||
Status VNodeChannel::close_wait(RuntimeState* state) {
|
||||
DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemoryReclamation::process_full_gc(); });
|
||||
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
|
||||
// set _is_closed to true finally
|
||||
Defer set_closed {[&]() {
|
||||
std::lock_guard<std::mutex> l(_closed_lock);
|
||||
_is_closed = true;
|
||||
}};
|
||||
|
||||
auto st = none_of({_cancelled, !_eos_is_produced});
|
||||
if (!st.ok()) {
|
||||
@ -906,8 +901,8 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
|
||||
VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
|
||||
_close_time_ms = UnixMillis() - _close_time_ms;
|
||||
|
||||
if (_cancelled || state->is_cancelled()) {
|
||||
cancel(state->cancel_reason());
|
||||
if (state->is_cancelled()) {
|
||||
_cancel_with_msg(state->cancel_reason());
|
||||
}
|
||||
|
||||
if (_add_batches_finished) {
|
||||
@ -919,6 +914,11 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
|
||||
_index_channel->set_error_tablet_in_state(state);
|
||||
_index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id);
|
||||
_index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, _node_id);
|
||||
|
||||
std::lock_guard<std::mutex> l(_closed_lock);
|
||||
// only when normal close, we set _is_closed to true.
|
||||
// otherwise, we will set it to true in cancel().
|
||||
_is_closed = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user