[Refactor](pipeline) Refactor operator and builder code of pipeline (#14787)

This commit is contained in:
HappenLee
2022-12-05 18:35:00 +08:00
committed by GitHub
parent 382d35c7e1
commit b30cd86e9e
41 changed files with 481 additions and 953 deletions

View File

@ -861,15 +861,6 @@ Status ExecNode::get_next_after_projects(RuntimeState* state, vectorized::Block*
return get_next(state, block, eos);
}
Status ExecNode::execute(RuntimeState* state, vectorized::Block* input_block,
vectorized::Block* output_block, bool* eos) {
return Status::NotSupported("{} not implements execute", get_name());
}
Status ExecNode::pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) {
return Status::NotSupported("{} not implements pull", get_name());
}
Status ExecNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) {
return Status::NotSupported("{} not implements sink", get_name());
}

View File

@ -55,7 +55,7 @@ class VExpr;
namespace pipeline {
class PipelineFragmentContext;
class Pipeline;
class Operator;
class OperatorBase;
} // namespace pipeline
using std::string;
@ -118,14 +118,11 @@ public:
// new interface to compatible new optimizers in FE
Status get_next_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos);
// Process data
// Eg: Projection, Union All, HashProbe
virtual Status execute(RuntimeState* state, vectorized::Block* input_block,
vectorized::Block* output_block, bool* eos);
// Emit data, both need impl with method: sink
// Eg: Aggregation, Sort
virtual Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos);
// Eg: Aggregation, Sort, Scan
virtual Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) {
return get_next(state, output_block, eos);
}
virtual Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) {
return Status::OK();
@ -247,7 +244,6 @@ public:
protected:
friend class DataSink;
friend class doris::pipeline::Operator;
/// Initialize 'buffer_pool_client_' and claim the initial reservation for this
/// ExecNode. Only needs to be called by ExecNodes that will use the client.
@ -411,7 +407,7 @@ protected:
std::atomic<bool> _can_read = false;
private:
friend class pipeline::Operator;
friend class pipeline::OperatorBase;
bool _is_closed;
bool _is_resource_released = false;
std::atomic_int _ref; // used by pipeline operator to release resource.

View File

@ -28,7 +28,6 @@ set(PIPELINE_FILES
task_scheduler.cpp
exec/operator.cpp
exec/scan_operator.cpp
exec/olap_scan_operator.cpp
exec/empty_set_operator.cpp
exec/exchange_source_operator.cpp
exec/exchange_sink_operator.cpp
@ -41,7 +40,9 @@ set(PIPELINE_FILES
exec/agg_context.cpp
exec/sort_source_operator.cpp
exec/sort_sink_operator.cpp
exec/repeat_operator.cpp)
exec/repeat_operator.cpp
exec/table_function_operator.cpp
)
add_library(Pipeline STATIC
${PIPELINE_FILES}

View File

@ -21,57 +21,6 @@
namespace doris::pipeline {
AggSinkOperator::AggSinkOperator(AggSinkOperatorBuilder* operator_builder,
vectorized::AggregationNode* agg_node)
: Operator(operator_builder), _agg_node(agg_node) {}
OPERATOR_CODE_GENERATOR(AggSinkOperator, Operator)
Status AggSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));
_agg_node->increase_ref();
return Status::OK();
}
Status AggSinkOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(Operator::open(state));
RETURN_IF_ERROR(_agg_node->alloc_resource(state));
return Status::OK();
}
bool AggSinkOperator::can_write() {
return true;
}
Status AggSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
return _agg_node->sink(state, in_block, source_state == SourceState::FINISHED);
}
Status AggSinkOperator::close(RuntimeState* state) {
_fresh_exec_timer(_agg_node);
if (!_agg_node->decrease_ref()) {
_agg_node->release_resource(state);
}
return Status::OK();
}
/////////////////////////////// operator template ////////////////////////////////
AggSinkOperatorBuilder::AggSinkOperatorBuilder(int32_t id, const std::string& name,
vectorized::AggregationNode* exec_node)
: OperatorBuilder(id, name, exec_node), _agg_node(exec_node) {}
OperatorPtr AggSinkOperatorBuilder::build_operator() {
return std::make_shared<AggSinkOperator>(this, _agg_node);
}
// use final aggregation source operator
bool AggSinkOperatorBuilder::is_sink() const {
return true;
}
bool AggSinkOperatorBuilder::is_source() const {
return false;
}
} // namespace doris::pipeline

View File

@ -28,37 +28,19 @@ class Block;
} // namespace vectorized
namespace pipeline {
class AggSinkOperatorBuilder;
class AggSinkOperator : public Operator {
class AggSinkOperatorBuilder final : public OperatorBuilder<vectorized::AggregationNode> {
public:
AggSinkOperator(AggSinkOperatorBuilder* operator_builder, vectorized::AggregationNode*);
Status prepare(RuntimeState*) override;
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override;
bool can_write() override;
Status close(RuntimeState* state) override;
Status finalize(doris::RuntimeState* state) override { return Status::OK(); }
private:
vectorized::AggregationNode* _agg_node;
};
class AggSinkOperatorBuilder : public OperatorBuilder {
public:
AggSinkOperatorBuilder(int32_t, const std::string&, vectorized::AggregationNode*);
AggSinkOperatorBuilder(int32_t, ExecNode*);
OperatorPtr build_operator() override;
bool is_sink() const override { return true; };
};
bool is_sink() const override;
bool is_source() const override;
private:
vectorized::AggregationNode* _agg_node;
class AggSinkOperator final : public Operator<AggSinkOperatorBuilder> {
public:
AggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
bool can_write() override { return true; };
};
} // namespace pipeline

View File

@ -22,46 +22,7 @@
namespace doris {
namespace pipeline {
AggregationSourceOperator::AggregationSourceOperator(OperatorBuilder* templ,
vectorized::AggregationNode* node)
: Operator(templ), _agg_node(node) {}
Status AggregationSourceOperator::prepare(RuntimeState* state) {
_agg_node->increase_ref();
return Status::OK();
}
bool AggregationSourceOperator::can_read() {
return _agg_node->can_read();
}
Status AggregationSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
bool eos = false;
RETURN_IF_ERROR(_agg_node->pull(state, block, &eos));
source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
return Status::OK();
}
Status AggregationSourceOperator::close(RuntimeState* state) {
_fresh_exec_timer(_agg_node);
if (!_agg_node->decrease_ref()) {
_agg_node->release_resource(state);
}
return Status::OK();
}
/////////////////////////////// operator template ////////////////////////////////
AggregationSourceOperatorBuilder::AggregationSourceOperatorBuilder(
int32_t id, const std::string& name, vectorized::AggregationNode* exec_node)
: OperatorBuilder(id, name, exec_node) {}
OperatorPtr AggregationSourceOperatorBuilder::build_operator() {
return std::make_shared<AggregationSourceOperator>(
this, assert_cast<vectorized::AggregationNode*>(_related_exec_node));
}
OPERATOR_CODE_GENERATOR(AggSourceOperator, Operator)
} // namespace pipeline
} // namespace doris

View File

@ -25,27 +25,19 @@ class AggregationNode;
namespace pipeline {
// For read none streaming agg sink operator's data
class AggregationSourceOperator : public Operator {
class AggSourceOperatorBuilder final : public OperatorBuilder<vectorized::AggregationNode> {
public:
AggregationSourceOperator(OperatorBuilder*, vectorized::AggregationNode*);
Status prepare(RuntimeState* state) override;
bool can_read() override;
Status close(RuntimeState* state) override;
Status get_block(RuntimeState*, vectorized::Block*, SourceState&) override;
private:
vectorized::AggregationNode* _agg_node;
};
class AggregationSourceOperatorBuilder : public OperatorBuilder {
public:
AggregationSourceOperatorBuilder(int32_t, const std::string&, vectorized::AggregationNode*);
AggSourceOperatorBuilder(int32_t, ExecNode*);
bool is_source() const override { return true; }
OperatorPtr build_operator() override;
};
class AggSourceOperator final : public Operator<AggSourceOperatorBuilder> {
public:
AggSourceOperator(OperatorBuilderBase*, ExecNode*);
};
} // namespace pipeline
} // namespace doris

View File

