[pipelineX](minor) refine code (#25015)

This commit is contained in:
Gabriel
2023-10-07 10:45:33 +08:00
committed by GitHub
parent 813c8f1e5a
commit bd582aee75
12 changed files with 129 additions and 100 deletions

View File

@ -215,4 +215,6 @@ Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
return Status::OK();
}
template class DataSinkOperatorX<AnalyticSinkLocalState>;
} // namespace doris::pipeline

View File

@ -102,4 +102,4 @@ private:
};
} // namespace pipeline
} // namespace doris
} // namespace doris

View File

@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "multi_cast_data_stream_sink.h"
namespace doris::pipeline {
OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() {
return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
}
Status MultiCastDataStreamSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
auto& p = _parent->cast<MultiCastDataStreamSinkOperatorX>();
_shared_state->multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>(
p._row_desc, p._pool, p._cast_sender_count);
return Status::OK();
}
} // namespace doris::pipeline

View File

@ -18,6 +18,7 @@
#pragma once
#include "operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
namespace doris::pipeline {
@ -40,8 +41,75 @@ public:
bool can_write() override { return true; }
};
OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() {
return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
}
class MultiCastDataStreamSinkOperatorX;
class MultiCastDataStreamSinkLocalState final
: public PipelineXSinkLocalState<MultiCastDependency> {
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
friend class MultiCastDataStreamSinkOperatorX;
friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
using Base = PipelineXSinkLocalState<MultiCastDependency>;
using Parent = MultiCastDataStreamSinkOperatorX;
private:
std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
};
class MultiCastDataStreamSinkOperatorX final
: public DataSinkOperatorX<MultiCastDataStreamSinkLocalState> {
using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
public:
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
const int cast_sender_count, ObjectPool* pool,
const TMultiCastDataStreamSink& sink,
const RowDescriptor& row_desc)
: Base(sink_id, sources),
_pool(pool),
_row_desc(row_desc),
_cast_sender_count(cast_sender_count) {}
~MultiCastDataStreamSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override { return Status::OK(); }
Status open(doris::RuntimeState* state) override { return Status::OK(); };
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
auto st = local_state._shared_state->multi_cast_data_streamer->push(
state, in_block, source_state == SourceState::FINISHED);
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
if (st.template is<ErrorCode::END_OF_FILE>()) {
return Status::OK();
}
return st;
}
return Status::OK();
}
RowDescriptor& row_desc() override { return _row_desc; }
std::shared_ptr<pipeline::MultiCastDataStreamer> create_multi_cast_data_streamer() {
auto multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>(
_row_desc, _pool, _cast_sender_count);
return multi_cast_data_streamer;
}
private:
friend class MultiCastDataStreamSinkLocalState;
ObjectPool* _pool;
RowDescriptor _row_desc;
int _cast_sender_count;
friend class MultiCastDataStreamSinkLocalState;
};
} // namespace doris::pipeline

View File

@ -130,11 +130,9 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
if (p._t_data_stream_sink.__isset.output_exprs) {
_output_expr_contexts.resize(p._output_expr_contexts.size());
for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
}
_output_expr_contexts.resize(p._output_expr_contexts.size());
for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
}
return Status::OK();
}
@ -150,7 +148,7 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
if (!local_state._output_expr_contexts.empty()) {
output_block = &tmp_block;
}
local_state._shared_state->_multi_cast_data_streamer->pull(_consumer_id, output_block, &eos);
local_state._shared_state->multi_cast_data_streamer->pull(_consumer_id, output_block, &eos);
if (!local_state._conjuncts.empty()) {
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
@ -162,9 +160,11 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
local_state._output_expr_contexts, *output_block, block));
materialize_block_inplace(*block);
}
COUNTER_UPDATE(local_state._rows_returned_counter, block->rows());
if (eos) {
source_state = SourceState::FINISHED;
}
return Status::OK();
}
} // namespace doris::pipeline

View File

@ -108,6 +108,7 @@ public:
private:
vectorized::VExprContextSPtrs _output_expr_contexts;
};
class MultiCastDataStreamerSourceOperatorX final
: public OperatorX<MultiCastDataStreamSourceLocalState> {
public:
@ -169,73 +170,5 @@ private:
const RowDescriptor& _row_desc() { return _row_descriptor; }
};
// sink operator
class MultiCastDataStreamSinkOperatorX;
class MultiCastDataStreamSinkLocalState final
: public PipelineXSinkLocalState<MultiCastDependency> {
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {}
friend class MultiCastDataStreamSinkOperatorX;
friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
using Base = PipelineXSinkLocalState<MultiCastDependency>;
using Parent = MultiCastDataStreamSinkOperatorX;
};
class MultiCastDataStreamSinkOperatorX final
: public DataSinkOperatorX<MultiCastDataStreamSinkLocalState> {
using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
public:
friend class UnionSinkLocalState;
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
const int cast_sender_count, ObjectPool* pool,
const TMultiCastDataStreamSink& sink,
const RowDescriptor& row_desc)
: Base(sink_id, sources),
_pool(pool),
_row_desc(row_desc),
_cast_sender_count(cast_sender_count) {}
~MultiCastDataStreamSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override { return Status::OK(); }
Status open(doris::RuntimeState* state) override { return Status::OK(); };
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
auto st = local_state._shared_state->_multi_cast_data_streamer->push(
state, in_block, source_state == SourceState::FINISHED);
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
if (st.template is<ErrorCode::END_OF_FILE>()) {
return Status::OK();
}
return st;
}
return Status::OK();
}
std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer() {
auto multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>(
_row_desc, _pool, _cast_sender_count);
return multi_cast_data_streamer;
}
RowDescriptor& row_desc() override { return _row_desc; }
private:
ObjectPool* _pool;
RowDescriptor _row_desc;
int _cast_sender_count;
friend class MultiCastDataStreamSinkLocalState;
};
} // namespace pipeline
} // namespace doris
} // namespace doris

