From fb02bb5cd9ed04834efda9f150d843b05885307d Mon Sep 17 00:00:00 2001 From: HuangWei Date: Fri, 22 May 2020 09:11:59 +0800 Subject: [PATCH] [Load] Fix mem limit in NodeChannel (#3643) --- be/src/exec/tablet_sink.cpp | 10 +++++++++- be/src/exec/tablet_sink.h | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 45df64f6db..4941bb3634 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -59,6 +59,7 @@ Status NodeChannel::init(RuntimeState* state) { if (_node_info == nullptr) { std::stringstream ss; ss << "unknown node id, id=" << _node_id; + _cancelled = true; return Status::InternalError(ss.str()); } @@ -131,6 +132,11 @@ Status NodeChannel::open_wait() { } _open_closure = nullptr; + if (!status.ok()) { + _cancelled = true; + return status; + } + // add batch closure _add_batch_closure = ReusableClosure::create(); _add_batch_closure->addFailedHandler([this]() { @@ -179,7 +185,9 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { // We use OlapTableSink mem_tracker which has the same ancestor of _plan node, // so in the ideal case, mem limit is a matter for _plan node. // But there is still some unfinished things, we do mem limit here temporarily. - while (_parent->_mem_tracker->any_limit_exceeded()) { + // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. + // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). + while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) { SCOPED_RAW_TIMER(&_mem_exceeded_block_ns); SleepFor(MonoDelta::FromMilliseconds(10)); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 9ff18cf773..47c9cfe096 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -166,6 +166,7 @@ public: // 0: stopped, send finished(eos request has been sent), or any internal error; // 1: running, haven't reach eos. // only allow 1 rpc in flight + // plz make sure, this func should be called after open_wait(). int try_send_and_fetch_status(); void time_report(std::unordered_map* add_batch_counter_map,