@ -21,24 +21,6 @@
namespace doris::pipeline {
EmptySetSourceOperator::EmptySetSourceOperator(EmptySetSourceOperatorBuilder* operator_builder,
vectorized::VEmptySetNode* empty_set_node)
: Operator(operator_builder), _empty_set_node(empty_set_node) {}
bool EmptySetSourceOperator::can_read() {
return true;
}
Status EmptySetSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
bool eos = false;
RETURN_IF_ERROR(_empty_set_node->get_next(state, block, &eos));
source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
return Status::OK();
}
EmptySetSourceOperatorBuilder::EmptySetSourceOperatorBuilder(
int32_t id, const string& name, vectorized::VEmptySetNode* empty_set_node)
: OperatorBuilder(id, name, empty_set_node), _empty_set_node(empty_set_node) {}
OPERATOR_CODE_GENERATOR(EmptySetSourceOperator, Operator)
} // namespace doris::pipeline

View File

@ -27,34 +27,19 @@ class VEmptySetNode;
namespace pipeline {
class EmptySetSourceOperatorBuilder;
class EmptySetSourceOperator : public Operator {
class EmptySetSourceOperatorBuilder final : public OperatorBuilder<vectorized::VEmptySetNode> {
public:
EmptySetSourceOperator(EmptySetSourceOperatorBuilder* operator_builder,
vectorized::VEmptySetNode* empty_set_node);
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
bool can_read() override;
private:
vectorized::VEmptySetNode* _empty_set_node;
};
class EmptySetSourceOperatorBuilder : public OperatorBuilder {
public:
EmptySetSourceOperatorBuilder(int32_t id, const std::string& name,
vectorized::VEmptySetNode* empty_set_node);
EmptySetSourceOperatorBuilder(int32_t id, ExecNode* empty_set_node);
bool is_source() const override { return true; }
OperatorPtr build_operator() override {
return std::make_shared<EmptySetSourceOperator>(this, _empty_set_node);
}
OperatorPtr build_operator() override;
};
private:
vectorized::VEmptySetNode* _empty_set_node;
class EmptySetSourceOperator final : public Operator<EmptySetSourceOperatorBuilder> {
public:
EmptySetSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* empty_set_node);
bool can_read() override { return true; };
};
} // namespace pipeline

View File

@ -27,70 +27,48 @@
namespace doris::pipeline {
ExchangeSinkOperator::ExchangeSinkOperator(OperatorBuilder* operator_builder,
vectorized::VDataStreamSender* sink,
PipelineFragmentContext* context)
: Operator(operator_builder), _sink(sink), _context(context) {}
ExchangeSinkOperatorBuilder::ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink,
PipelineFragmentContext* context)
: DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink), _context(context) {}
ExchangeSinkOperator::~ExchangeSinkOperator() = default;
Status ExchangeSinkOperator::init(ExecNode* exec_node, RuntimeState* state) {
RETURN_IF_ERROR(Operator::init(exec_node, state));
_state = state;
return Status::OK();
OperatorPtr ExchangeSinkOperatorBuilder::build_operator() {
return std::make_shared<ExchangeSinkOperator>(this, _sink, _context);
}
ExchangeSinkOperator::ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink,
PipelineFragmentContext* context)
: DataSinkOperator(operator_builder, sink), _context(context) {}
Status ExchangeSinkOperator::init(const TDataSink& tsink) {
RETURN_IF_ERROR(_sink->init(tsink));
PUniqueId query_id;
query_id.set_hi(_state->query_id().hi);
query_id.set_lo(_state->query_id().lo);
_sink_buffer =
std::make_unique<ExchangeSinkBuffer>(query_id, tsink.stream_sink.dest_node_id,
_sink->_sender_id, _state->be_number(), _context);
_dest_node_id = tsink.stream_sink.dest_node_id;
return Status::OK();
}
Status ExchangeSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));
RETURN_IF_ERROR(_sink->prepare(state));
_sink->profile()->add_child(_runtime_profile.get(), true, nullptr);
_state = state;
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, _sink->_sender_id,
_state->be_number(), _context);
RETURN_IF_ERROR(DataSinkOperator::prepare(state));
_sink->registe_channels(_sink_buffer.get());
return Status::OK();
}
Status ExchangeSinkOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(_sink->open(state));
return Status::OK();
}
bool ExchangeSinkOperator::can_write() {
return _sink_buffer->can_write() && _sink->channel_all_can_write();
}
Status ExchangeSinkOperator::finalize(RuntimeState* state) {
Status result = Status::OK();
RETURN_IF_ERROR(_sink->close(state, result));
return result;
}
Status ExchangeSinkOperator::sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(_sink->send(state, block, source_state == SourceState::FINISHED));
return Status::OK();
}
bool ExchangeSinkOperator::is_pending_finish() const {
return _sink_buffer->is_pending_finish();
}
Status ExchangeSinkOperator::close(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperator::close(state));
_sink_buffer->close();
RETURN_IF_ERROR(Operator::close(state));
return Status::OK();
}

View File

@ -26,21 +26,27 @@ namespace doris {
namespace pipeline {
class PipelineFragmentContext;
// Now local exchange is not supported since VDataStreamRecvr is considered as a pipeline broker.
class ExchangeSinkOperator : public Operator {
class ExchangeSinkOperatorBuilder final
: public DataSinkOperatorBuilder<vectorized::VDataStreamSender> {
public:
ExchangeSinkOperator(OperatorBuilder* operator_builder, vectorized::VDataStreamSender* sink,
ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, PipelineFragmentContext* context);
OperatorPtr build_operator() override;
private:
PipelineFragmentContext* _context;
};
// Now local exchange is not supported since VDataStreamRecvr is considered as a pipeline broker.
class ExchangeSinkOperator final : public DataSinkOperator<ExchangeSinkOperatorBuilder> {
public:
ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink,
PipelineFragmentContext* context);
~ExchangeSinkOperator() override;
Status init(ExecNode* exec_node, RuntimeState* state = nullptr) override;
Status init(const TDataSink& tsink) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
bool can_write() override;
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override;
bool is_pending_finish() const override;
Status finalize(RuntimeState* state) override;
Status close(RuntimeState* state) override;
@ -48,28 +54,10 @@ public:
private:
std::unique_ptr<ExchangeSinkBuffer> _sink_buffer;
vectorized::VDataStreamSender* _sink;
int _dest_node_id = -1;
RuntimeState* _state = nullptr;
PipelineFragmentContext* _context;
};
class ExchangeSinkOperatorBuilder : public OperatorBuilder {
public:
ExchangeSinkOperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node,
vectorized::VDataStreamSender* sink,
PipelineFragmentContext* context)
: OperatorBuilder(id, name, exec_node), _sink(sink), _context(context) {}
bool is_sink() const override { return true; }
OperatorPtr build_operator() override {
return std::make_shared<ExchangeSinkOperator>(this, _sink, _context);
}
private:
vectorized::VDataStreamSender* _sink;
PipelineFragmentContext* _context;
};
} // namespace pipeline
} // namespace doris

View File

@ -23,41 +23,14 @@
namespace doris::pipeline {
ExchangeSourceOperator::ExchangeSourceOperator(OperatorBuilder* operator_builder,
vectorized::VExchangeNode* node)
: Operator(operator_builder), _exchange_node(node) {}
Status ExchangeSourceOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
return _exchange_node->alloc_resource(state);
}
OPERATOR_CODE_GENERATOR(ExchangeSourceOperator, Operator)
bool ExchangeSourceOperator::can_read() {
return _exchange_node->_stream_recvr->ready_to_read();
}
Status ExchangeSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
bool eos = false;
auto st = _exchange_node->get_next(state, block, &eos);
source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
return st;
return _node->_stream_recvr->ready_to_read();
}
bool ExchangeSourceOperator::is_pending_finish() const {
// TODO HappenLee
return false;
}
Status ExchangeSourceOperator::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
_fresh_exec_timer(_exchange_node);
_exchange_node->release_resource(state);
return Operator::close(state);
}
} // namespace doris::pipeline

View File

