[refactor](rename) Rename some variables in pipeline for better readability (#29140)

* rft-rename

* format
This commit is contained in:
zhiqiang
2023-12-28 12:54:47 +08:00
committed by GitHub
parent 82a8232c8a
commit a7c0dddbc9
3 changed files with 29 additions and 25 deletions

View File

@ -71,14 +71,14 @@ Status Pipeline::prepare(RuntimeState* state) {
return Status::OK();
}
Status Pipeline::set_sink(OperatorBuilderPtr& sink_) {
if (_sink) {
Status Pipeline::set_sink_builder(OperatorBuilderPtr& sink_) {
if (_sink_builder) {
return Status::InternalError("set sink twice");
}
if (!sink_->is_sink()) {
return Status::InternalError("should set a sink operator but {}", typeid(sink_).name());
}
_sink = sink_;
_sink_builder = sink_;
return Status::OK();
}

View File

@ -96,10 +96,10 @@ public:
// prepare operators for pipelineX
Status prepare(RuntimeState* state);
Status set_sink(OperatorBuilderPtr& sink_operator);
Status set_sink_builder(OperatorBuilderPtr& sink_operator_builder);
Status set_sink(DataSinkOperatorXPtr& sink_operator);
OperatorBuilderBase* sink() { return _sink.get(); }
OperatorBuilderBase* get_sink_builder() { return _sink_builder.get(); }
DataSinkOperatorXBase* sink_x() { return _sink_x.get(); }
OperatorXs& operator_xs() { return operatorXs; }
DataSinkOperatorXPtr sink_shared_pointer() { return _sink_x; }
@ -185,7 +185,7 @@ private:
void _init_profile();
OperatorBuilders _operator_builders; // left is _source, right is _root
OperatorBuilderPtr _sink; // put block to sink
OperatorBuilderPtr _sink_builder; // put block to sink
std::mutex _depend_mutex;
std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;

View File

@ -35,6 +35,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "exec/data_sink.h"
#include "exec/exec_node.h"
#include "exec/scan_node.h"
@ -332,6 +333,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_runtime_state->set_num_local_sink(request.num_local_sink);
if (request.fragment.__isset.output_sink) {
// Here we build a DataSink object, which will be hold by DataSinkOperator
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, request, idx, _root_plan->row_desc(),
@ -343,6 +345,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_root_pipeline->set_collect_query_statistics_with_every_batch();
RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
if (_sink) {
// DataSinkOperator is builded here
RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
request.fragment.output_sink, _runtime_state.get()));
}
@ -366,14 +369,15 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
_total_tasks = 0;
for (PipelinePtr& pipeline : _pipelines) {
// if sink
auto sink = pipeline->sink()->build_operator();
auto sink_operator = pipeline->get_sink_builder()->build_operator();
// TODO pipeline 1 need to add new interface for exec node and operator
static_cast<void>(sink->init(request.fragment.output_sink));
RETURN_IF_ERROR(sink_operator->init(request.fragment.output_sink));
RETURN_IF_ERROR(pipeline->build_operators());
auto task = std::make_unique<PipelineTask>(pipeline, _total_tasks++, _runtime_state.get(),
sink, this, pipeline->pipeline_profile());
static_cast<void>(sink->set_child(task->get_root()));
auto task =
std::make_unique<PipelineTask>(pipeline, _total_tasks++, _runtime_state.get(),
sink_operator, this, pipeline->pipeline_profile());
RETURN_IF_ERROR(sink_operator->set_child(task->get_root()));
_tasks.emplace_back(std::move(task));
_runtime_profile->add_child(pipeline->pipeline_profile(), true, nullptr);
}
@ -524,7 +528,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline));
OperatorBuilderPtr child_sink_builder = std::make_shared<UnionSinkOperatorBuilder>(
union_node->id(), child_id, union_node, data_queue);
RETURN_IF_ERROR(new_child_pipeline->set_sink(child_sink_builder));
RETURN_IF_ERROR(new_child_pipeline->set_sink_builder(child_sink_builder));
}
OperatorBuilderPtr source_builder = std::make_shared<UnionSourceOperatorBuilder>(
node->id(), union_node, data_queue);
@ -541,7 +545,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
OperatorBuilderPtr pre_agg_sink =
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
data_queue);
RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
OperatorBuilderPtr pre_agg_source =
std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
@ -551,7 +555,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
auto data_queue = std::make_shared<DataQueue>(1);
OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>(
node->id(), agg_node, data_queue);
RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
OperatorBuilderPtr pre_agg_source = std::make_shared<StreamingAggSourceOperatorBuilder>(
node->id(), agg_node, data_queue);
@ -559,7 +563,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
} else {
OperatorBuilderPtr agg_sink =
std::make_shared<AggSinkOperatorBuilder>(node->id(), agg_node);
RETURN_IF_ERROR(new_pipe->set_sink(agg_sink));
RETURN_IF_ERROR(new_pipe->set_sink_builder(agg_sink));
OperatorBuilderPtr agg_source =
std::make_shared<AggSourceOperatorBuilder>(node->id(), agg_node);
@ -572,7 +576,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
OperatorBuilderPtr sort_sink = std::make_shared<SortSinkOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(new_pipeline->set_sink(sort_sink));
RETURN_IF_ERROR(new_pipeline->set_sink_builder(sort_sink));
OperatorBuilderPtr sort_source =
std::make_shared<SortSourceOperatorBuilder>(node->id(), node);
@ -585,7 +589,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
OperatorBuilderPtr partition_sort_sink =
std::make_shared<PartitionSortSinkOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink));
RETURN_IF_ERROR(new_pipeline->set_sink_builder(partition_sort_sink));
OperatorBuilderPtr partition_sort_source =
std::make_shared<PartitionSortSourceOperatorBuilder>(node->id(), node);
@ -598,7 +602,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
OperatorBuilderPtr analytic_sink =
std::make_shared<AnalyticSinkOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(new_pipeline->set_sink(analytic_sink));
RETURN_IF_ERROR(new_pipeline->set_sink_builder(analytic_sink));
OperatorBuilderPtr analytic_source =
std::make_shared<AnalyticSourceOperatorBuilder>(node->id(), node);
@ -637,7 +641,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
}
OperatorBuilderPtr join_sink =
std::make_shared<HashJoinBuildSinkBuilder>(node->id(), join_node);
RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
RETURN_IF_ERROR(new_pipe->set_sink_builder(join_sink));
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr join_source =
@ -652,7 +656,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
OperatorBuilderPtr join_sink =
std::make_shared<NestLoopJoinBuildOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
RETURN_IF_ERROR(new_pipe->set_sink_builder(join_sink));
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr join_source =
@ -690,7 +694,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline));
OperatorBuilderPtr sink_builder =
std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(), node);
RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder));
RETURN_IF_ERROR(build_pipeline->set_sink_builder(sink_builder));
for (int child_id = 1; child_id < node->children_count(); ++child_id) {
auto probe_pipeline = add_pipeline();
@ -698,7 +702,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
OperatorBuilderPtr probe_sink_builder =
std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(), child_id,
node);
RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink_builder));
RETURN_IF_ERROR(probe_pipeline->set_sink_builder(probe_sink_builder));
}
OperatorBuilderPtr source_builder =
@ -827,7 +831,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
sink_ = std::make_shared<MultiCastDataStreamSinkOperatorBuilder>(next_operator_builder_id(),
_sink.get());
RETURN_IF_ERROR(_root_pipeline->set_sink(sink_));
RETURN_IF_ERROR(_root_pipeline->set_sink_builder(sink_));
auto& multi_cast_data_streamer =
assert_cast<vectorized::MultiCastDataStreamSink*>(_sink.get())
@ -862,7 +866,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
// 3. create and set sink operator of data stream sender for new pipeline
OperatorBuilderPtr sink_op_builder = std::make_shared<ExchangeSinkOperatorBuilder>(
next_operator_builder_id(), _multi_cast_stream_sink_senders[i].get(), i);
static_cast<void>(new_pipeline->set_sink(sink_op_builder));
static_cast<void>(new_pipeline->set_sink_builder(sink_op_builder));
// 4. init and prepare the data_stream_sender of diff exchange
TDataSink t;
@ -876,7 +880,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
default:
return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
}
return _root_pipeline->set_sink(sink_);
return _root_pipeline->set_sink_builder(sink_);
}
// If all pipeline tasks binded to the fragment instance are finished, then we could