[pipeline](CTE) Support multi stream data sink in pipeline (#19519)
This commit is contained in:
@ -31,6 +31,7 @@
|
||||
#include <utility>
|
||||
|
||||
#include "common/config.h"
|
||||
#include "vec/sink/multi_cast_data_stream_sink.h"
|
||||
#include "vec/sink/vdata_stream_sender.h"
|
||||
#include "vec/sink/vjdbc_table_sink.h"
|
||||
#include "vec/sink/vmemory_scratch_sink.h"
|
||||
@ -161,6 +162,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
|
||||
RETURN_IF_ERROR(status);
|
||||
break;
|
||||
}
|
||||
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
|
||||
return Status::NotSupported("MULTI_CAST_DATA_STREAM_SINK only support in pipeline engine");
|
||||
}
|
||||
|
||||
default: {
|
||||
std::stringstream error_msg;
|
||||
@ -302,6 +306,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
|
||||
RETURN_IF_ERROR(status);
|
||||
break;
|
||||
}
|
||||
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
|
||||
DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
|
||||
DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
|
||||
auto multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>(
|
||||
row_desc, pool, thrift_sink.multi_cast_stream_sink.sinks.size());
|
||||
sink->reset(new vectorized::MultiCastDataStreamSink(multi_cast_data_streamer));
|
||||
break;
|
||||
}
|
||||
|
||||
default: {
|
||||
std::stringstream error_msg;
|
||||
|
||||
@ -58,7 +58,9 @@ set(PIPELINE_FILES
|
||||
exec/union_source_operator.cpp
|
||||
exec/data_queue.cpp
|
||||
exec/select_operator.cpp
|
||||
exec/empty_source_operator.cpp)
|
||||
exec/empty_source_operator.cpp
|
||||
exec/multi_cast_data_streamer.cpp
|
||||
exec/multi_cast_data_stream_source.cpp)
|
||||
|
||||
if (WITH_MYSQL)
|
||||
set(PIPELINE_FILES
|
||||
|
||||
@ -34,20 +34,33 @@ class DataSink;
|
||||
namespace doris::pipeline {
|
||||
|
||||
ExchangeSinkOperatorBuilder::ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink,
|
||||
PipelineFragmentContext* context)
|
||||
: DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink), _context(context) {}
|
||||
PipelineFragmentContext* context,
|
||||
int mult_cast_id)
|
||||
: DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink),
|
||||
_context(context),
|
||||
_mult_cast_id(mult_cast_id) {}
|
||||
|
||||
OperatorPtr ExchangeSinkOperatorBuilder::build_operator() {
|
||||
return std::make_shared<ExchangeSinkOperator>(this, _sink, _context);
|
||||
return std::make_shared<ExchangeSinkOperator>(this, _sink, _context, _mult_cast_id);
|
||||
}
|
||||
|
||||
ExchangeSinkOperator::ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink,
|
||||
PipelineFragmentContext* context)
|
||||
: DataSinkOperator(operator_builder, sink), _context(context) {}
|
||||
PipelineFragmentContext* context, int mult_cast_id)
|
||||
: DataSinkOperator(operator_builder, sink),
|
||||
_context(context),
|
||||
_mult_cast_id(mult_cast_id) {}
|
||||
|
||||
Status ExchangeSinkOperator::init(const TDataSink& tsink) {
|
||||
RETURN_IF_ERROR(_sink->init(tsink));
|
||||
_dest_node_id = tsink.stream_sink.dest_node_id;
|
||||
// -1 means not the mult cast stream sender
|
||||
if (_mult_cast_id == -1) {
|
||||
RETURN_IF_ERROR(_sink->init(tsink));
|
||||
_dest_node_id = tsink.stream_sink.dest_node_id;
|
||||
} else {
|
||||
TDataSink new_t_sink;
|
||||
new_t_sink.stream_sink = tsink.multi_cast_stream_sink.sinks[_mult_cast_id];
|
||||
RETURN_IF_ERROR(_sink->init(new_t_sink));
|
||||
_dest_node_id = tsink.multi_cast_stream_sink.sinks[_mult_cast_id].dest_node_id;
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -37,19 +37,21 @@ class PipelineFragmentContext;
|
||||
class ExchangeSinkOperatorBuilder final
|
||||
: public DataSinkOperatorBuilder<vectorized::VDataStreamSender> {
|
||||
public:
|
||||
ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, PipelineFragmentContext* context);
|
||||
ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, PipelineFragmentContext* context,
|
||||
int mult_cast_id = -1);
|
||||
|
||||
OperatorPtr build_operator() override;
|
||||
|
||||
private:
|
||||
PipelineFragmentContext* _context;
|
||||
int _mult_cast_id = -1;
|
||||
};
|
||||
|
||||
// Now local exchange is not supported since VDataStreamRecvr is considered as a pipeline broker.
|
||||
class ExchangeSinkOperator final : public DataSinkOperator<ExchangeSinkOperatorBuilder> {
|
||||
public:
|
||||
ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink,
|
||||
PipelineFragmentContext* context);
|
||||
PipelineFragmentContext* context, int mult_cast_id);
|
||||
Status init(const TDataSink& tsink) override;
|
||||
|
||||
Status prepare(RuntimeState* state) override;
|
||||
@ -65,6 +67,7 @@ private:
|
||||
int _dest_node_id = -1;
|
||||
RuntimeState* _state = nullptr;
|
||||
PipelineFragmentContext* _context;
|
||||
int _mult_cast_id = -1;
|
||||
};
|
||||
|
||||
} // namespace pipeline
|
||||
|
||||
47
be/src/pipeline/exec/multi_cast_data_stream_sink.h
Normal file
47
be/src/pipeline/exec/multi_cast_data_stream_sink.h
Normal file
@ -0,0 +1,47 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "operator.h"
|
||||
#include "vec/sink/multi_cast_data_stream_sink.h"
|
||||
|
||||
namespace doris::pipeline {
|
||||
|
||||
class MultiCastDataStreamSinkOperatorBuilder final
|
||||
: public DataSinkOperatorBuilder<vectorized::MultiCastDataStreamSink> {
|
||||
public:
|
||||
MultiCastDataStreamSinkOperatorBuilder(int32_t id, DataSink* sink)
|
||||
: DataSinkOperatorBuilder(id, "MultiCastDataStreamSinkOperator", sink) {}
|
||||
|
||||
OperatorPtr build_operator() override;
|
||||
};
|
||||
|
||||
class MultiCastDataStreamSinkOperator final
|
||||
: public DataSinkOperator<MultiCastDataStreamSinkOperatorBuilder> {
|
||||
public:
|
||||
MultiCastDataStreamSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
|
||||
: DataSinkOperator(operator_builder, sink) {}
|
||||
|
||||
bool can_write() override { return true; }
|
||||
};
|
||||
|
||||
OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() {
|
||||
return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
|
||||
}
|
||||
|
||||
} // namespace doris::pipeline
|
||||
64
be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Normal file
64
be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Normal file
@ -0,0 +1,64 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "multi_cast_data_stream_source.h"
|
||||
|
||||
#include <functional>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "pipeline/exec/multi_cast_data_streamer.h"
|
||||
#include "pipeline/exec/operator.h"
|
||||
#include "vec/core/block.h"
|
||||
|
||||
namespace doris::pipeline {
|
||||
|
||||
MultiCastDataStreamerSourceOperatorBuilder::MultiCastDataStreamerSourceOperatorBuilder(
|
||||
int32_t id, const int consumer_id, std::shared_ptr<MultiCastDataStreamer>& data_streamer)
|
||||
: OperatorBuilderBase(id, "MultiCastDataStreamerSourceOperator"),
|
||||
_consumer_id(consumer_id),
|
||||
_multi_cast_data_streamer(data_streamer) {};
|
||||
|
||||
OperatorPtr MultiCastDataStreamerSourceOperatorBuilder::build_operator() {
|
||||
return std::make_shared<MultiCastDataStreamerSourceOperator>(this, _consumer_id,
|
||||
_multi_cast_data_streamer);
|
||||
}
|
||||
|
||||
const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() {
|
||||
return _multi_cast_data_streamer->row_desc();
|
||||
}
|
||||
|
||||
MultiCastDataStreamerSourceOperator::MultiCastDataStreamerSourceOperator(
|
||||
OperatorBuilderBase* operator_builder, const int consumer_id,
|
||||
std::shared_ptr<MultiCastDataStreamer>& data_streamer)
|
||||
: OperatorBase(operator_builder),
|
||||
_consumer_id(consumer_id),
|
||||
_multi_cast_data_streamer(data_streamer) {};
|
||||
|
||||
bool MultiCastDataStreamerSourceOperator::can_read() {
|
||||
return _multi_cast_data_streamer->can_read(_consumer_id);
|
||||
}
|
||||
|
||||
Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
|
||||
SourceState& source_state) {
|
||||
bool eos = false;
|
||||
_multi_cast_data_streamer->pull(_consumer_id, block, &eos);
|
||||
if (eos) {
|
||||
source_state = SourceState::FINISHED;
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace doris::pipeline
|
||||
78
be/src/pipeline/exec/multi_cast_data_stream_source.h
Normal file
78
be/src/pipeline/exec/multi_cast_data_stream_source.h
Normal file
@ -0,0 +1,78 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "operator.h"
|
||||
|
||||
namespace doris {
|
||||
class ExecNode;
|
||||
class RuntimeState;
|
||||
|
||||
namespace vectorized {
|
||||
class Block;
|
||||
} // namespace vectorized
|
||||
|
||||
namespace pipeline {
|
||||
class MultiCastDataStreamer;
|
||||
|
||||
class MultiCastDataStreamerSourceOperatorBuilder final : public OperatorBuilderBase {
|
||||
public:
|
||||
MultiCastDataStreamerSourceOperatorBuilder(int32_t id, const int consumer_id,
|
||||
std::shared_ptr<MultiCastDataStreamer>&);
|
||||
|
||||
bool is_source() const override { return true; }
|
||||
|
||||
OperatorPtr build_operator() override;
|
||||
|
||||
const RowDescriptor& row_desc() override;
|
||||
|
||||
private:
|
||||
const int _consumer_id;
|
||||
std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
|
||||
};
|
||||
|
||||
class MultiCastDataStreamerSourceOperator final : public OperatorBase {
|
||||
public:
|
||||
MultiCastDataStreamerSourceOperator(OperatorBuilderBase* operator_builder,
|
||||
const int consumer_id,
|
||||
std::shared_ptr<MultiCastDataStreamer>& data_streamer);
|
||||
|
||||
Status get_block(RuntimeState* state, vectorized::Block* block,
|
||||
SourceState& source_state) override;
|
||||
|
||||
Status prepare(RuntimeState* state) override { return Status::OK(); };
|
||||
|
||||
Status open(RuntimeState* state) override { return Status::OK(); };
|
||||
|
||||
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool can_read() override;
|
||||
|
||||
private:
|
||||
const int _consumer_id;
|
||||
std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
|
||||
};
|
||||
|
||||
} // namespace pipeline
|
||||
} // namespace doris
|
||||
75
be/src/pipeline/exec/multi_cast_data_streamer.cpp
Normal file
75
be/src/pipeline/exec/multi_cast_data_streamer.cpp
Normal file
@ -0,0 +1,75 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "multi_cast_data_streamer.h"
|
||||
|
||||
#include "runtime/runtime_state.h"
|
||||
|
||||
namespace doris::pipeline {
|
||||
|
||||
MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size)
|
||||
: _used_count(used_count), _mem_size(mem_size) {
|
||||
_block = vectorized::Block::create_unique(block->get_columns_with_type_and_name());
|
||||
block->clear();
|
||||
}
|
||||
|
||||
void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
|
||||
std::lock_guard l(_mutex);
|
||||
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
|
||||
if (pos_to_pull != _multi_cast_blocks.end()) {
|
||||
if (pos_to_pull->_used_count == 1) {
|
||||
DCHECK(pos_to_pull == _multi_cast_blocks.begin());
|
||||
pos_to_pull->_block->swap(*block);
|
||||
|
||||
_cumulative_mem_size -= pos_to_pull->_mem_size;
|
||||
pos_to_pull++;
|
||||
_multi_cast_blocks.pop_front();
|
||||
} else {
|
||||
pos_to_pull->_used_count--;
|
||||
pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
|
||||
(void)vectorized::MutableBlock(block).merge(*pos_to_pull->_block);
|
||||
pos_to_pull++;
|
||||
}
|
||||
}
|
||||
*eos = _eos and pos_to_pull == _multi_cast_blocks.end();
|
||||
}
|
||||
|
||||
void MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
|
||||
auto rows = block->rows();
|
||||
COUNTER_UPDATE(_process_rows, rows);
|
||||
|
||||
auto block_mem_size = block->allocated_bytes();
|
||||
std::lock_guard l(_mutex);
|
||||
int need_process_count = _cast_sender_count - _opened_sender_count;
|
||||
// TODO: if the [queue back block rows + block->rows()] < batch_size, better
|
||||
// do merge block. but need check the need_process_count and used_count whether
|
||||
// equal
|
||||
_multi_cast_blocks.emplace_back(block, need_process_count, block_mem_size);
|
||||
_cumulative_mem_size += block_mem_size;
|
||||
COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value()));
|
||||
|
||||
auto end = _multi_cast_blocks.end();
|
||||
end--;
|
||||
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
|
||||
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
|
||||
_sender_pos_to_read[i] = end;
|
||||
}
|
||||
}
|
||||
_eos = eos;
|
||||
}
|
||||
|
||||
} // namespace doris::pipeline
|
||||
82
be/src/pipeline/exec/multi_cast_data_streamer.h
Normal file
82
be/src/pipeline/exec/multi_cast_data_streamer.h
Normal file
@ -0,0 +1,82 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "vec/sink/vdata_stream_sender.h"
|
||||
|
||||
namespace doris::pipeline {
|
||||
|
||||
struct MultiCastBlock {
|
||||
MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size);
|
||||
|
||||
std::unique_ptr<vectorized::Block> _block;
|
||||
int _used_count;
|
||||
size_t _mem_size;
|
||||
};
|
||||
|
||||
// TDOD: MultiCastDataStreamer same as the data queue, maybe rethink union and refactor the
|
||||
// code
|
||||
class MultiCastDataStreamer {
|
||||
public:
|
||||
MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count)
|
||||
: _row_desc(row_desc),
|
||||
_profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))),
|
||||
_cast_sender_count(cast_sender_count) {
|
||||
_sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end());
|
||||
_peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
|
||||
_process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT);
|
||||
};
|
||||
|
||||
~MultiCastDataStreamer() = default;
|
||||
|
||||
void pull(int sender_idx, vectorized::Block* block, bool* eos);
|
||||
|
||||
void push(RuntimeState* state, vectorized::Block* block, bool eos);
|
||||
|
||||
// use sink to check can_write, now always true after we support spill to disk
|
||||
bool can_write() { return true; }
|
||||
|
||||
bool can_read(int sender_idx) {
|
||||
std::lock_guard l(_mutex);
|
||||
return _sender_pos_to_read[sender_idx] != _multi_cast_blocks.end() || _eos;
|
||||
}
|
||||
|
||||
const RowDescriptor& row_desc() { return _row_desc; }
|
||||
|
||||
RuntimeProfile* profile() { return _profile; }
|
||||
|
||||
void set_eos() {
|
||||
std::lock_guard l(_mutex);
|
||||
_eos = true;
|
||||
}
|
||||
|
||||
private:
|
||||
const RowDescriptor& _row_desc;
|
||||
RuntimeProfile* _profile;
|
||||
std::list<MultiCastBlock> _multi_cast_blocks;
|
||||
std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
|
||||
std::mutex _mutex;
|
||||
bool _eos = false;
|
||||
int _cast_sender_count = 0;
|
||||
int _opened_sender_count = 0;
|
||||
int64_t _cumulative_mem_size = 0;
|
||||
|
||||
RuntimeProfile::Counter* _process_rows;
|
||||
RuntimeProfile::Counter* _peak_mem_usage;
|
||||
};
|
||||
} // namespace doris::pipeline
|
||||
@ -54,6 +54,8 @@
|
||||
#include "pipeline/exec/exchange_source_operator.h"
|
||||
#include "pipeline/exec/hashjoin_build_sink.h"
|
||||
#include "pipeline/exec/hashjoin_probe_operator.h"
|
||||
#include "pipeline/exec/multi_cast_data_stream_sink.h"
|
||||
#include "pipeline/exec/multi_cast_data_stream_source.h"
|
||||
#include "pipeline/exec/mysql_scan_operator.h" // IWYU pragma: keep
|
||||
#include "pipeline/exec/nested_loop_join_build_operator.h"
|
||||
#include "pipeline/exec/nested_loop_join_probe_operator.h"
|
||||
@ -307,7 +309,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
|
||||
_root_pipeline = fragment_context->add_pipeline();
|
||||
RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
|
||||
if (_sink) {
|
||||
RETURN_IF_ERROR(_create_sink(request.fragment.output_sink));
|
||||
RETURN_IF_ERROR(
|
||||
_create_sink(request.local_params[idx].sender_id, request.fragment.output_sink));
|
||||
}
|
||||
RETURN_IF_ERROR(_build_pipeline_tasks(request));
|
||||
if (_sink) {
|
||||
@ -341,6 +344,12 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
|
||||
RETURN_IF_ERROR(task->prepare(_runtime_state.get()));
|
||||
}
|
||||
_total_tasks = _tasks.size();
|
||||
|
||||
// register the profile of child data stream sender
|
||||
for (auto& sender : _multi_cast_stream_sink_senders) {
|
||||
_sink->profile()->add_child(sender->profile(), true, nullptr);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -686,7 +695,7 @@ void PipelineFragmentContext::close_if_prepare_failed() {
|
||||
}
|
||||
|
||||
// construct sink operator
|
||||
Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
|
||||
Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thrift_sink) {
|
||||
OperatorBuilderPtr sink_;
|
||||
switch (thrift_sink.type) {
|
||||
case TDataSinkType::DATA_STREAM_SINK: {
|
||||
@ -715,6 +724,40 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
|
||||
_sink.get());
|
||||
break;
|
||||
}
|
||||
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
|
||||
sink_ = std::make_shared<MultiCastDataStreamSinkOperatorBuilder>(next_operator_builder_id(),
|
||||
_sink.get());
|
||||
RETURN_IF_ERROR(_root_pipeline->set_sink(sink_));
|
||||
|
||||
auto& multi_cast_data_streamer =
|
||||
assert_cast<vectorized::MultiCastDataStreamSink*>(_sink.get())
|
||||
->get_multi_cast_data_streamer();
|
||||
DCHECK_EQ(thrift_sink.multi_cast_stream_sink.sinks.size(),
|
||||
thrift_sink.multi_cast_stream_sink.destinations.size());
|
||||
auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
|
||||
_multi_cast_stream_sink_senders.resize(sender_size);
|
||||
for (int i = 0; i < sender_size; ++i) {
|
||||
auto new_pipeline = add_pipeline();
|
||||
// 1. create the data stream sender sink
|
||||
_multi_cast_stream_sink_senders[i].reset(new vectorized::VDataStreamSender(
|
||||
_runtime_state.get(), _runtime_state->obj_pool(), sender_id, sink_->row_desc(),
|
||||
thrift_sink.multi_cast_stream_sink.sinks[i],
|
||||
thrift_sink.multi_cast_stream_sink.destinations[i], 16 * 1024, false));
|
||||
|
||||
// 2. create and set the source operator of multi_cast_data_stream_source for new pipeline
|
||||
OperatorBuilderPtr source_op =
|
||||
std::make_shared<MultiCastDataStreamerSourceOperatorBuilder>(
|
||||
next_operator_builder_id(), i, multi_cast_data_streamer);
|
||||
new_pipeline->add_operator(source_op);
|
||||
|
||||
// 3. create and set sink operator of data stream sender for new pipeline
|
||||
OperatorBuilderPtr sink_op_builder = std::make_shared<ExchangeSinkOperatorBuilder>(
|
||||
next_operator_builder_id(), _multi_cast_stream_sink_senders[i].get(), this, i);
|
||||
new_pipeline->set_sink(sink_op_builder);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
default:
|
||||
return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
|
||||
}
|
||||
|
||||
@ -123,7 +123,7 @@ public:
|
||||
taskgroup::TaskGroup* get_task_group() const { return _query_ctx->get_task_group(); }
|
||||
|
||||
private:
|
||||
Status _create_sink(const TDataSink& t_data_sink);
|
||||
Status _create_sink(int sender_id, const TDataSink& t_data_sink);
|
||||
Status _build_pipelines(ExecNode*, PipelinePtr);
|
||||
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
|
||||
template <bool is_intersect>
|
||||
@ -168,7 +168,10 @@ private:
|
||||
std::unique_ptr<RuntimeState> _runtime_state;
|
||||
|
||||
ExecNode* _root_plan = nullptr; // lives in _runtime_state->obj_pool()
|
||||
// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
|
||||
// of it in pipeline task not the fragment_context
|
||||
std::unique_ptr<DataSink> _sink;
|
||||
std::vector<std::unique_ptr<DataSink>> _multi_cast_stream_sink_senders;
|
||||
|
||||
std::shared_ptr<QueryContext> _query_ctx;
|
||||
|
||||
|
||||
58
be/src/vec/sink/multi_cast_data_stream_sink.h
Normal file
58
be/src/vec/sink/multi_cast_data_stream_sink.h
Normal file
@ -0,0 +1,58 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "pipeline/exec/multi_cast_data_streamer.h"
|
||||
#include "vec/sink/vdata_stream_sender.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
class MultiCastDataStreamSink : public DataSink {
|
||||
public:
|
||||
MultiCastDataStreamSink(std::shared_ptr<pipeline::MultiCastDataStreamer>& streamer)
|
||||
: _multi_cast_data_streamer(streamer) {};
|
||||
|
||||
~MultiCastDataStreamSink() override = default;
|
||||
|
||||
Status send(RuntimeState* state, Block* block, bool eos = false) override {
|
||||
_multi_cast_data_streamer->push(state, block, eos);
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
Status open(doris::RuntimeState* state) override { return Status::OK(); };
|
||||
|
||||
// use sink to check can_write, now always true after we support spill to disk
|
||||
bool can_write() { return _multi_cast_data_streamer->can_write(); }
|
||||
|
||||
RuntimeProfile* profile() override { return _multi_cast_data_streamer->profile(); }
|
||||
|
||||
const RowDescriptor& row_desc() { return _multi_cast_data_streamer->row_desc(); }
|
||||
|
||||
std::shared_ptr<pipeline::MultiCastDataStreamer>& get_multi_cast_data_streamer() {
|
||||
return _multi_cast_data_streamer;
|
||||
}
|
||||
|
||||
Status close(RuntimeState* state, Status exec_status) override {
|
||||
_multi_cast_data_streamer->set_eos();
|
||||
return DataSink::close(state, exec_status);
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
|
||||
};
|
||||
} // namespace doris::vectorized
|
||||
@ -45,9 +45,7 @@
|
||||
#include "util/brpc_client_cache.h"
|
||||
#include "util/proto_util.h"
|
||||
#include "util/telemetry/telemetry.h"
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/common/sip_hash.h"
|
||||
#include "vec/core/column_with_type_and_name.h"
|
||||
#include "vec/exprs/vexpr.h"
|
||||
#include "vec/runtime/vdata_stream_mgr.h"
|
||||
#include "vec/runtime/vdata_stream_recvr.h"
|
||||
|
||||
Reference in New Issue
Block a user