@ -25,31 +25,20 @@ class VExchangeNode;
namespace doris::pipeline {
class ExchangeSourceOperator : public Operator {
class ExchangeSourceOperatorBuilder final : public OperatorBuilder<vectorized::VExchangeNode> {
public:
explicit ExchangeSourceOperator(OperatorBuilder*, vectorized::VExchangeNode*);
Status open(RuntimeState* state) override;
bool can_read() override;
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
bool is_pending_finish() const override;
Status close(RuntimeState* state) override;
private:
vectorized::VExchangeNode* _exchange_node;
};
class ExchangeSourceOperatorBuilder : public OperatorBuilder {
public:
ExchangeSourceOperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node)
: OperatorBuilder(id, name, exec_node) {}
ExchangeSourceOperatorBuilder(int32_t id, ExecNode* exec_node);
bool is_source() const override { return true; }
OperatorPtr build_operator() override {
return std::make_shared<ExchangeSourceOperator>(
this, reinterpret_cast<vectorized::VExchangeNode*>(_related_exec_node));
}
OperatorPtr build_operator() override;
};
class ExchangeSourceOperator final : public Operator<ExchangeSourceOperatorBuilder> {
public:
ExchangeSourceOperator(OperatorBuilderBase*, ExecNode*);
bool can_read() override;
bool is_pending_finish() const override;
};
} // namespace doris::pipeline

View File

@ -1,46 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <utility>
#include "scan_operator.h"
#include "vec/exec/scan/new_olap_scan_node.h"
namespace doris::pipeline {
class OlapScanOperatorBuilder;
class OlapScanOperator : public ScanOperator {
public:
OlapScanOperator(OperatorBuilder* operator_builder, vectorized::NewOlapScanNode* scan_node);
};
class OlapScanOperatorBuilder : public ScanOperatorBuilder {
public:
OlapScanOperatorBuilder(uint32_t id, const std::string& name,
vectorized::NewOlapScanNode* new_olap_scan_node);
OperatorPtr build_operator() override {
return std::make_shared<OlapScanOperator>(this, _new_olap_scan_node);
}
private:
vectorized::NewOlapScanNode* _new_olap_scan_node;
};
} // namespace doris::pipeline

View File

