[fix](pipelinex) fix multi cast sink without init (#25066)

This commit is contained in:
Mryange
2023-10-07 15:49:03 +08:00
committed by GitHub
parent 3c9ff7af39
commit 335804bb25
5 changed files with 54 additions and 39 deletions

View File

@ -23,12 +23,4 @@ OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() {
return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
}
// Initializes this sink local state.
// Calls Base::init first, then builds a MultiCastDataStreamer from the parent
// operator's _row_desc, _pool and _cast_sender_count and stores it in this
// local state's shared state. Returns Status::OK() on the normal path.
// NOTE(review): the hunk header above (-23,12 +23,4) shrinks this region from
// 12 lines to 4, so this definition appears to be the code the commit removes
// -- confirm against the actual commit.
Status MultiCastDataStreamSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
// RETURN_IF_ERROR is defined outside this view; presumably it returns early on a non-OK status.
RETURN_IF_ERROR(Base::init(state, info));
// Downcast the generic parent to the concrete multi-cast sink operator to reach its members.
auto& p = _parent->cast<MultiCastDataStreamSinkOperatorX>();
// Each call to init allocates a brand-new streamer and overwrites whatever the shared state held.
_shared_state->multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>(
p._row_desc, p._pool, p._cast_sender_count);
return Status::OK();
}
} // namespace doris::pipeline

View File

