diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index e8b35c16d1..dcd17dcb0a 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -65,7 +65,6 @@ NodeChannel::~NodeChannel() noexcept { delete _add_batch_closure; _add_batch_closure = nullptr; } - _cur_add_batch_request.release_id(); } // if "_cancelled" is set to true, @@ -144,8 +143,6 @@ void NodeChannel::open() { } _stub->tablet_writer_open(&_open_closure->cntl, &request, &_open_closure->result, _open_closure); - request.release_id(); - request.release_schema(); } void NodeChannel::_cancel_with_msg(const std::string& msg) { @@ -446,16 +443,13 @@ void NodeChannel::cancel(const std::string& cancel_msg) { auto closure = new RefCountClosure(); closure->ref(); - int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; - if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { - remain_ms = config::min_load_rpc_timeout_ms; - } + int remain_ms = std::max(_rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS, + (unsigned long long)config::min_load_rpc_timeout_ms); closure->cntl.set_timeout_ms(remain_ms); if (config::tablet_writer_ignore_eovercrowded) { closure->cntl.ignore_eovercrowded(); } _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure); - request.release_id(); } int NodeChannel::try_send_and_fetch_status(RuntimeState* state,