@ -19,36 +19,18 @@
namespace doris::pipeline {
Operator::Operator(OperatorBuilder* operator_builder)
OperatorBase::OperatorBase(OperatorBuilderBase* operator_builder)
: _operator_builder(operator_builder), _is_closed(false) {}
bool Operator::is_sink() const {
bool OperatorBase::is_sink() const {
return _operator_builder->is_sink();
}
bool Operator::is_source() const {
bool OperatorBase::is_source() const {
return _operator_builder->is_source();
}
Status Operator::init(ExecNode* exec_node, RuntimeState* state) {
_runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
if (exec_node) {
exec_node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
}
return Status::OK();
}
Status Operator::prepare(RuntimeState* state) {
_mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
_runtime_profile.get());
return Status::OK();
}
Status Operator::open(RuntimeState* state) {
return Status::OK();
}
Status Operator::close(RuntimeState* state) {
Status OperatorBase::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK();
}
@ -56,16 +38,11 @@ Status Operator::close(RuntimeState* state) {
return Status::OK();
}
const RowDescriptor& Operator::row_desc() {
const RowDescriptor& OperatorBase::row_desc() {
return _operator_builder->row_desc();
}
void Operator::_fresh_exec_timer(doris::ExecNode* node) {
node->_runtime_profile->total_time_counter()->update(
_runtime_profile->total_time_counter()->value());
}
std::string Operator::debug_string() const {
std::string OperatorBase::debug_string() const {
std::stringstream ss;
ss << _operator_builder->get_name() << ", source: " << is_source();
ss << ", sink: " << is_sink() << ", is closed: " << _is_closed;
@ -75,13 +52,13 @@ std::string Operator::debug_string() const {
/////////////////////////////////////// OperatorBuilder ////////////////////////////////////////////////////////////
Status OperatorBuilder::prepare(doris::RuntimeState* state) {
Status OperatorBuilderBase::prepare(doris::RuntimeState* state) {
_state = state;
// runtime filter, now dispose by NewOlapScanNode
return Status::OK();
}
void OperatorBuilder::close(doris::RuntimeState* state) {
void OperatorBuilderBase::close(doris::RuntimeState* state) {
if (_is_closed) {
return;
}

View File

@ -24,6 +24,15 @@
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
#define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS) \
NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node) \
: OperatorBuilder(id, #NAME, exec_node) {} \
\
OperatorPtr NAME##Builder::build_operator() { return std::make_shared<NAME>(this, _node); } \
\
NAME::NAME(OperatorBuilderBase* operator_builder, ExecNode* node) \
: SUBCLASS(operator_builder, node) {};
namespace doris::pipeline {
// Result of source pull data, init state is DEPEND_ON_SOURCE
@ -33,7 +42,6 @@ enum class SourceState : uint8_t {
FINISHED = 2
};
//
enum class SinkState : uint8_t {
SINK_IDLE = 0, // can send block to sink
SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
@ -41,16 +49,86 @@ enum class SinkState : uint8_t {
};
//////////////// DO NOT USE THE UP State ////////////////
class OperatorBuilder;
class Operator;
class OperatorBuilderBase;
class OperatorBase;
using OperatorPtr = std::shared_ptr<Operator>;
using OperatorPtr = std::shared_ptr<OperatorBase>;
using Operators = std::vector<OperatorPtr>;
class Operator {
using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>;
using OperatorBuilders = std::vector<OperatorBuilderPtr>;
class OperatorBuilderBase {
public:
explicit Operator(OperatorBuilder* operator_builder);
virtual ~Operator() = default;
OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {}
virtual ~OperatorBuilderBase() = default;
virtual OperatorPtr build_operator() = 0;
virtual bool is_sink() const { return false; }
virtual bool is_source() const { return false; }
// create the object used by all operator
virtual Status prepare(RuntimeState* state);
// destory the object used by all operator
virtual void close(RuntimeState* state);
std::string get_name() const { return _name; }
RuntimeState* runtime_state() { return _state; }
virtual const RowDescriptor& row_desc() = 0;
int32_t id() const { return _id; }
protected:
const int32_t _id;
const std::string _name;
RuntimeState* _state = nullptr;
bool _is_closed = false;
};
template <typename NodeType>
class OperatorBuilder : public OperatorBuilderBase {
public:
OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
: OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {}
virtual ~OperatorBuilder() = default;
const RowDescriptor& row_desc() override { return _node->row_desc(); }
NodeType* exec_node() const { return _node; }
protected:
NodeType* _node;
};
template <typename SinkType>
class DataSinkOperatorBuilder : public OperatorBuilderBase {
public:
DataSinkOperatorBuilder(int32_t id, const std::string& name, DataSink* sink = nullptr)
: OperatorBuilderBase(id, name), _sink(reinterpret_cast<SinkType*>(sink)) {}
virtual ~DataSinkOperatorBuilder() = default;
bool is_sink() const override { return true; }
const RowDescriptor& row_desc() override { return _sink->row_desc(); }
SinkType* exec_node() const { return _sink; }
protected:
SinkType* _sink;
};
class OperatorBase {
public:
explicit OperatorBase(OperatorBuilderBase* operator_builder);
virtual ~OperatorBase() = default;
// After both sink and source need to know the cancel state.
// do cancel work
@ -58,14 +136,11 @@ public:
bool is_source() const;
// Should be call after ExecNode is constructed
virtual Status init(ExecNode* exec_node, RuntimeState* state = nullptr);
// Only result sink and data stream sink need to impl the virtual function
virtual Status init(const TDataSink& tsink) { return Status::OK(); };
// Do prepare some state of Operator
virtual Status prepare(RuntimeState* state);
virtual Status prepare(RuntimeState* state) = 0;
// Like ExecNode,when pipeline task first time be scheduled, can't block
// the pipeline should be open after dependencies is finish
@ -73,15 +148,15 @@ public:
// Now the pipeline only have one task, so the there is no performance bottleneck for the mechanism,
// but if one pipeline have multi task to parallel work, need to rethink the logic
//
// Each operator should call open_self() to prepare resource to do data compute.
// if ExecNode split to sink and source operator, open_self() should be called in sink operator
virtual Status open(RuntimeState* state);
// Each operator should call alloc_resource() to prepare resource to do data compute.
// if ExecNode split to sink and source operator, alloc_resource() should be called in sink operator
virtual Status open(RuntimeState* state) = 0;
// Release the resource, should not block the thread
//
// Each operator should call close_self() to release resource
// if ExecNode split to sink and source operator, close_self() should be called in source operator
virtual Status close(RuntimeState* state);
virtual Status close(RuntimeState* state) = 0;
Status set_child(OperatorPtr child) {
if (is_source()) {
@ -96,20 +171,14 @@ public:
virtual bool can_write() { return false; } // for sink
// for pipeline
virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state,
[[maybe_unused]] vectorized::Block* block,
[[maybe_unused]] SourceState& result_state) {
std::stringstream error_msg;
error_msg << " has not implements get_block";
return Status::NotSupported(error_msg.str());
}
virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
SourceState& result_state) {
return Status::OK();
};
// return can write continue
virtual Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) {
std::stringstream error_msg;
error_msg << " not a sink ";
return Status::NotSupported(error_msg.str());
}
virtual Status sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state) = 0;
virtual Status finalize(RuntimeState* state) {
std::stringstream error_msg;
@ -130,7 +199,7 @@ public:
MemTracker* mem_tracker() const { return _mem_tracker.get(); }
const OperatorBuilder* operator_builder() const { return _operator_builder; }
const OperatorBuilderBase* operator_builder() const { return _operator_builder; }
const RowDescriptor& row_desc();
@ -138,11 +207,9 @@ public:
std::string debug_string() const;
protected:
void _fresh_exec_timer(ExecNode* node);
std::unique_ptr<MemTracker> _mem_tracker;
OperatorBuilder* _operator_builder;
OperatorBuilderBase* _operator_builder;
// source has no child
// if an operator is not source, it will get data from its child.
OperatorPtr _child;
@ -155,44 +222,163 @@ private:
bool _is_closed = false;
};
class OperatorBuilder {
template <typename OperatorBuilderType>
class DataSinkOperator : public OperatorBase {
public:
OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
: _id(id), _name(name), _related_exec_node(exec_node) {}
using NodeType =
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
virtual ~OperatorBuilder() = default;
DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
: OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
virtual OperatorPtr build_operator() = 0;
virtual ~DataSinkOperator() = default;
virtual bool is_sink() const { return false; }
virtual bool is_source() const { return false; }
virtual Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(_sink->prepare(state));
_runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
_sink->profile()->insert_child_head(_runtime_profile.get(), true);
_mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
_runtime_profile.get());
return Status::OK();
}
// create the object used by all operator
virtual Status prepare(RuntimeState* state);
virtual Status open(RuntimeState* state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
return _sink->open(state);
}
// destory the object used by all operator
virtual void close(RuntimeState* state);
virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
if (!UNLIKELY(in_block)) {
DCHECK(source_state == SourceState::FINISHED)
<< "block is null, eos should invoke in finalize.";
return Status::OK();
}
return _sink->send(state, in_block, source_state == SourceState::FINISHED);
}
std::string get_name() const { return _name; }
virtual Status close(RuntimeState* state) override {
_fresh_exec_timer(_sink);
return _sink->close(state, Status::OK());
}
RuntimeState* runtime_state() { return _state; }
const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
ExecNode* exec_node() const { return _related_exec_node; }
int32_t id() const { return _id; }
virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
protected:
const int32_t _id;
const std::string _name;
ExecNode* _related_exec_node;
void _fresh_exec_timer(NodeType* node) {
node->profile()->total_time_counter()->update(
_runtime_profile->total_time_counter()->value());
}
RuntimeState* _state = nullptr;
bool _is_closed = false;
NodeType* _sink;
};
using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
using OperatorBuilders = std::vector<OperatorBuilderPtr>;
template <typename OperatorBuilderType>
class Operator : public OperatorBase {
public:
using NodeType =
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
Operator(OperatorBuilderBase* builder, ExecNode* node)
: OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
virtual ~Operator() = default;
virtual Status prepare(RuntimeState* state) override {
_runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
_node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
_mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
_runtime_profile.get());
_node->increase_ref();
return Status::OK();
}
virtual Status open(RuntimeState* state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(_node->alloc_resource(state));
return Status::OK();
}
virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
return _node->sink(state, in_block, source_state == SourceState::FINISHED);
}
virtual Status close(RuntimeState* state) override {
_fresh_exec_timer(_node);
if (!_node->decrease_ref()) {
_node->release_resource(state);
}
return Status::OK();
}
virtual Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
bool eos = false;
RETURN_IF_ERROR(_node->pull(state, block, &eos));
source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
return Status::OK();
}
virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
virtual bool can_read() override { return _node->can_read(); }
protected:
void _fresh_exec_timer(NodeType* node) {
node->runtime_profile()->total_time_counter()->update(
_runtime_profile->total_time_counter()->value());
}
NodeType* _node;
};
template <typename OperatorBuilderType>
class DataStateOperator : public Operator<OperatorBuilderType> {
public:
using NodeType =
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
: Operator<OperatorBuilderType>(builder, node),
_child_block(new vectorized::Block),
_child_source_state(SourceState::DEPEND_ON_SOURCE) {};
virtual ~DataStateOperator() = default;
virtual Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
auto& node = Operator<OperatorBuilderType>::_node;
auto& child = Operator<OperatorBuilderType>::_child;
if (node->need_more_input_data()) {
RETURN_IF_ERROR(child->get_block(state, _child_block.get(), _child_source_state));
source_state = _child_source_state;
if (_child_block->rows() == 0) {
return Status::OK();
}
node->push(state, _child_block.get(), source_state == SourceState::FINISHED);
}
bool eos = false;
RETURN_IF_ERROR(node->pull(state, block, &eos));
if (eos) {
source_state = SourceState::FINISHED;
_child_block->clear_column_data();
} else if (!node->need_more_input_data()) {
source_state = SourceState::MORE_DATA;
} else {
_child_block->clear_column_data();
}
return Status::OK();
}
protected:
std::unique_ptr<vectorized::Block> _child_block;
SourceState _child_source_state;
};
} // namespace doris::pipeline

View File

@ -19,58 +19,8 @@
#include "vec/exec/vrepeat_node.h"
namespace doris {
namespace pipeline {
namespace doris::pipeline {
RepeatOperator::RepeatOperator(RepeatOperatorBuilder* operator_builder,
vectorized::VRepeatNode* repeat_node)
: Operator(operator_builder), _repeat_node(repeat_node) {}
OPERATOR_CODE_GENERATOR(RepeatOperator, DataStateOperator)
Status RepeatOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(Operator::open(state));
_child_block.reset(new vectorized::Block);
return _repeat_node->alloc_resource(state);
}
Status RepeatOperator::close(RuntimeState* state) {
_fresh_exec_timer(_repeat_node);
_repeat_node->release_resource(state);
Operator::close(state);
return Status::OK();
}
Status RepeatOperator::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
if (_repeat_node->need_more_input_data()) {
RETURN_IF_ERROR(_child->get_block(state, _child_block.get(), _child_source_state));
source_state = _child_source_state;
if (_child_block->rows() == 0) {
return Status::OK();
}
_repeat_node->push(state, _child_block.get(), source_state == SourceState::FINISHED);
}
bool eos = false;
RETURN_IF_ERROR(_repeat_node->pull(state, block, &eos));
if (eos) {
source_state = SourceState::FINISHED;
_child_block->clear_column_data();
} else if (!_repeat_node->need_more_input_data()) {
source_state = SourceState::MORE_DATA;
} else {
_child_block->clear_column_data();
}
return Status::OK();
}
RepeatOperatorBuilder::RepeatOperatorBuilder(int32_t id, vectorized::VRepeatNode* repeat_node)
: OperatorBuilder(id, "RepeatOperatorBuilder", repeat_node), _repeat_node(repeat_node) {}
OperatorPtr RepeatOperatorBuilder::build_operator() {
return std::make_shared<RepeatOperator>(this, _repeat_node);
}
} // namespace pipeline
} // namespace doris
} // namespace doris::pipeline

View File

@ -26,32 +26,17 @@ class VExprContext;
class Block;
} // namespace vectorized
namespace pipeline {
class RepeatOperatorBuilder;
class RepeatOperator : public Operator {
class RepeatOperatorBuilder final : public OperatorBuilder<vectorized::VRepeatNode> {
public:
RepeatOperator(RepeatOperatorBuilder* operator_builder, vectorized::VRepeatNode* repeat_node);
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
private:
vectorized::VRepeatNode* _repeat_node;
std::unique_ptr<vectorized::Block> _child_block;
SourceState _child_source_state;
};
class RepeatOperatorBuilder : public OperatorBuilder {
public:
RepeatOperatorBuilder(int32_t id, vectorized::VRepeatNode* repeat_node);
RepeatOperatorBuilder(int32_t id, ExecNode* repeat_node);
OperatorPtr build_operator() override;
};
private:
vectorized::VRepeatNode* _repeat_node;
class RepeatOperator final : public DataStateOperator<RepeatOperatorBuilder> {
public:
RepeatOperator(OperatorBuilderBase* operator_builder, ExecNode* repeat_node);
};
} // namespace pipeline

View File

@ -21,49 +21,18 @@
#include "vec/sink/vresult_sink.h"
namespace doris::pipeline {
ResultSinkOperator::ResultSinkOperator(OperatorBuilder* operator_builder,
vectorized::VResultSink* sink)
: Operator(operator_builder), _sink(sink) {}
Status ResultSinkOperator::init(const TDataSink& tsink) {
return Status::OK();
ResultSinkOperatorBuilder::ResultSinkOperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "ResultSinkOperator", sink) {};
OperatorPtr ResultSinkOperatorBuilder::build_operator() {
return std::make_shared<ResultSinkOperator>(this, _sink);
}
Status ResultSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));
return _sink->prepare(state);
}
Status ResultSinkOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
return _sink->open(state);
}
ResultSinkOperator::ResultSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
: DataSinkOperator(operator_builder, sink) {};
bool ResultSinkOperator::can_write() {
return _sink->_sender->can_sink();
}
Status ResultSinkOperator::sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
if (!block) {
DCHECK(source_state == SourceState::FINISHED)
<< "block is null, eos should invoke in finalize.";
return Status::OK();
}
return _sink->send(state, block);
}
Status ResultSinkOperator::finalize(RuntimeState* state) {
_finalized = true;
return _sink->close(state, Status::OK());
}
// TODO: Support fresh exec time for sink
Status ResultSinkOperator::close(RuntimeState* state) {
if (!_finalized) {
RETURN_IF_ERROR(_sink->close(state, Status::InternalError("Not finalized")));
}
return Status::OK();
}
} // namespace doris::pipeline

