[Bug](exchange) init _instance_to_rpc_ctx on register_sink (#22976)
init _instance_to_rpc_ctx on register_sink
This commit is contained in:
@ -113,6 +113,7 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
|
||||
finst_id.set_hi(fragment_instance_id.hi);
|
||||
finst_id.set_lo(fragment_instance_id.lo);
|
||||
_instance_to_sending_by_pipeline[low_id] = true;
|
||||
_instance_to_rpc_ctx[low_id] = {};
|
||||
_instance_to_receiver_eof[low_id] = false;
|
||||
_instance_to_rpc_time[low_id] = 0;
|
||||
_construct_request(low_id, finst_id);
|
||||
@ -191,10 +192,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
|
||||
}
|
||||
auto* closure = request.channel->get_closure(id, request.eos, nullptr);
|
||||
|
||||
ExchangeRpcContext rpc_ctx;
|
||||
rpc_ctx._closure = closure;
|
||||
rpc_ctx.is_cancelled = false;
|
||||
_instance_to_rpc_ctx[id] = rpc_ctx;
|
||||
_instance_to_rpc_ctx[id]._closure = closure;
|
||||
_instance_to_rpc_ctx[id].is_cancelled = false;
|
||||
|
||||
closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
|
||||
if (config::exchange_sink_ignore_eovercrowded) {
|
||||
|
||||
Reference in New Issue
Block a user