@ -47,8 +47,6 @@ class MultiCastDataStreamSinkLocalState final
// Macro defined outside this view; presumably it supplies the create_shared factory
// that setup_local_states calls on this class -- TODO confirm.
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
// Forwards the owning operator and the runtime state straight to the base local state.
MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {}
// NOTE(review): the hunk header (-47,8 +47,6) drops two lines from this region; this
// init override is presumably one of them -- confirm against the actual commit.
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
// Friend access for the operator and its DataSinkOperatorX specialisation; the latter's
// setup_local_states writes to _shared_state on instances of this class.
friend class MultiCastDataStreamSinkOperatorX;
friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
// Base is the pipelineX sink local state parameterised on MultiCastDependency.
using Base = PipelineXSinkLocalState<MultiCastDependency>;
@ -72,11 +70,6 @@ public:
_row_desc(row_desc),
_cast_sender_count(cast_sender_count) {}
~MultiCastDataStreamSinkOperatorX() override = default;
// The three overrides below ignore their argument and always return Status::OK().
// NOTE(review): the hunk header (-72,11 +70,6) removes five lines from this region;
// these no-op overrides are likely among them -- confirm against the actual commit.
Status init(const TDataSink& tsink) override { return Status::OK(); }
Status open(doris::RuntimeState* state) override { return Status::OK(); };
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {

View File

@ -119,7 +119,9 @@ public:
: Base(pool, id),
_consumer_id(consumer_id),
_t_data_stream_sink(sink),
_row_descriptor(row_descriptor) {};
_row_descriptor(row_descriptor) {
_op_name = "MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR";
};
~MultiCastDataStreamerSourceOperatorX() override = default;
Dependency* wait_for_dependency(RuntimeState* state) override {
CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);

View File

// Specialisation of setup_local_states for the multi-cast sink.
// A single MultiCastDataStreamer is created up front and the same shared_ptr is
// handed to every entry of `infos`.
// NOTE(review): this is a rendered diff hunk with the +/- markers stripped. The
// range-for loop and the indexed loop that follows appear to be the pre- and
// post-change bodies shown together, which is why the braces do not balance
// when read as plain C++ -- confirm against the actual commit.
@ -275,13 +275,19 @@ Status DataSinkOperatorX<MultiCastDataStreamSinkLocalState>::setup_local_states(
RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) {
auto multi_cast_data_streamer =
static_cast<MultiCastDataStreamSinkOperatorX*>(this)->create_multi_cast_data_streamer();
// Range-for variant: creates, registers and initialises a local state for EVERY info,
// then points that local state's shared state at the common streamer.
for (auto& info : infos) {
auto local_state = MultiCastDataStreamSinkLocalState::create_shared(this, state);
state->emplace_sink_local_state(id(), local_state);
RETURN_IF_ERROR(local_state->init(state, info));
local_state->_shared_state->multi_cast_data_streamer = multi_cast_data_streamer;
// Indexed variant: only infos[0] gets a local state; the remaining entries are
// reached through their dependency instead.
// NOTE(review): `int i` is compared against the unsigned infos.size().
for (int i = 0; i < infos.size(); i++) {
auto& info = infos[i];
if (i == 0) {
// First entry: create the local state, register it under this operator's id(),
// initialise it, then attach the common streamer to its shared state.
auto local_state = MultiCastDataStreamSinkLocalState::create_shared(this, state);
state->emplace_sink_local_state(id(), local_state);
RETURN_IF_ERROR(local_state->init(state, info));
local_state->_shared_state->multi_cast_data_streamer = multi_cast_data_streamer;
} else {
// Later entries: no local state is created here; the streamer is written directly
// into the shared state obtained from info.dependency.
// NOTE(review): unchecked C-style cast of shared_state()'s result; assumes the
// dependency really carries a MultiCastDependency::SharedState.
auto* _shared_state =
(typename MultiCastDependency::SharedState*)info.dependency->shared_state();
_shared_state->multi_cast_data_streamer = multi_cast_data_streamer;
}
}
return Status::OK();
}
@ -320,15 +326,28 @@ Status OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state,
std::vector<LocalStateInfo>& infos) {
int child_count = static_cast<pipeline::UnionSourceOperatorX*>(this)->get_child_count();
std::shared_ptr<DataQueue> data_queue;
for (auto& info : infos) {
auto local_state = UnionSourceLocalState::create_shared(state, this);
state->emplace_local_state(id(), local_state);
RETURN_IF_ERROR(local_state->init(state, info));
if (child_count != 0) {
if (!data_queue) {
data_queue = local_state->create_data_queue();
}
if (child_count == 0) {
// for union only have const expr
for (auto& info : infos) {
auto local_state = UnionSourceLocalState::create_shared(state, this);
state->emplace_local_state(id(), local_state);
RETURN_IF_ERROR(local_state->init(state, info));
}
return Status::OK();
}
for (int i = 0; i < infos.size(); i++) {
auto& info = infos[i];
if (i == 0) {
auto local_state = UnionSourceLocalState::create_shared(state, this);
state->emplace_local_state(id(), local_state);
RETURN_IF_ERROR(local_state->init(state, info));
data_queue = local_state->create_data_queue();
local_state->_shared_state->data_queue = data_queue;
} else {
auto* _shared_state =
(typename UnionDependency::SharedState*)info.dependency->shared_state();
_shared_state->data_queue = data_queue;
}
}
return Status::OK();

View File

@ -313,27 +313,36 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
thrift_sink.multi_cast_stream_sink, row_desc));
for (int i = 0; i < sender_size; ++i) {
auto new_pipeline = add_pipeline();
auto _row_desc =
!thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
? RowDescriptor(
state->desc_tbl(),
{thrift_sink.multi_cast_stream_sink.sinks[i].output_tuple_id},
{false})
: _sink->row_desc();
RowDescriptor* _row_desc = nullptr;
{
auto& tmp_row_desc =
!thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
? RowDescriptor(state->desc_tbl(),
{thrift_sink.multi_cast_stream_sink.sinks[i]
.output_tuple_id},
{false})
: _sink->row_desc();
_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
}
auto source_id = sources[i];
OperatorXPtr source_op;
// 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
source_op.reset(new MultiCastDataStreamerSourceOperatorX(
i, pool, thrift_sink.multi_cast_stream_sink.sinks[i], row_desc, source_id));
static_cast<void>(new_pipeline->add_operator(source_op));
// 2. create and set sink operator of data stream sender for new pipeline
DataSinkOperatorXPtr sink_op;
sink_op.reset(new ExchangeSinkOperatorX(
state, row_desc, thrift_sink.multi_cast_stream_sink.sinks[i],
state, *_row_desc, thrift_sink.multi_cast_stream_sink.sinks[i],
thrift_sink.multi_cast_stream_sink.destinations[i], false));
static_cast<void>(new_pipeline->set_sink(sink_op));
{
TDataSink* t = pool->add(new TDataSink());
t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
RETURN_IF_ERROR(sink_op->init(*t));
}
// 3. set dependency dag
_dag[new_pipeline->id()].push_back(cur_pipeline_id);