View File

@ -26,43 +26,18 @@ class VResultSink;
namespace pipeline {
class ResultSinkOperator : public Operator {
class ResultSinkOperatorBuilder final : public DataSinkOperatorBuilder<vectorized::VResultSink> {
public:
ResultSinkOperator(OperatorBuilder* operator_builder, vectorized::VResultSink* sink);
ResultSinkOperatorBuilder(int32_t id, DataSink* sink);
Status init(const TDataSink& tsink) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
bool can_write() override;
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override;
Status finalize(RuntimeState* state) override;
Status close(RuntimeState* state) override;
private:
vectorized::VResultSink* _sink;
bool _finalized = false;
OperatorPtr build_operator() override;
};
class ResultSinkOperatorBuilder : public OperatorBuilder {
class ResultSinkOperator final : public DataSinkOperator<ResultSinkOperatorBuilder> {
public:
ResultSinkOperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node,
vectorized::VResultSink* sink)
: OperatorBuilder(id, name, exec_node), _sink(sink) {}
ResultSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink);
bool is_sink() const override { return true; }
OperatorPtr build_operator() override {
return std::make_shared<ResultSinkOperator>(this, _sink);
}
private:
vectorized::VResultSink* _sink;
bool can_write() override;
};
} // namespace pipeline

View File

@ -22,47 +22,35 @@
namespace doris::pipeline {
ScanOperator::ScanOperator(OperatorBuilder* operator_builder, vectorized::VScanNode* scan_node)
: Operator(operator_builder), _scan_node(scan_node) {}
OPERATOR_CODE_GENERATOR(ScanOperator, Operator)
Status ScanOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(Operator::open(state));
return _scan_node->open(state);
return _node->open(state);
}
bool ScanOperator::can_read() {
if (_scan_node->_eos || !_scan_node->_scanner_ctx || _scan_node->_scanner_ctx->done() ||
_scan_node->_scanner_ctx->can_finish()) {
if (_node->_eos || !_node->_scanner_ctx || _node->_scanner_ctx->done() ||
_node->_scanner_ctx->can_finish()) {
// _eos: need eos
// !_scanner_ctx: need call open
// _scanner_ctx->done(): need finish
// _scanner_ctx->can_finish(): should be scheduled
return true;
} else {
return !_scan_node->_scanner_ctx->empty_in_queue(); // have block to process
return !_node->_scanner_ctx->empty_in_queue(); // have block to process
}
}
Status ScanOperator::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& result_state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
bool eos = false;
RETURN_IF_ERROR(_scan_node->get_next(state, block, &eos));
result_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
return Status::OK();
}
bool ScanOperator::is_pending_finish() const {
return _scan_node->_scanner_ctx && !_scan_node->_scanner_ctx->can_finish();
return _node->_scanner_ctx && !_node->_scanner_ctx->can_finish();
}
Status ScanOperator::close(RuntimeState* state) {
if (!is_closed()) {
RETURN_IF_ERROR(_scan_node->close(state));
}
_fresh_exec_timer(_scan_node);
return Operator::close(state);
RETURN_IF_ERROR(Operator::close(state));
_node->close(state);
return Status::OK();
}
} // namespace doris::pipeline

View File

@ -29,31 +29,24 @@ class ScannerContext;
namespace doris::pipeline {
class ScanOperator : public Operator {
class ScanOperatorBuilder : public OperatorBuilder<vectorized::VScanNode> {
public:
ScanOperator(OperatorBuilder* operator_builder, vectorized::VScanNode* scan_node);
ScanOperatorBuilder(int32_t id, ExecNode* exec_node);
bool is_source() const override { return true; }
OperatorPtr build_operator() override;
};
class ScanOperator : public Operator<ScanOperatorBuilder> {
public:
ScanOperator(OperatorBuilderBase* operator_builder, ExecNode* scan_node);
bool can_read() override; // for source
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& result_state) override;
bool is_pending_finish() const override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
private:
vectorized::VScanNode* _scan_node;
};
class ScanOperatorBuilder : public OperatorBuilder {
public:
ScanOperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node)
: OperatorBuilder(id, name, exec_node) {}
bool is_source() const override { return true; }
};
} // namespace doris::pipeline

View File

@ -21,31 +21,6 @@
namespace doris::pipeline {
SortSinkOperatorBuilder::SortSinkOperatorBuilder(int32_t id, const string& name,
vectorized::VSortNode* sort_node)
: OperatorBuilder(id, name, sort_node), _sort_node(sort_node) {}
OPERATOR_CODE_GENERATOR(SortSinkOperator, Operator)
SortSinkOperator::SortSinkOperator(SortSinkOperatorBuilder* operator_builder,
vectorized::VSortNode* sort_node)
: Operator(operator_builder), _sort_node(sort_node) {}
Status SortSinkOperator::open(doris::RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(Operator::open(state));
RETURN_IF_ERROR(_sort_node->alloc_resource(state));
return Status::OK();
}
Status SortSinkOperator::close(doris::RuntimeState* /*state*/) {
_fresh_exec_timer(_sort_node);
return Status::OK();
}
Status SortSinkOperator::sink(doris::RuntimeState* state, vectorized::Block* block,
SourceState source_state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
// TODO pipeline when sort node's _reuse_mem is false, we should pass a new block to it.
RETURN_IF_ERROR(_sort_node->sink(state, block, source_state == SourceState::FINISHED));
return Status::OK();
}
} // namespace doris::pipeline

View File

