[Load] Fix mem limit in NodeChannel (#3643)
This commit is contained in:
@ -59,6 +59,7 @@ Status NodeChannel::init(RuntimeState* state) {
|
||||
if (_node_info == nullptr) {
|
||||
std::stringstream ss;
|
||||
ss << "unknown node id, id=" << _node_id;
|
||||
_cancelled = true;
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
|
||||
@ -131,6 +132,11 @@ Status NodeChannel::open_wait() {
|
||||
}
|
||||
_open_closure = nullptr;
|
||||
|
||||
if (!status.ok()) {
|
||||
_cancelled = true;
|
||||
return status;
|
||||
}
|
||||
|
||||
// add batch closure
|
||||
_add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create();
|
||||
_add_batch_closure->addFailedHandler([this]() {
|
||||
@ -179,7 +185,9 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
|
||||
// We use OlapTableSink mem_tracker which has the same ancestor of _plan node,
|
||||
// so in the ideal case, mem limit is a matter for _plan node.
|
||||
// But there is still some unfinished things, we do mem limit here temporarily.
|
||||
while (_parent->_mem_tracker->any_limit_exceeded()) {
|
||||
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
|
||||
// It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
|
||||
while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) {
|
||||
SCOPED_RAW_TIMER(&_mem_exceeded_block_ns);
|
||||
SleepFor(MonoDelta::FromMilliseconds(10));
|
||||
}
|
||||
|
||||
@ -166,6 +166,7 @@ public:
|
||||
// 0: stopped, send finished(eos request has been sent), or any internal error;
|
||||
// 1: running, haven't reach eos.
|
||||
// only allow 1 rpc in flight
|
||||
// plz make sure, this func should be called after open_wait().
|
||||
int try_send_and_fetch_status();
|
||||
|
||||
void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
|
||||
|
||||
Reference in New Issue
Block a user