[refactor](profile&names) using dst_id in pipelinex profile to be same as non pipeline; rename some function names (#28626)

Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
yiguolei
2023-12-19 17:44:29 +08:00
committed by GitHub
parent b99ac973d2
commit c72191eb9e
3 changed files with 9 additions and 7 deletions

View File

@ -69,7 +69,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
_sink_buffer->set_query_statistics(_sink->query_statistics());
RETURN_IF_ERROR(DataSinkOperator::prepare(state));
_sink->registe_channels(_sink_buffer.get());
_sink->register_pipeline_channels(_sink_buffer.get());
return Status::OK();
}
@ -249,7 +249,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
std::string ExchangeSinkLocalState::name_suffix() {
std::string name = " (id=" + std::to_string(_parent->node_id());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
name += ",dest_id=" + std::to_string(p._dest_node_id);
name += ",dst_id=" + std::to_string(p._dest_node_id);
name += ")";
return name;
}
@ -450,7 +450,8 @@ Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
void ExchangeSinkLocalState::register_channels(
pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer) {
for (auto channel : channels) {
((vectorized::PipChannel<ExchangeSinkLocalState>*)channel)->registe(buffer);
((vectorized::PipChannel<ExchangeSinkLocalState>*)channel)
->register_exchange_buffer(buffer);
}
}

View File

@ -791,9 +791,10 @@ Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** hol
return Status::OK();
}
void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer) {
void VDataStreamSender::register_pipeline_channels(
pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer) {
for (auto channel : _channels) {
((PipChannel<VDataStreamSender>*)channel)->registe(buffer);
((PipChannel<VDataStreamSender>*)channel)->register_exchange_buffer(buffer);
}
}

View File

@ -129,7 +129,7 @@ public:
RuntimeState* state() { return _state; }
void registe_channels(pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer);
void register_pipeline_channels(pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer);
bool channel_all_can_write();
@ -530,7 +530,7 @@ public:
return Status::OK();
}
void registe(pipeline::ExchangeSinkBuffer<Parent>* buffer) {
void register_exchange_buffer(pipeline::ExchangeSinkBuffer<Parent>* buffer) {
_buffer = buffer;
_buffer->register_sink(Channel<Parent>::_fragment_instance_id);
}