@ -29,41 +29,20 @@ class VSortNode;
namespace pipeline {
class SortSinkOperatorBuilder;
class SortSinkOperator : public Operator {
class SortSinkOperatorBuilder final : public OperatorBuilder<vectorized::VSortNode> {
public:
SortSinkOperator(SortSinkOperatorBuilder* operator_builder, vectorized::VSortNode* sort_node);
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
// return can write continue
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override;
Status finalize(RuntimeState* /*state*/) override { return Status::OK(); }
bool can_write() override { return true; };
private:
vectorized::VSortNode* _sort_node;
};
class SortSinkOperatorBuilder : public OperatorBuilder {
public:
SortSinkOperatorBuilder(int32_t id, const std::string& name, vectorized::VSortNode* sort_node);
SortSinkOperatorBuilder(int32_t id, ExecNode* sort_node);
bool is_sink() const override { return true; }
bool is_source() const override { return false; }
OperatorPtr build_operator() override;
};
OperatorPtr build_operator() override {
return std::make_shared<SortSinkOperator>(this, _sort_node);
}
class SortSinkOperator final : public Operator<SortSinkOperatorBuilder> {
public:
SortSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node);
private:
vectorized::VSortNode* _sort_node;
bool can_write() override { return true; };
};
} // namespace pipeline

View File

@ -21,34 +21,6 @@
namespace doris::pipeline {
SortSourceOperatorBuilder::SortSourceOperatorBuilder(int32_t id, const string& name,
vectorized::VSortNode* sort_node)
: OperatorBuilder(id, name, sort_node), _sort_node(sort_node) {}
SortSourceOperator::SortSourceOperator(SortSourceOperatorBuilder* operator_builder,
vectorized::VSortNode* sort_node)
: Operator(operator_builder), _sort_node(sort_node) {}
Status SortSourceOperator::close(doris::RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
_fresh_exec_timer(_sort_node);
_sort_node->release_resource(state);
return Operator::close(state);
}
bool SortSourceOperator::can_read() {
return _sort_node->can_read();
}
Status SortSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
bool eos = false;
RETURN_IF_ERROR(_sort_node->pull(state, block, &eos));
source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
return Status::OK();
}
OPERATOR_CODE_GENERATOR(SortSourceOperator, Operator)
} // namespace doris::pipeline

View File

@ -29,39 +29,18 @@ class VSortNode;
namespace pipeline {
class SortSourceOperatorBuilder;
class SortSourceOperator : public Operator {
class SortSourceOperatorBuilder final : public OperatorBuilder<vectorized::VSortNode> {
public:
SortSourceOperator(SortSourceOperatorBuilder* operator_builder,
vectorized::VSortNode* sort_node);
Status close(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
bool can_read() override;
private:
vectorized::VSortNode* _sort_node;
};
class SortSourceOperatorBuilder : public OperatorBuilder {
public:
SortSourceOperatorBuilder(int32_t id, const std::string& name,
vectorized::VSortNode* sort_node);
bool is_sink() const override { return false; }
SortSourceOperatorBuilder(int32_t id, ExecNode* sort_node);
bool is_source() const override { return true; }
OperatorPtr build_operator() override {
return std::make_shared<SortSourceOperator>(this, _sort_node);
}
OperatorPtr build_operator() override;
};
private:
vectorized::VSortNode* _sort_node;
class SortSourceOperator final : public Operator<SortSourceOperatorBuilder> {
public:
SortSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node);
};
} // namespace pipeline

View File

@ -21,10 +21,10 @@
namespace doris::pipeline {
StreamingAggSinkOperator::StreamingAggSinkOperator(
StreamingAggSinkOperatorBuilder* operator_builder, vectorized::AggregationNode* agg_node,
std::shared_ptr<AggContext> agg_context)
: Operator(operator_builder), _agg_node(agg_node), _agg_context(std::move(agg_context)) {}
StreamingAggSinkOperator::StreamingAggSinkOperator(OperatorBuilderBase* operator_builder,
ExecNode* agg_node,
std::shared_ptr<AggContext> agg_context)
: Operator(operator_builder, agg_node), _agg_context(std::move(agg_context)) {}
Status StreamingAggSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));
@ -34,13 +34,6 @@ Status StreamingAggSinkOperator::prepare(RuntimeState* state) {
return Status::OK();
}
Status StreamingAggSinkOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(Operator::open(state));
RETURN_IF_ERROR(_agg_node->alloc_resource(state));
return Status::OK();
}
bool StreamingAggSinkOperator::can_write() {
// sink and source in diff threads
return _agg_context->has_enough_space_to_push();
@ -52,7 +45,7 @@ Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in
Status ret = Status::OK();
if (in_block && in_block->rows() > 0) {
auto bock_from_ctx = _agg_context->get_free_block();
RETURN_IF_ERROR(_agg_node->do_pre_agg(in_block, bock_from_ctx.get()));
RETURN_IF_ERROR(_node->do_pre_agg(in_block, bock_from_ctx.get()));
if (bock_from_ctx->rows() == 0) {
_agg_context->return_free_block(std::move(bock_from_ctx));
} else {
@ -67,7 +60,7 @@ Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in
}
Status StreamingAggSinkOperator::close(RuntimeState* state) {
_fresh_exec_timer(_agg_node);
Operator::close(state);
if (_agg_context && !_agg_context->is_finish()) {
// finish should be set, if not set here means error.
_agg_context->set_canceled();
@ -77,25 +70,13 @@ Status StreamingAggSinkOperator::close(RuntimeState* state) {
return Status::OK();
}
/////////////////////////////// operator template ////////////////////////////////
StreamingAggSinkOperatorBuilder::StreamingAggSinkOperatorBuilder(
int32_t id, const std::string& name, vectorized::AggregationNode* exec_node,
std::shared_ptr<AggContext> agg_context)
: OperatorBuilder(id, name, exec_node),
_agg_node(exec_node),
int32_t id, ExecNode* exec_node, std::shared_ptr<AggContext> agg_context)
: OperatorBuilder(id, "StreamingAggSinkOperator", exec_node),
_agg_context(std::move(agg_context)) {}
OperatorPtr StreamingAggSinkOperatorBuilder::build_operator() {
return std::make_shared<StreamingAggSinkOperator>(this, _agg_node, _agg_context);
return std::make_shared<StreamingAggSinkOperator>(this, _node, _agg_context);
}
// use final aggregation source operator
bool StreamingAggSinkOperatorBuilder::is_sink() const {
return true;
}
bool StreamingAggSinkOperatorBuilder::is_source() const {
return false;
}
} // namespace doris::pipeline

View File

@ -28,26 +28,34 @@ class Block;
} // namespace vectorized
namespace pipeline {
class StreamingAggSinkOperatorBuilder;
class StreamingAggSinkOperator : public Operator {
class StreamingAggSinkOperatorBuilder final : public OperatorBuilder<vectorized::AggregationNode> {
public:
StreamingAggSinkOperator(StreamingAggSinkOperatorBuilder* operator_builder,
vectorized::AggregationNode*, std::shared_ptr<AggContext>);
StreamingAggSinkOperatorBuilder(int32_t, ExecNode*, std::shared_ptr<AggContext>);
OperatorPtr build_operator() override;
bool is_sink() const override { return true; };
bool is_source() const override { return false; };
private:
std::shared_ptr<AggContext> _agg_context;
};
class StreamingAggSinkOperator final : public Operator<StreamingAggSinkOperatorBuilder> {
public:
StreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*,
std::shared_ptr<AggContext>);
Status prepare(RuntimeState*) override;
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override;
bool can_write() override;
Status close(RuntimeState* state) override;
Status finalize(doris::RuntimeState* state) override { return Status::OK(); }
private:
vectorized::AggregationNode* _agg_node;
vectorized::Block _preagg_block = vectorized::Block();
RuntimeProfile::Counter* _queue_byte_size_counter;
@ -56,20 +64,5 @@ private:
std::shared_ptr<AggContext> _agg_context;
};
class StreamingAggSinkOperatorBuilder : public OperatorBuilder {
public:
StreamingAggSinkOperatorBuilder(int32_t, const std::string&, vectorized::AggregationNode*,
std::shared_ptr<AggContext>);
OperatorPtr build_operator() override;
bool is_sink() const override;
bool is_source() const override;
private:
vectorized::AggregationNode* _agg_node;
std::shared_ptr<AggContext> _agg_context;
};
} // namespace pipeline
} // namespace doris