View File

@ -124,7 +124,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
return Status::OK();
}
std::shared_ptr<DataQueue> UnionSourceLocalState::data_queue() {
std::shared_ptr<DataQueue> UnionSourceLocalState::create_data_queue() {
auto& p = _parent->cast<Parent>();
std::shared_ptr<DataQueue> data_queue = std::make_shared<DataQueue>(p._child_size, _dependency);
return data_queue;

View File

@ -78,7 +78,7 @@ public:
UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {};
Status init(RuntimeState* state, LocalStateInfo& info) override;
std::shared_ptr<DataQueue> data_queue();
std::shared_ptr<DataQueue> create_data_queue();
private:
friend class UnionSourceOperatorX;

View File

@ -111,7 +111,7 @@ protected:
class WriteDependency : public Dependency {
public:
WriteDependency(int id, std::string name) : Dependency(id, name), _ready_for_write(true) {}
virtual ~WriteDependency() = default;
~WriteDependency() override = default;
bool is_write_dependency() override { return true; }
@ -428,7 +428,7 @@ private:
struct MultiCastSharedState {
public:
std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer;
};
class MultiCastDependency final : public WriteDependency {
@ -438,7 +438,7 @@ public:
~MultiCastDependency() override = default;
void* shared_state() override { return (void*)&_multi_cast_state; };
MultiCastDependency* can_read(const int consumer_id) {
if (_multi_cast_state._multi_cast_data_streamer->can_read(consumer_id)) {
if (_multi_cast_state.multi_cast_data_streamer->can_read(consumer_id)) {
return nullptr;
} else {
return this;

View File

@ -37,6 +37,7 @@
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/multi_cast_data_stream_sink.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
@ -257,20 +258,14 @@ Status DataSinkOperatorXBase::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}
template <typename LocalStateType>
Status DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state,
LocalSinkStateInfo& info) {
auto local_state = LocalStateType::create_shared(this, state);
state->emplace_sink_local_state(id(), local_state);
return local_state->init(state, info);
}
template <typename LocalStateType>
Status DataSinkOperatorX<LocalStateType>::setup_local_states(
RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) {
DCHECK(infos.size() == 1);
for (auto& info : infos) {
RETURN_IF_ERROR(setup_local_state(state, info));
auto local_state = LocalStateType::create_shared(this, state);
state->emplace_sink_local_state(id(), local_state);
RETURN_IF_ERROR(local_state->init(state, info));
}
return Status::OK();
}
@ -279,12 +274,12 @@ template <>
Status DataSinkOperatorX<MultiCastDataStreamSinkLocalState>::setup_local_states(
RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) {
auto multi_cast_data_streamer =
static_cast<MultiCastDataStreamSinkOperatorX*>(this)->multi_cast_data_streamer();
static_cast<MultiCastDataStreamSinkOperatorX*>(this)->create_multi_cast_data_streamer();
for (auto& info : infos) {
auto local_state = MultiCastDataStreamSinkLocalState::create_shared(this, state);
state->emplace_sink_local_state(id(), local_state);
RETURN_IF_ERROR(local_state->init(state, info));
local_state->_shared_state->_multi_cast_data_streamer = multi_cast_data_streamer;
local_state->_shared_state->multi_cast_data_streamer = multi_cast_data_streamer;
}
return Status::OK();
@ -331,7 +326,7 @@ Status OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state,
RETURN_IF_ERROR(local_state->init(state, info));
if (child_count != 0) {
if (!data_queue) {
data_queue = local_state->data_queue();
data_queue = local_state->create_data_queue();
}
local_state->_shared_state->data_queue = data_queue;
}

View File

@ -419,8 +419,6 @@ public:
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }
virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0;
virtual Status setup_local_states(RuntimeState* state,
std::vector<LocalSinkStateInfo>& infos) = 0;
@ -529,8 +527,6 @@ public:
: DataSinkOperatorXBase(id, sources) {}
~DataSinkOperatorX() override = default;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
Status setup_local_states(RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) override;
void get_dependency(std::vector<DependencySPtr>& dependency) override;

View File

@ -60,6 +60,7 @@
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/multi_cast_data_stream_sink.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"