diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 165576d1ab..e70be36b77 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -69,7 +69,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) { _sink_buffer->set_query_statistics(_sink->query_statistics()); RETURN_IF_ERROR(DataSinkOperator::prepare(state)); - _sink->registe_channels(_sink_buffer.get()); + _sink->register_pipeline_channels(_sink_buffer.get()); return Status::OK(); } @@ -249,7 +249,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { std::string ExchangeSinkLocalState::name_suffix() { std::string name = " (id=" + std::to_string(_parent->node_id()); auto& p = _parent->cast(); - name += ",dest_id=" + std::to_string(p._dest_node_id); + name += ",dst_id=" + std::to_string(p._dest_node_id); name += ")"; return name; } @@ -450,7 +450,8 @@ Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec void ExchangeSinkLocalState::register_channels( pipeline::ExchangeSinkBuffer* buffer) { for (auto channel : channels) { - ((vectorized::PipChannel*)channel)->registe(buffer); + ((vectorized::PipChannel*)channel) + ->register_exchange_buffer(buffer); } } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 4d55e0fcb0..2f1451c5c4 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -791,9 +791,10 @@ Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** hol return Status::OK(); } -void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) { +void VDataStreamSender::register_pipeline_channels( + pipeline::ExchangeSinkBuffer* buffer) { for (auto channel : _channels) { - ((PipChannel*)channel)->registe(buffer); + ((PipChannel*)channel)->register_exchange_buffer(buffer); } } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 75a3bfd86a..d66295c870 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -129,7 +129,7 @@ public: RuntimeState* state() { return _state; } - void registe_channels(pipeline::ExchangeSinkBuffer* buffer); + void register_pipeline_channels(pipeline::ExchangeSinkBuffer* buffer); bool channel_all_can_write(); @@ -530,7 +530,7 @@ public: return Status::OK(); } - void registe(pipeline::ExchangeSinkBuffer* buffer) { + void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; _buffer->register_sink(Channel::_fragment_instance_id); }