View File

@ -21,15 +21,9 @@
namespace doris {
namespace pipeline {
StreamingAggSourceOperator::StreamingAggSourceOperator(OperatorBuilder* templ,
vectorized::AggregationNode* node,
StreamingAggSourceOperator::StreamingAggSourceOperator(OperatorBuilderBase* templ, ExecNode* node,
std::shared_ptr<AggContext> agg_context)
: Operator(templ), _agg_node(node), _agg_context(std::move(agg_context)) {}
Status StreamingAggSourceOperator::prepare(RuntimeState* state) {
_agg_node->increase_ref();
return Status::OK();
}
: Operator(templ, node), _agg_context(std::move(agg_context)) {}
bool StreamingAggSourceOperator::can_read() {
return _agg_context->has_data_or_finished();
@ -44,14 +38,14 @@ Status StreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Bl
RETURN_IF_ERROR(_agg_context->get_block(&agg_block));
if (_agg_context->data_exhausted()) {
RETURN_IF_ERROR(_agg_node->pull(state, block, &eos));
RETURN_IF_ERROR(_node->pull(state, block, &eos));
} else {
block->swap(*agg_block);
agg_block->clear_column_data(_agg_node->row_desc().num_materialized_slots());
agg_block->clear_column_data(_node->row_desc().num_materialized_slots());
_agg_context->return_free_block(std::move(agg_block));
}
} else {
RETURN_IF_ERROR(_agg_node->pull(state, block, &eos));
RETURN_IF_ERROR(_node->pull(state, block, &eos));
}
source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
@ -59,27 +53,13 @@ Status StreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Bl
return Status::OK();
}
Status StreamingAggSourceOperator::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
_fresh_exec_timer(_agg_node);
if (!_agg_node->decrease_ref()) {
_agg_node->release_resource(state);
}
return Operator::close(state);
}
/////////////////////////////// operator template ////////////////////////////////
StreamingAggSourceOperatorBuilder::StreamingAggSourceOperatorBuilder(
int32_t id, const std::string& name, vectorized::AggregationNode* exec_node,
std::shared_ptr<AggContext> agg_context)
: OperatorBuilder(id, name, exec_node), _agg_context(std::move(agg_context)) {}
int32_t id, ExecNode* exec_node, std::shared_ptr<AggContext> agg_context)
: OperatorBuilder(id, "StreamingAggSourceOperator", exec_node),
_agg_context(std::move(agg_context)) {}
OperatorPtr StreamingAggSourceOperatorBuilder::build_operator() {
return std::make_shared<StreamingAggSourceOperator>(
this, assert_cast<vectorized::AggregationNode*>(_related_exec_node), _agg_context);
return std::make_shared<StreamingAggSourceOperator>(this, _node, _agg_context);
}
} // namespace pipeline

View File

@ -25,24 +25,10 @@ class AggregationNode;
}
namespace pipeline {
class StreamingAggSourceOperator : public Operator {
class StreamingAggSourceOperatorBuilder final
: public OperatorBuilder<vectorized::AggregationNode> {
public:
StreamingAggSourceOperator(OperatorBuilder*, vectorized::AggregationNode*,
std::shared_ptr<AggContext>);
Status prepare(RuntimeState* state) override;
bool can_read() override;
Status close(RuntimeState* state) override;
Status get_block(RuntimeState*, vectorized::Block*, SourceState& source_state) override;
private:
vectorized::AggregationNode* _agg_node;
std::shared_ptr<AggContext> _agg_context;
};
class StreamingAggSourceOperatorBuilder : public OperatorBuilder {
public:
StreamingAggSourceOperatorBuilder(int32_t, const std::string&, vectorized::AggregationNode*,
std::shared_ptr<AggContext>);
StreamingAggSourceOperatorBuilder(int32_t, ExecNode*, std::shared_ptr<AggContext>);
bool is_source() const override { return true; }
@ -52,5 +38,15 @@ private:
std::shared_ptr<AggContext> _agg_context;
};
class StreamingAggSourceOperator final : public Operator<StreamingAggSourceOperatorBuilder> {
public:
StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr<AggContext>);
bool can_read() override;
Status get_block(RuntimeState*, vectorized::Block*, SourceState& source_state) override;
private:
std::shared_ptr<AggContext> _agg_context;
};
} // namespace pipeline
} // namespace doris

View File

@ -15,19 +15,10 @@
// specific language governing permissions and limitations
// under the License.
#include "olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scan_node.h"
#include "table_function_operator.h"
namespace doris::pipeline {
OlapScanOperator::OlapScanOperator(OperatorBuilder* operator_builder,
vectorized::NewOlapScanNode* scan_node)
: ScanOperator(operator_builder, scan_node) {}
OPERATOR_CODE_GENERATOR(TableFunctionOperator, DataStateOperator)
OlapScanOperatorBuilder::OlapScanOperatorBuilder(uint32_t id, const std::string& name,
vectorized::NewOlapScanNode* new_olap_scan_node)
: ScanOperatorBuilder(id, name, new_olap_scan_node),
_new_olap_scan_node(new_olap_scan_node) {}
} // namespace doris::pipeline
} // namespace doris::pipeline

View File

@ -20,73 +20,17 @@
#include "operator.h"
#include "vec/exec/vtable_function_node.h"
namespace doris {
namespace doris::pipeline {
namespace pipeline {
class TableFunctionOperator;
class TableFunctionOperatorBuilder : public OperatorBuilder {
class TableFunctionOperatorBuilder final : public OperatorBuilder<vectorized::VTableFunctionNode> {
public:
TableFunctionOperatorBuilder(int32_t id, vectorized::VTableFunctionNode* node)
: OperatorBuilder(id, "TableFunctionOperatorBuilder", node), _node(node) {}
TableFunctionOperatorBuilder(int32_t id, ExecNode* node);
OperatorPtr build_operator() override;
private:
vectorized::VTableFunctionNode* _node;
};
class TableFunctionOperator : public Operator {
class TableFunctionOperator final : public DataStateOperator<TableFunctionOperatorBuilder> {
public:
TableFunctionOperator(TableFunctionOperatorBuilder* operator_builder,
vectorized::VTableFunctionNode* node)
: Operator(operator_builder), _node(node) {}
Status open(RuntimeState* state) override {
RETURN_IF_ERROR(Operator::open(state));
_child_block.reset(new vectorized::Block);
return _node->alloc_resource(state);
}
Status close(RuntimeState* state) override {
_node->release_resource(state);
_fresh_exec_timer(_node);
return Operator::close(state);
}
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
if (_node->need_more_input_data()) {
RETURN_IF_ERROR(_child->get_block(state, _child_block.get(), _child_source_state));
source_state = _child_source_state;
if (_child_block->rows() == 0) {
return Status::OK();
}
_node->push(state, _child_block.get(), source_state == SourceState::FINISHED);
}
bool eos = false;
RETURN_IF_ERROR(_node->pull(state, block, &eos));
if (eos) {
source_state = SourceState::FINISHED;
_child_block->clear_column_data();
} else if (!_node->need_more_input_data()) {
source_state = SourceState::MORE_DATA;
} else {
_child_block->clear_column_data();
}
return Status::OK();
}
private:
vectorized::VTableFunctionNode* _node;
std::unique_ptr<vectorized::Block> _child_block;
SourceState _child_source_state;
TableFunctionOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
};
OperatorPtr TableFunctionOperatorBuilder::build_operator() {
return std::make_shared<TableFunctionOperator>(this, _node);
}
} // namespace pipeline
} // namespace doris
} // namespace doris::pipeline

View File

@ -37,7 +37,6 @@ Status Pipeline::build_operators(Operators& operators) {
OperatorPtr pre;
for (auto& operator_t : _operator_builders) {
auto o = operator_t->build_operator();
RETURN_IF_ERROR(o->init(operator_t->exec_node(), _context->get_runtime_state()));
if (pre) {
o->set_child(pre);
}

View File

@ -66,7 +66,7 @@ public:
Status set_sink(OperatorBuilderPtr& sink_operator);
OperatorBuilder* sink() { return _sink.get(); }
OperatorBuilderBase* sink() { return _sink.get(); }
Status build_operators(Operators&);

View File

@ -26,10 +26,10 @@
#include "exec/empty_set_operator.h"
#include "exec/exchange_sink_operator.h"
#include "exec/exchange_source_operator.h"
#include "exec/olap_scan_operator.h"
#include "exec/repeat_operator.h"
#include "exec/result_sink_operator.h"
#include "exec/scan_node.h"
#include "exec/scan_operator.h"
#include "exec/sort_sink_operator.h"
#include "exec/sort_source_operator.h"
#include "exec/streaming_aggregation_sink_operator.h"
@ -257,7 +257,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
for (PipelinePtr& pipeline : _pipelines) {
// if sink
auto sink = pipeline->sink()->build_operator();
RETURN_IF_ERROR(sink->init(pipeline->sink()->exec_node(), _runtime_state.get()));
// TODO pipeline 1 need to add new interface for exec node and operator
sink->init(request.fragment.output_sink);
@ -282,23 +281,20 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
switch (node_type) {
// for source
case TPlanNodeType::OLAP_SCAN_NODE: {
auto* new_olap_scan_node = assert_cast<vectorized::NewOlapScanNode*>(node);
OperatorBuilderPtr operator_t = std::make_shared<OlapScanOperatorBuilder>(
fragment_context->next_operator_builder_id(), "OlapScanOperator",
new_olap_scan_node);
OperatorBuilderPtr operator_t = std::make_shared<ScanOperatorBuilder>(
fragment_context->next_operator_builder_id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
break;
}
case TPlanNodeType::EXCHANGE_NODE: {
OperatorBuilderPtr operator_t = std::make_shared<ExchangeSourceOperatorBuilder>(
next_operator_builder_id(), "ExchangeSourceOperator", node);
OperatorBuilderPtr operator_t =
std::make_shared<ExchangeSourceOperatorBuilder>(next_operator_builder_id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
break;
}
case TPlanNodeType::EMPTY_SET_NODE: {
auto* empty_set_node = assert_cast<vectorized::VEmptySetNode*>(node);
OperatorBuilderPtr operator_t = std::make_shared<EmptySetSourceOperatorBuilder>(
next_operator_builder_id(), "EmptySetSourceOperator", empty_set_node);
OperatorBuilderPtr operator_t =
std::make_shared<EmptySetSourceOperatorBuilder>(next_operator_builder_id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
break;
}
@ -309,50 +305,47 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
if (agg_node->is_streaming_preagg()) {
auto agg_ctx = std::make_shared<AggContext>();
OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>(
next_operator_builder_id(), "StreamingAggSinkOperator", agg_node, agg_ctx);
next_operator_builder_id(), agg_node, agg_ctx);
RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
OperatorBuilderPtr pre_agg_source = std::make_shared<StreamingAggSourceOperatorBuilder>(
next_operator_builder_id(), "StreamingAggSourceOperator", agg_node, agg_ctx);
next_operator_builder_id(), agg_node, agg_ctx);
RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
} else {
OperatorBuilderPtr agg_sink = std::make_shared<AggSinkOperatorBuilder>(
next_operator_builder_id(), "AggSinkOperator", agg_node);
OperatorBuilderPtr agg_sink =
std::make_shared<AggSinkOperatorBuilder>(next_operator_builder_id(), agg_node);
RETURN_IF_ERROR(new_pipe->set_sink(agg_sink));
OperatorBuilderPtr agg_source = std::make_shared<AggregationSourceOperatorBuilder>(
next_operator_builder_id(), "AggSourceOperator", agg_node);
OperatorBuilderPtr agg_source = std::make_shared<AggSourceOperatorBuilder>(
next_operator_builder_id(), agg_node);
RETURN_IF_ERROR(cur_pipe->add_operator(agg_source));
}
break;
}
case TPlanNodeType::SORT_NODE: {
auto* sort_node = assert_cast<vectorized::VSortNode*>(node);
auto new_pipeline = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
OperatorBuilderPtr sort_sink = std::make_shared<SortSinkOperatorBuilder>(
next_operator_builder_id(), "SortSinkOperator", sort_node);
OperatorBuilderPtr sort_sink =
std::make_shared<SortSinkOperatorBuilder>(next_operator_builder_id(), node);
RETURN_IF_ERROR(new_pipeline->set_sink(sort_sink));
OperatorBuilderPtr sort_source = std::make_shared<SortSourceOperatorBuilder>(
next_operator_builder_id(), "SortSourceOperator", sort_node);
OperatorBuilderPtr sort_source =
std::make_shared<SortSourceOperatorBuilder>(next_operator_builder_id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(sort_source));
break;
}
case TPlanNodeType::REPEAT_NODE: {
auto* repeat_node = assert_cast<vectorized::VRepeatNode*>(node);
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr builder =
std::make_shared<RepeatOperatorBuilder>(next_operator_builder_id(), repeat_node);
std::make_shared<RepeatOperatorBuilder>(next_operator_builder_id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
break;
}
case TPlanNodeType::TABLE_FUNCTION_NODE: {
auto* repeat_node = assert_cast<vectorized::VTableFunctionNode*>(node);
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr builder = std::make_shared<TableFunctionOperatorBuilder>(
next_operator_builder_id(), repeat_node);
OperatorBuilderPtr builder =
std::make_shared<TableFunctionOperatorBuilder>(next_operator_builder_id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
break;
}
@ -380,15 +373,13 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
OperatorBuilderPtr sink_;
switch (thrift_sink.type) {
case TDataSinkType::DATA_STREAM_SINK: {
auto* exchange_sink = assert_cast<doris::vectorized::VDataStreamSender*>(_sink.get());
sink_ = std::make_shared<ExchangeSinkOperatorBuilder>(
next_operator_builder_id(), "ExchangeSinkOperator", nullptr, exchange_sink, this);
sink_ = std::make_shared<ExchangeSinkOperatorBuilder>(next_operator_builder_id(),
_sink.get(), this);
break;
}
case TDataSinkType::RESULT_SINK: {
auto* result_sink = assert_cast<doris::vectorized::VResultSink*>(_sink.get());
sink_ = std::make_shared<ResultSinkOperatorBuilder>(
next_operator_builder_id(), "ResultSinkOperator", nullptr, result_sink);
sink_ = std::make_shared<ResultSinkOperatorBuilder>(next_operator_builder_id(),
_sink.get());
break;
}
default:
@ -566,4 +557,4 @@ void PipelineFragmentContext::send_report(bool done) {
}
}
} // namespace doris::pipeline
} // namespace doris::pipeline

View File

@ -203,7 +203,6 @@ private:
std::atomic<size_t> _next_core = 0;
};
// TODO pipeline sr
class BlockedTaskScheduler {
public:
explicit BlockedTaskScheduler(std::shared_ptr<TaskQueue> task_queue)

View File

@ -20,7 +20,6 @@
#include "common/status.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/pipeline.h"
#include "pipeline/pipeline_fragment_context.h"
#include "util/to_string.h"

View File

@ -49,6 +49,8 @@ class VScanNode : public ExecNode {
public:
VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {}
virtual ~VScanNode() = default;
friend class VScanner;
friend class NewOlapScanner;
friend class VFileScanner;

View File

@ -36,7 +36,7 @@ class MemPool;
namespace pipeline {
class AggSinkOperator;
class AggregationSourceOperator;
class AggSourceOperator;
class StreamingAggSinkOperator;
class StreamingAggSourceOperator;
} // namespace pipeline
@ -789,7 +789,7 @@ public:
private:
friend class pipeline::AggSinkOperator;
friend class pipeline::StreamingAggSinkOperator;
friend class pipeline::AggregationSourceOperator;
friend class pipeline::AggSourceOperator;
friend class pipeline::StreamingAggSourceOperator;
// group by k1,k2
std::vector<VExprContext*> _probe_expr_ctxs;

View File

@ -88,6 +88,8 @@ public:
bool channel_all_can_write();
const RowDescriptor& row_desc() { return _row_desc; }
protected:
friend class Channel;
friend class pipeline::ExchangeSinkBuffer;

View File

@ -56,6 +56,8 @@ public:
void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
const RowDescriptor& row_desc() { return _row_desc; }
private:
Status prepare_exprs(RuntimeState* state);
TResultSinkType::type _sink_type;