[vectorized](pipeline) support union node operator (#15031)

This commit is contained in:
zhangstar333
2022-12-19 22:01:56 +08:00
committed by GitHub
parent 7c67fa8651
commit 494eb895d3
18 changed files with 639 additions and 258 deletions

View File

@ -222,6 +222,9 @@ public:
/// Returns the current value of the _num_rows_returned counter.
int64_t rows_returned() const { return _num_rows_returned; }
/// Returns the stored row limit (_limit).
int64_t limit() const { return _limit; }
/// True when a limit is in effect (_limit != -1) and _num_rows_returned has reached or passed it.
bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; }
/// Only used in the vectorized exec engine: checks whether the limit has been reached,
/// cuts the number of rows in the block accordingly, and adds the block's rows to the profile.
/// NOTE(review): `eos` looks like an output flag that is set once the limit is reached --
/// confirm against the definition, which is not part of this view.
void reached_limit(vectorized::Block* block, bool* eos);
/// Returns a read-only reference to _tuple_ids.
const std::vector<TupleId>& get_tuple_ids() const { return _tuple_ids; }
/// Returns the raw pointer obtained from _runtime_profile.get().
RuntimeProfile* runtime_profile() const { return _runtime_profile.get(); }
@ -259,10 +262,6 @@ protected:
// 2. delete and release the column which create by function all and other reason
void release_block_memory(vectorized::Block& block, uint16_t child_idx = 0);
/// Only used in the vectorized exec engine: checks whether the limit has been reached,
/// cuts the number of rows in the block accordingly, and adds the block's rows to the profile.
/// NOTE(review): `eos` looks like an output flag that is set once the limit is reached --
/// confirm against the definition, which is not part of this view.
void reached_limit(vectorized::Block* block, bool* eos);
/// Only used in the vectorized exec engine: tries to apply the projections that transform
/// a block from _row_desc to _output_row_desc.
Status do_projections(vectorized::Block* origin_block, vectorized::Block* output_block);

View File

@ -44,7 +44,6 @@ set(PIPELINE_FILES
exec/analytic_source_operator.cpp
exec/streaming_aggregation_source_operator.cpp
exec/streaming_aggregation_sink_operator.cpp
exec/agg_context.cpp
exec/sort_source_operator.cpp
exec/sort_sink_operator.cpp
exec/repeat_operator.cpp
@ -54,6 +53,9 @@ set(PIPELINE_FILES
exec/set_sink_operator.cpp
exec/set_source_operator.cpp
exec/set_probe_sink_operator.cpp
exec/union_sink_operator.cpp
exec/union_source_operator.cpp
exec/data_queue.cpp
exec/select_operator.cpp)
add_library(Pipeline STATIC

View File

@ -1,118 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "agg_context.h"
#include "runtime/descriptors.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_nullable.h"
namespace doris {
namespace pipeline {
AggContext::AggContext()
: _is_finished(false),
_is_canceled(false),
_cur_bytes_in_queue(0),
_cur_blocks_in_queue(0) {}
// Debug-checks that the finished flag was raised (set_finish() or set_canceled()) before
// the context is destroyed.
AggContext::~AggContext() {
    DCHECK(_is_finished.load());
}
// Hands out a block for the producer to fill: the most recently returned pooled block if
// there is one, otherwise a freshly allocated block.
std::unique_ptr<vectorized::Block> AggContext::get_free_block() {
    std::unique_lock<std::mutex> pool_guard(_free_blocks_lock);
    if (_free_blocks.empty()) {
        // Allocate outside the critical section.
        pool_guard.unlock();
        return std::make_unique<vectorized::Block>();
    }
    std::unique_ptr<vectorized::Block> recycled = std::move(_free_blocks.back());
    _free_blocks.pop_back();
    return recycled;
}
// Puts an emptied block back into the reuse pool. The block must hold no rows
// (checked in debug builds only).
void AggContext::return_free_block(std::unique_ptr<vectorized::Block> block) {
    DCHECK(block->rows() == 0);
    std::lock_guard<std::mutex> pool_guard(_free_blocks_lock);
    _free_blocks.push_back(std::move(block));
}
// True when at least one block is queued or the finished flag has been raised.
bool AggContext::has_data_or_finished() {
    if (_cur_blocks_in_queue.load() > 0) {
        return true;
    }
    return _is_finished.load();
}
// Pops the oldest queued block into `*block`.
//
// Returns InternalError once the context has been canceled. When nothing is queued,
// `*block` is left untouched and OK is returned; if the finished flag is also set,
// _data_exhausted is latched so that data_exhausted() reports true from then on.
//
// The popped block's byte size is kept in a size_t. The previous `int` temporary
// narrowed the size_t stored in the queue and would have corrupted
// _cur_bytes_in_queue for any block larger than INT_MAX bytes.
Status AggContext::get_block(std::unique_ptr<vectorized::Block>* block) {
    if (_is_canceled) {
        return Status::InternalError("AggContext canceled");
    }
    if (_cur_blocks_in_queue > 0) {
        size_t popped_bytes = 0;
        {
            std::unique_lock<std::mutex> l(_transfer_lock);
            auto& entry = _blocks_queue.front();
            popped_bytes = entry.second;
            *block = std::move(entry.first);
            _blocks_queue.pop_front();
        }
        // Counters are adjusted after the lock is released, as before.
        _cur_bytes_in_queue -= static_cast<int64_t>(popped_bytes);
        _cur_blocks_in_queue -= 1;
    } else if (_is_finished) {
        _data_exhausted = true;
    }
    return Status::OK();
}
// True while the queued bytes stay below half of MAX_BYTE_OF_QUEUE.
bool AggContext::has_enough_space_to_push() {
    const int64_t queued_bytes = _cur_bytes_in_queue.load();
    return queued_bytes < MAX_BYTE_OF_QUEUE / 2;
}
// Appends a filled block to the queue and updates the byte/block counters plus the
// profile maxima. A null block is ignored.
void AggContext::push_block(std::unique_ptr<vectorized::Block> block) {
    if (block == nullptr) {
        return;
    }
    const auto pushed_bytes = block->allocated_bytes();
    // The byte counter is raised before the block becomes visible in the queue.
    _cur_bytes_in_queue += pushed_bytes;
    {
        std::unique_lock<std::mutex> queue_guard(_transfer_lock);
        _blocks_queue.emplace_back(std::move(block), pushed_bytes);
        _max_bytes_in_queue = std::max(_max_bytes_in_queue, _cur_bytes_in_queue.load());
        _max_size_of_queue = std::max(_max_size_of_queue, (int64)_blocks_queue.size());
    }
    // The block counter is raised only after the block is actually in the queue.
    _cur_blocks_in_queue += 1;
}
// Raises the finished flag.
void AggContext::set_finish() {
    _is_finished.store(true);
}
// Marks the context as canceled and, as a consequence, finished. Must be called before
// the finished flag is set (checked in debug builds only).
void AggContext::set_canceled() {
    DCHECK(!_is_finished.load());
    _is_canceled.store(true);
    _is_finished.store(true);
}
// Reports whether the finished flag has been raised (by set_finish() or set_canceled()).
bool AggContext::is_finish() {
    return _is_finished.load();
}
} // namespace pipeline
} // namespace doris

View File

@ -1,76 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <mutex>
#include "common/status.h"
namespace doris {
class TupleDescriptor;
namespace vectorized {
class Block;
}
namespace pipeline {
// Hand-off point for blocks between a producer and a consumer: a mutex-protected list of
// queued blocks, a mutex-protected pool of reusable empty blocks, finished/canceled flags
// and byte/block counters.
class AggContext {
public:
AggContext();
// Debug-checks that the finished flag was set before destruction.
~AggContext();
// Returns a pooled block if one is available, otherwise a newly allocated block.
std::unique_ptr<vectorized::Block> get_free_block();
// Returns an empty (zero-row) block to the reuse pool.
void return_free_block(std::unique_ptr<vectorized::Block>);
// True when at least one block is queued or the finished flag is set.
bool has_data_or_finished();
// Pops the oldest queued block into *block; fails once the context has been canceled.
// When nothing is queued and the finished flag is set, latches data_exhausted().
Status get_block(std::unique_ptr<vectorized::Block>* block);
// True while the queued bytes are below MAX_BYTE_OF_QUEUE / 2.
bool has_enough_space_to_push();
// Appends a block to the queue; a null block is ignored.
void push_block(std::unique_ptr<vectorized::Block>);
// Sets the finished flag.
void set_finish();
// Sets both the canceled and the finished flag.
void set_canceled(); // must be called before the finished flag is set
// Returns the finished flag.
bool is_finish();
// True once get_block() has found the queue empty with the finished flag set.
bool data_exhausted() const { return _data_exhausted; }
// Largest byte total observed by push_block().
int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
// Largest queue length observed by push_block().
int64_t max_size_of_queue() const { return _max_size_of_queue; }
private:
// Guards _free_blocks.
std::mutex _free_blocks_lock;
// Pool of empty blocks available for reuse.
std::vector<std::unique_ptr<vectorized::Block>> _free_blocks;
// Guards _blocks_queue.
std::mutex _transfer_lock;
// Queued blocks, each paired with the byte size recorded when it was pushed.
std::list<std::pair<std::unique_ptr<vectorized::Block>, size_t>> _blocks_queue;
bool _data_exhausted = false; // only used by streaming agg source operator
std::atomic<bool> _is_finished;
std::atomic<bool> _is_canceled;
// int64_t just for counter of profile
std::atomic<int64_t> _cur_bytes_in_queue;
std::atomic<uint32_t> _cur_blocks_in_queue;
// High-water marks recorded by push_block().
int64_t _max_bytes_in_queue = 0;
int64_t _max_size_of_queue = 0;
// 1 GiB / 10, i.e. roughly 102 MiB; has_enough_space_to_push() compares against half of it.
static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
};
} // namespace pipeline
} // namespace doris

View File

@ -17,7 +17,6 @@
#pragma once
#include "agg_context.h"
#include "operator.h"
namespace doris {

View File

@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "data_queue.h"
#include <mutex>
#include "vec/core/block.h"
namespace doris {
namespace vectorized {
class Block;
}
namespace pipeline {
// Creates `child_count` independent sub-queues. Each gets its own block queue, free-block
// pool, pair of mutexes, finished/canceled flags and byte/block counters, all starting
// out empty and cleared.
DataQueue::DataQueue(int child_count) {
    _child_count = child_count;
    _flag_queue_idx = 0;

    // Size every per-child container first.
    _queue_blocks.resize(child_count);
    _free_blocks.resize(child_count);
    _queue_blocks_lock.resize(child_count);
    _free_blocks_lock.resize(child_count);
    _is_finished.resize(child_count);
    _is_canceled.resize(child_count);
    _cur_bytes_in_queue.resize(child_count);
    _cur_blocks_nums_in_queue.resize(child_count);

    // Then give every child its mutexes and reset its state.
    for (int child = 0; child < child_count; ++child) {
        _queue_blocks_lock[child] = std::make_unique<std::mutex>();
        _free_blocks_lock[child] = std::make_unique<std::mutex>();
        _is_finished[child].store(false);
        _is_canceled[child].store(false);
        _cur_bytes_in_queue[child].store(0);
        _cur_blocks_nums_in_queue[child].store(0);
    }
}
// Hands out a block for child `child_idx` to fill: the oldest pooled block of that child
// if there is one, otherwise a freshly allocated block.
std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
    std::unique_lock<std::mutex> pool_guard(*_free_blocks_lock[child_idx]);
    auto& pool = _free_blocks[child_idx];
    if (pool.empty()) {
        // Allocate outside the critical section.
        pool_guard.unlock();
        return std::make_unique<vectorized::Block>();
    }
    std::unique_ptr<vectorized::Block> recycled = std::move(pool.front());
    pool.pop_front();
    return recycled;
}
// Returns an emptied block to the reuse pool of child `child_idx`. The block must hold
// no rows (checked in debug builds only).
void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int child_idx) {
    DCHECK(block->rows() == 0);
    std::lock_guard<std::mutex> pool_guard(*_free_blocks_lock[child_idx]);
    _free_blocks[child_idx].push_back(std::move(block));
}
// Used by the sink side to decide can_write: true while the bytes queued for `child_idx`
// stay below half of MAX_BYTE_OF_QUEUE.
bool DataQueue::has_enough_space_to_push(int child_idx) {
    const int64_t queued_bytes = _cur_bytes_in_queue[child_idx].load();
    return queued_bytes < MAX_BYTE_OF_QUEUE / 2;
}
// Used by the source side to decide can_read: true when any sub-queue holds data, or
// when child `child_idx` has finished. The data check runs first and, as a side effect,
// moves _flag_queue_idx (see remaining_has_data()).
bool DataQueue::has_data_or_finished(int child_idx) {
    if (remaining_has_data()) {
        return true;
    }
    return _is_finished[child_idx].load();
}
// Round-robin scan for a sub-queue that holds data. Each probe first advances
// _flag_queue_idx by one (wrapping at _child_count), so the scan starts right after the
// index recorded last time and visits every sub-queue at most once. On success the index
// is left pointing at the sub-queue that has data. With a single sub-queue the index
// always stays 0.
bool DataQueue::remaining_has_data() {
    for (int probes_left = _child_count; probes_left > 0; --probes_left) {
        _flag_queue_idx = (_flag_queue_idx + 1) % _child_count;
        if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
            return true;
        }
    }
    return false;
}
// Pops the oldest block of the sub-queue selected by _flag_queue_idx, which
// remaining_has_data() sets while the source checks can_read.
//
// output_block: receives the popped block; left untouched when the sub-queue is empty.
// child_idx:    optional; receives the index of the sub-queue the block came from.
//
// Returns InternalError when the selected sub-queue has been canceled, OK otherwise.
// When the sub-queue is empty and its child has finished, _data_exhausted is latched.
//
// The index is read from the atomic exactly once so that the mutex, the deque and the
// counters touched below all belong to the same child, even if _flag_queue_idx is
// advanced by another caller while this function runs.
Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* output_block,
                                       int* child_idx) {
    const int queue_idx = _flag_queue_idx.load();
    if (_is_canceled[queue_idx]) {
        return Status::InternalError("Current queue of idx {} has been canceled", queue_idx);
    }

    std::lock_guard<std::mutex> l(*_queue_blocks_lock[queue_idx]);
    if (_cur_blocks_nums_in_queue[queue_idx] > 0) {
        *output_block = std::move(_queue_blocks[queue_idx].front());
        _queue_blocks[queue_idx].pop_front();
        if (child_idx) {
            *child_idx = queue_idx;
        }
        _cur_bytes_in_queue[queue_idx] -= (*output_block)->allocated_bytes();
        _cur_blocks_nums_in_queue[queue_idx] -= 1;
    } else if (_is_finished[queue_idx]) {
        _data_exhausted = true;
    }
    return Status::OK();
}
// Appends a filled block to the sub-queue of child `child_idx` and updates that child's
// byte/block counters. A null block is ignored.
//
// The profile maxima only ever describe sub-queue 0. They are now refreshed only when the
// push targets sub-queue 0, so _queue_blocks[0] and the plain int64_t maxima are touched
// exclusively under _queue_blocks_lock[0]. Previously a push to any other child read
// _queue_blocks[0].size() and wrote the maxima while holding only its own mutex, racing
// with pushes and pops on sub-queue 0. The recorded values are unchanged: sub-queue 0 can
// only grow through a push to sub-queue 0, so its peaks are always seen here.
void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_idx) {
    if (!block) {
        return;
    }
    std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]);
    _cur_bytes_in_queue[child_idx] += block->allocated_bytes();
    _queue_blocks[child_idx].emplace_back(std::move(block));
    _cur_blocks_nums_in_queue[child_idx] += 1;
    if (child_idx == 0) {
        //this only use to record the queue[0] for profile
        _max_bytes_in_queue = std::max(_max_bytes_in_queue, _cur_bytes_in_queue[0].load());
        _max_size_of_queue = std::max(_max_size_of_queue, (int64)_queue_blocks[0].size());
    }
}
// Raises the finished flag of child `child_idx`.
void DataQueue::set_finish(int child_idx) {
    _is_finished[child_idx].store(true);
}
// Marks child `child_idx` as canceled and, as a consequence, finished. Must be called
// before that child's finished flag is set (checked in debug builds only).
void DataQueue::set_canceled(int child_idx) {
    DCHECK(!_is_finished[child_idx]);
    _is_canceled[child_idx].store(true);
    _is_finished[child_idx].store(true);
}
// Reports whether the finished flag of child `child_idx` has been raised.
bool DataQueue::is_finish(int child_idx) {
    return _is_finished[child_idx].load();
}
// True only when every child's finished flag is set (trivially true with no children).
bool DataQueue::is_all_finish() {
    int child = 0;
    while (child < _child_count) {
        if (!_is_finished[child].load()) {
            return false;
        }
        ++child;
    }
    return true;
}
} // namespace pipeline
} // namespace doris

View File

@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <deque>
#include <memory>
#include <mutex>
#include <vector>
#include "vec/core/block.h"
namespace doris {
namespace vectorized {
class Block;
}
namespace pipeline {
// A set of `child_count` independent block queues sharing one object. Every child has its
// own queue of filled blocks, pool of reusable empty blocks, mutexes, finished/canceled
// flags and byte/block counters. Most methods default to child 0, which is all a
// single-child user needs.
class DataQueue {
public:
// One sub-queue is usually enough; a union node needs one per child.
DataQueue(int child_count = 1);
~DataQueue() = default;
// Pops the oldest block of the sub-queue currently selected by _flag_queue_idx into
// *block and, when child_idx is non-null, reports which sub-queue it came from.
// Returns an error if that sub-queue was canceled; leaves *block untouched if it is empty.
Status get_block_from_queue(std::unique_ptr<vectorized::Block>* block,
int* child_idx = nullptr);
// Appends a filled block to the given child's sub-queue; a null block is ignored.
void push_block(std::unique_ptr<vectorized::Block> block, int child_idx = 0);
// Returns a pooled empty block of the given child, or a newly allocated one.
std::unique_ptr<vectorized::Block> get_free_block(int child_idx = 0);
// Returns an empty (zero-row) block to the given child's reuse pool.
void push_free_block(std::unique_ptr<vectorized::Block> output_block, int child_idx = 0);
// Sets the finished flag of the given child.
void set_finish(int child_idx = 0);
// Sets both the canceled and the finished flag of the given child.
void set_canceled(int child_idx = 0); // must be called before the finished flag is set
// Returns the finished flag of the given child.
bool is_finish(int child_idx = 0);
// True when every child's finished flag is set.
bool is_all_finish();
// True while the given child's queued bytes are below MAX_BYTE_OF_QUEUE / 2.
bool has_enough_space_to_push(int child_idx = 0);
// True when any sub-queue holds data or the given child has finished.
bool has_data_or_finished(int child_idx = 0);
// Round-robin scan for a sub-queue with data; updates _flag_queue_idx as it goes.
bool remaining_has_data();
// Profile high-water marks; both describe sub-queue 0 only.
int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
int64_t max_size_of_queue() const { return _max_size_of_queue; }
// True once get_block_from_queue() found the selected sub-queue empty and finished.
bool data_exhausted() const { return _data_exhausted; }
private:
// Per-child mutex guarding the matching entry of _queue_blocks.
std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
// Per-child queue of filled blocks.
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
// Per-child mutex guarding the matching entry of _free_blocks.
std::vector<std::unique_ptr<std::mutex>> _free_blocks_lock;
// Per-child pool of empty blocks available for reuse.
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _free_blocks;
// Number of sub-queues that are initialized; usually one.
int _child_count = 0;
// Per-child flags, indexed by child_idx.
std::deque<std::atomic<bool>> _is_finished;
std::deque<std::atomic<bool>> _is_canceled;
// int64_t just for counter of profile
std::deque<std::atomic<int64_t>> _cur_bytes_in_queue;
std::deque<std::atomic<uint32_t>> _cur_blocks_nums_in_queue;
// Index of the sub-queue that remaining_has_data() last found data in; useful when
// there are many sub-queues.
std::atomic<int> _flag_queue_idx = 0;
// only used by streaming agg source operator
bool _data_exhausted = false;
// High-water marks for the profile; they only record sub-queue 0.
int64_t _max_bytes_in_queue = 0;
int64_t _max_size_of_queue = 0;
// 1 GiB / 10, i.e. roughly 102 MiB; has_enough_space_to_push() compares against half of it.
static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
};
} // namespace pipeline
} // namespace doris

View File

@ -23,8 +23,8 @@ namespace doris::pipeline {
StreamingAggSinkOperator::StreamingAggSinkOperator(OperatorBuilderBase* operator_builder,
ExecNode* agg_node,
std::shared_ptr<AggContext> agg_context)
: StreamingOperator(operator_builder, agg_node), _agg_context(std::move(agg_context)) {}
std::shared_ptr<DataQueue> queue)
: StreamingOperator(operator_builder, agg_node), _data_queue(std::move(queue)) {}
Status StreamingAggSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(StreamingOperator::prepare(state));
@ -36,7 +36,7 @@ Status StreamingAggSinkOperator::prepare(RuntimeState* state) {
bool StreamingAggSinkOperator::can_write() {
// sink and source in diff threads
return _agg_context->has_enough_space_to_push();
return _data_queue->has_enough_space_to_push();
}
Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block,
@ -44,39 +44,38 @@ Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in
SCOPED_TIMER(_runtime_profile->total_time_counter());
Status ret = Status::OK();
if (in_block && in_block->rows() > 0) {
auto block_from_ctx = _agg_context->get_free_block();
auto block_from_ctx = _data_queue->get_free_block();
RETURN_IF_ERROR(_node->do_pre_agg(in_block, block_from_ctx.get()));
if (block_from_ctx->rows() == 0) {
_agg_context->return_free_block(std::move(block_from_ctx));
_data_queue->push_free_block(std::move(block_from_ctx));
} else {
_agg_context->push_block(std::move(block_from_ctx));
_data_queue->push_block(std::move(block_from_ctx));
}
}
if (UNLIKELY(source_state == SourceState::FINISHED)) {
_agg_context->set_finish();
_data_queue->set_finish();
}
return Status::OK();
}
Status StreamingAggSinkOperator::close(RuntimeState* state) {
if (_agg_context && !_agg_context->is_finish()) {
if (_data_queue && !_data_queue->is_finish()) {
// finish should be set, if not set here means error.
_agg_context->set_canceled();
_data_queue->set_canceled();
}
COUNTER_SET(_queue_size_counter, _agg_context->max_size_of_queue());
COUNTER_SET(_queue_byte_size_counter, _agg_context->max_bytes_in_queue());
COUNTER_SET(_queue_size_counter, _data_queue->max_size_of_queue());
COUNTER_SET(_queue_byte_size_counter, _data_queue->max_bytes_in_queue());
return StreamingOperator::close(state);
;
}
StreamingAggSinkOperatorBuilder::StreamingAggSinkOperatorBuilder(
int32_t id, ExecNode* exec_node, std::shared_ptr<AggContext> agg_context)
StreamingAggSinkOperatorBuilder::StreamingAggSinkOperatorBuilder(int32_t id, ExecNode* exec_node,
std::shared_ptr<DataQueue> queue)
: OperatorBuilder(id, "StreamingAggSinkOperator", exec_node),
_agg_context(std::move(agg_context)) {}
_data_queue(std::move(queue)) {}
OperatorPtr StreamingAggSinkOperatorBuilder::build_operator() {
return std::make_shared<StreamingAggSinkOperator>(this, _node, _agg_context);
return std::make_shared<StreamingAggSinkOperator>(this, _node, _data_queue);
}
} // namespace doris::pipeline

View File

@ -17,8 +17,8 @@
#pragma once
#include "agg_context.h"
#include "operator.h"
#include "pipeline/exec/data_queue.h"
namespace doris {
namespace vectorized {
@ -31,7 +31,7 @@ namespace pipeline {
class StreamingAggSinkOperatorBuilder final : public OperatorBuilder<vectorized::AggregationNode> {
public:
StreamingAggSinkOperatorBuilder(int32_t, ExecNode*, std::shared_ptr<AggContext>);
StreamingAggSinkOperatorBuilder(int32_t, ExecNode*, std::shared_ptr<DataQueue>);
OperatorPtr build_operator() override;
@ -39,13 +39,13 @@ public:
bool is_source() const override { return false; };
private:
std::shared_ptr<AggContext> _agg_context;
std::shared_ptr<DataQueue> _data_queue;
};
class StreamingAggSinkOperator final : public StreamingOperator<StreamingAggSinkOperatorBuilder> {
public:
StreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*,
std::shared_ptr<AggContext>);
std::shared_ptr<DataQueue>);
Status prepare(RuntimeState*) override;
@ -61,7 +61,7 @@ private:
RuntimeProfile::Counter* _queue_byte_size_counter;
RuntimeProfile::Counter* _queue_size_counter;
std::shared_ptr<AggContext> _agg_context;
std::shared_ptr<DataQueue> _data_queue;
};
} // namespace pipeline

View File

@ -22,27 +22,27 @@
namespace doris {
namespace pipeline {
StreamingAggSourceOperator::StreamingAggSourceOperator(OperatorBuilderBase* templ, ExecNode* node,
std::shared_ptr<AggContext> agg_context)
: SourceOperator(templ, node), _agg_context(std::move(agg_context)) {}
std::shared_ptr<DataQueue> queue)
: SourceOperator(templ, node), _data_queue(std::move(queue)) {}
bool StreamingAggSourceOperator::can_read() {
return _agg_context->has_data_or_finished();
return _data_queue->has_data_or_finished();
}
Status StreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
bool eos = false;
if (!_agg_context->data_exhausted()) {
if (!_data_queue->data_exhausted()) {
std::unique_ptr<vectorized::Block> agg_block;
RETURN_IF_ERROR(_agg_context->get_block(&agg_block));
RETURN_IF_ERROR(_data_queue->get_block_from_queue(&agg_block));
if (_agg_context->data_exhausted()) {
if (_data_queue->data_exhausted()) {
RETURN_IF_ERROR(_node->pull(state, block, &eos));
} else {
block->swap(*agg_block);
agg_block->clear_column_data(_node->row_desc().num_materialized_slots());
_agg_context->return_free_block(std::move(agg_block));
_data_queue->push_free_block(std::move(agg_block));
}
} else {
RETURN_IF_ERROR(_node->pull(state, block, &eos));
@ -54,12 +54,12 @@ Status StreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Bl
}
StreamingAggSourceOperatorBuilder::StreamingAggSourceOperatorBuilder(
int32_t id, ExecNode* exec_node, std::shared_ptr<AggContext> agg_context)
int32_t id, ExecNode* exec_node, std::shared_ptr<DataQueue> queue)
: OperatorBuilder(id, "StreamingAggSourceOperator", exec_node),
_agg_context(std::move(agg_context)) {}
_data_queue(std::move(queue)) {}
OperatorPtr StreamingAggSourceOperatorBuilder::build_operator() {
return std::make_shared<StreamingAggSourceOperator>(this, _node, _agg_context);
return std::make_shared<StreamingAggSourceOperator>(this, _node, _data_queue);
}
} // namespace pipeline

View File

@ -18,8 +18,8 @@
#include <atomic>
#include "agg_context.h"
#include "operator.h"
#include "pipeline/exec/data_queue.h"
namespace doris {
namespace vectorized {
@ -30,25 +30,25 @@ namespace pipeline {
class StreamingAggSourceOperatorBuilder final
: public OperatorBuilder<vectorized::AggregationNode> {
public:
StreamingAggSourceOperatorBuilder(int32_t, ExecNode*, std::shared_ptr<AggContext>);
StreamingAggSourceOperatorBuilder(int32_t, ExecNode*, std::shared_ptr<DataQueue>);
bool is_source() const override { return true; }
OperatorPtr build_operator() override;
private:
std::shared_ptr<AggContext> _agg_context;
std::shared_ptr<DataQueue> _data_queue;
};
class StreamingAggSourceOperator final : public SourceOperator<StreamingAggSourceOperatorBuilder> {
public:
StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr<AggContext>);
StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr<DataQueue>);
bool can_read() override;
Status get_block(RuntimeState*, vectorized::Block*, SourceState& source_state) override;
Status open(RuntimeState*) override { return Status::OK(); }
private:
std::shared_ptr<AggContext> _agg_context;
std::shared_ptr<DataQueue> _data_queue;
};
} // namespace pipeline

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "union_sink_operator.h"
#include "common/status.h"
namespace doris::pipeline {
// id:       operator id forwarded to the OperatorBuilder base.
// child_id: index of the union child whose blocks the built sink operators receive.
// node:     exec node forwarded to the OperatorBuilder base.
// queue:    DataQueue shared with the operators built from this builder.
//
// `queue` is taken by value, so it is moved into the member rather than copied again,
// matching the streaming-agg builders. The stray ';' after the body is dropped.
UnionSinkOperatorBuilder::UnionSinkOperatorBuilder(int32_t id, int child_id, ExecNode* node,
                                                   std::shared_ptr<DataQueue> queue)
        : OperatorBuilder(id, "UnionSinkOperatorBuilder", node),
          _cur_child_id(child_id),
          _data_queue(std::move(queue)) {}
// operator_builder: builder forwarded to the StreamingOperator base.
// child_id:         index of the union child this sink receives blocks from; also used
//                   as the DataQueue sub-queue index.
// node:             exec node forwarded to the StreamingOperator base.
// queue:            DataQueue the sink pushes its blocks into.
//
// `queue` is taken by value, so it is moved into the member rather than copied again.
// The stray ';' after the body is dropped.
UnionSinkOperator::UnionSinkOperator(OperatorBuilderBase* operator_builder, int child_id,
                                     ExecNode* node, std::shared_ptr<DataQueue> queue)
        : StreamingOperator(operator_builder, node),
          _cur_child_id(child_id),
          _data_queue(std::move(queue)) {}
// Creates a UnionSinkOperator bound to this builder's child index, node and queue.
OperatorPtr UnionSinkOperatorBuilder::build_operator() {
    OperatorPtr sink_operator =
            std::make_shared<UnionSinkOperator>(this, _cur_child_id, _node, _data_queue);
    return sink_operator;
}
// Feeds one input block of child _cur_child_id into the shared DataQueue.
//
// Children below the node's first materialized child index are passed through: a
// non-empty input block is swapped into the pending output block and pushed right away.
// The remaining children are materialized into the pending output block, which is pushed
// once it holds at least state->batch_size() rows. Any other index combination is
// reported as an InternalError.
//
// When source_state is FINISHED, whatever output block is still pending is pushed and the
// child is marked finished in the queue.
Status UnionSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block,
                               SourceState source_state) {
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    if (!_output_block) {
        _output_block = _data_queue->get_free_block(_cur_child_id);
    }

    const bool pass_through = _cur_child_id < _node->get_first_materialized_child_idx();
    if (pass_through) {
        if (in_block->rows() > 0) {
            _output_block->swap(*in_block);
            _data_queue->push_block(std::move(_output_block), _cur_child_id);
        }
    } else {
        const bool needs_materialize =
                _node->get_first_materialized_child_idx() != _node->children_count() &&
                _cur_child_id < _node->children_count();
        if (!needs_materialize) {
            return Status::InternalError("maybe can't reach here, execute const expr: {}, {}, {}",
                                         _cur_child_id, _node->get_first_materialized_child_idx(),
                                         _node->children_count());
        }
        this->_node->materialize_child_block(state, _cur_child_id, in_block, _output_block.get());
    }

    if (UNLIKELY(source_state == SourceState::FINISHED)) {
        // On eos the pending block is pushed without looking at its row count, even if it
        // holds zero rows: if the sink ends while the queue is empty and nothing is
        // pushed, the source can never become readable again and so can never report
        // that it has finished.
        if (_output_block) {
            _data_queue->push_block(std::move(_output_block), _cur_child_id);
        }
        _data_queue->set_finish(_cur_child_id);
    } else if (_output_block && (_output_block->rows() >= state->batch_size())) {
        // Not eos, and enough rows have accumulated to be worth handing over.
        _data_queue->push_block(std::move(_output_block), _cur_child_id);
    }
    return Status::OK();
}
// Closes the sink. The child's finished flag should have been set by sink(); if it was
// not, something went wrong, so the child's sub-queue is marked canceled before the base
// class close runs.
Status UnionSinkOperator::close(RuntimeState* state) {
    const bool finish_missing = _data_queue && !_data_queue->is_finish(_cur_child_id);
    if (finish_missing) {
        _data_queue->set_canceled(_cur_child_id);
    }
    return StreamingOperator::close(state);
}
} // namespace doris::pipeline

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "operator.h"
#include "pipeline/exec/data_queue.h"
#include "vec/core/block.h"
namespace doris {
namespace vectorized {
class VUnionNode;
class Block;
} // namespace vectorized
namespace pipeline {
// Builder for UnionSinkOperator. Holds the child index and the shared DataQueue that are
// passed on to every operator it builds.
class UnionSinkOperatorBuilder final : public OperatorBuilder<vectorized::VUnionNode> {
public:
UnionSinkOperatorBuilder(int32_t id, int child_id, ExecNode* node,
std::shared_ptr<DataQueue> queue);
// Creates a UnionSinkOperator bound to this builder's child index and queue.
OperatorPtr build_operator() override;
// Always true: this builder produces sink operators.
bool is_sink() const override { return true; };
private:
// Index of the union child the built sinks receive blocks from.
int _cur_child_id;
// Queue shared with the operators built from this builder.
std::shared_ptr<DataQueue> _data_queue;
};
// Sink side of the union: sink() places the blocks of one union child into the DataQueue
// sub-queue with the same index.
class UnionSinkOperator final : public StreamingOperator<UnionSinkOperatorBuilder> {
public:
UnionSinkOperator(OperatorBuilderBase* operator_builder, int child_id, ExecNode* node,
std::shared_ptr<DataQueue> queue);
// Always reports writable; the queue's has_enough_space_to_push() is not consulted.
bool can_write() override { return true; };
// Passes through or materializes in_block and pushes the result into the queue; marks
// the child finished when source_state is FINISHED.
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
// The sink's open returns immediately; that work is done on the source side.
Status open(RuntimeState* /*state*/) override { return Status::OK(); }
// Marks the child's sub-queue canceled if it was never marked finished.
Status close(RuntimeState* state) override;
private:
// Index of the union child this sink receives blocks from; doubles as the sub-queue index.
int _cur_child_id;
// Queue the blocks are pushed into.
std::shared_ptr<DataQueue> _data_queue;
// Block being filled; null right after it has been pushed into the queue.
std::unique_ptr<vectorized::Block> _output_block;
};
} // namespace pipeline
} // namespace doris

View File

@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline/exec/union_source_operator.h"
#include <opentelemetry/common/threadlocal.h>
#include "common/status.h"
#include "pipeline/exec/data_queue.h"
namespace doris {
namespace vectorized {
class Block;
}
namespace pipeline {
// id:    operator id forwarded to the OperatorBuilder base.
// node:  exec node forwarded to the OperatorBuilder base.
// queue: DataQueue shared with the operators built from this builder.
//
// `queue` is taken by value, so it is moved into the member rather than copied again,
// matching the streaming-agg builders. The stray ';' after the body is dropped.
UnionSourceOperatorBuilder::UnionSourceOperatorBuilder(int32_t id, ExecNode* node,
                                                       std::shared_ptr<DataQueue> queue)
        : OperatorBuilder(id, "UnionSourceOperatorBuilder", node),
          _data_queue(std::move(queue)) {}
// Creates a UnionSourceOperator bound to this builder's node and queue.
OperatorPtr UnionSourceOperatorBuilder::build_operator() {
    OperatorPtr source_operator = std::make_shared<UnionSourceOperator>(this, _node, _data_queue);
    return source_operator;
}
// operator_builder: builder forwarded to the SourceOperator base.
// node:             exec node forwarded to the SourceOperator base.
// queue:            DataQueue this source pops its blocks from.
//
// The operator starts in the const-expr phase (_need_read_for_const_expr == true).
// `queue` is taken by value, so it is moved into the member rather than copied again.
// The stray ';' after the body is dropped.
UnionSourceOperator::UnionSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* node,
                                         std::shared_ptr<DataQueue> queue)
        : SourceOperator(operator_builder, node),
          _data_queue(std::move(queue)),
          _need_read_for_const_expr(true) {}
// Readable while the const-expr phase is still pending. The operator assumes there may be
// const exprs to process without knowing yet whether any exist. After that phase it is
// readable whenever some sub-queue holds data.
bool UnionSourceOperator::can_read() {
    if (_need_read_for_const_expr) {
        return true;
    }
    return _data_queue->remaining_has_data();
}
// Produces the next output block of the union.
//
// Const exprs are processed first; once the node reports no more of them, blocks are
// taken from the DataQueue. Afterwards the node's limit is applied to `block`, and
// source_state becomes FINISHED when either the limit is reached or the const exprs are
// done, no sub-queue holds data and every child has finished. Otherwise it stays
// DEPEND_ON_SOURCE.
//
// Compared with the previous version, the Status of get_block_from_queue() is propagated
// and the popped block is only used when one was actually returned. The queue leaves the
// block null when the selected sub-queue is canceled or empty, which used to end in a
// null dereference.
Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
                                      SourceState& source_state) {
    if (_need_read_for_const_expr) {
        // Const exprs are processed first.
        if (this->_node->has_more_const(state)) {
            // NOTE(review): any status returned by get_next_const() is not checked here;
            // its declaration is not part of this view.
            this->_node->get_next_const(state, block);
        }
        _need_read_for_const_expr = this->_node->has_more_const(state);
    } else {
        std::unique_ptr<vectorized::Block> output_block;
        int child_idx = 0;
        RETURN_IF_ERROR(_data_queue->get_block_from_queue(&output_block, &child_idx));
        if (output_block) {
            block->swap(*output_block);
            // Empty the swapped-out block so it can go back to the child's reuse pool.
            output_block->clear_column_data(_node->row_desc().num_materialized_slots());
            _data_queue->push_free_block(std::move(output_block), child_idx);
        }
    }

    bool reached_limit = false;
    this->_node->reached_limit(block, &reached_limit);

    // Const exprs are done, no sub-queue holds data any more, and every child is finished.
    const bool all_inputs_drained = !_need_read_for_const_expr &&
                                    !_data_queue->remaining_has_data() &&
                                    _data_queue->is_all_finish();
    source_state = (all_inputs_drained || reached_limit) ? SourceState::FINISHED
                                                         : SourceState::DEPEND_ON_SOURCE;
    return Status::OK();
}
} // namespace pipeline
} // namespace doris

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "operator.h"
#include "pipeline/exec/data_queue.h"
namespace doris {
namespace vectorized {
class VUnionNode;
}
namespace pipeline {
// Builder for the source-side operator of a vectorized union node
// (OperatorBuilder specialised on vectorized::VUnionNode).
// It is constructed with a DataQueue handle; the pipeline construction code
// passes the same queue to the union sink builders of the child pipelines.
class UnionSourceOperatorBuilder final : public OperatorBuilder<vectorized::VUnionNode> {
public:
// Takes the builder id, the exec node and the data queue handle.
// NOTE(review): presumably stores the queue in _data_queue -- the definition
// is not in this header; confirm in the .cpp.
UnionSourceOperatorBuilder(int32_t id, ExecNode* node, std::shared_ptr<DataQueue>);
// Always true: this builder marks itself as a pipeline source.
bool is_source() const override { return true; }
// NOTE(review): presumably creates a UnionSourceOperator bound to
// _data_queue -- defined out of line; confirm in the .cpp.
OperatorPtr build_operator() override;
private:
// Data queue handle held by this builder.
std::shared_ptr<DataQueue> _data_queue;
};
// Source operator of a union node in the pipeline engine. It emits the
// union's constant-expression rows and the blocks available in its DataQueue.
class UnionSourceOperator final : public SourceOperator<UnionSourceOperatorBuilder> {
public:
// Takes the owning builder, the exec node and the data queue handle.
// NOTE(review): the initial value given to _need_read_for_const_expr is set
// in the out-of-line definition, which is not visible here -- confirm.
UnionSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* node,
std::shared_ptr<DataQueue>);
// Fills `block` with const-expr rows while any are pending, otherwise with a
// block taken from the data queue, and reports through `source_state`
// whether the source is FINISHED or still DEPEND_ON_SOURCE.
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
// True while const exprs are still pending or the data queue has remaining data.
bool can_read() override;
private:
// Queue the operator reads blocks from and returns drained blocks to.
std::shared_ptr<DataQueue> _data_queue;
// True while get_block() should serve const-expr rows; get_block() refreshes
// it from VUnionNode::has_more_const() after each const-expr read.
bool _need_read_for_const_expr;
};
} // namespace pipeline
} // namespace doris

View File

@ -20,7 +20,6 @@
#include <gen_cpp/DataSinks_types.h>
#include <thrift/protocol/TDebugProtocol.h>
#include "exec/agg_context.h"
#include "exec/aggregation_sink_operator.h"
#include "exec/aggregation_source_operator.h"
#include "exec/analytic_sink_operator.h"
@ -52,11 +51,14 @@
#include "pipeline/exec/assert_num_rows_operator.h"
#include "pipeline/exec/broker_scan_operator.h"
#include "pipeline/exec/const_value_operator.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/table_function_operator.h"
#include "pipeline/exec/union_sink_operator.h"
#include "pipeline/exec/union_source_operator.h"
#include "pipeline_task.h"
#include "runtime/client_cache.h"
#include "runtime/fragment_mgr.h"
@ -346,14 +348,24 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
}
case TPlanNodeType::UNION_NODE: {
auto* union_node = assert_cast<vectorized::VUnionNode*>(node);
if (union_node->children_count() == 0) {
if (union_node->children_count() == 0 &&
union_node->get_first_materialized_child_idx() == 0) { // only have const expr
OperatorBuilderPtr builder =
std::make_shared<ConstValueOperatorBuilder>(next_operator_builder_id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
} else {
return Status::InternalError(
"Unsupported exec type in pipeline: {}, later will be support.",
print_plan_node_type(node_type));
int child_count = union_node->children_count();
auto data_queue = std::make_shared<DataQueue>(child_count);
for (int child_id = 0; child_id < child_count; ++child_id) {
auto new_child_pipeline = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline));
OperatorBuilderPtr child_sink_builder = std::make_shared<UnionSinkOperatorBuilder>(
next_operator_builder_id(), child_id, union_node, data_queue);
RETURN_IF_ERROR(new_child_pipeline->set_sink(child_sink_builder));
}
OperatorBuilderPtr source_builder = std::make_shared<UnionSourceOperatorBuilder>(
next_operator_builder_id(), union_node, data_queue);
RETURN_IF_ERROR(cur_pipe->add_operator(source_builder));
}
break;
}
@ -362,13 +374,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
auto new_pipe = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
if (agg_node->is_streaming_preagg()) {
auto agg_ctx = std::make_shared<AggContext>();
auto data_queue = std::make_shared<DataQueue>(1);
OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>(
next_operator_builder_id(), agg_node, agg_ctx);
next_operator_builder_id(), agg_node, data_queue);
RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
OperatorBuilderPtr pre_agg_source = std::make_shared<StreamingAggSourceOperatorBuilder>(
next_operator_builder_id(), agg_node, agg_ctx);
next_operator_builder_id(), agg_node, data_queue);
RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
} else {
OperatorBuilderPtr agg_sink =

View File

@ -17,6 +17,8 @@
#include "vec/exec/vunion_node.h"
#include <gen_cpp/AgentService_types.h>
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
@ -157,7 +159,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
child(_child_idx)->get_next_span(), _child_eos);
SCOPED_TIMER(_materialize_exprs_evaluate_timer);
if (child_block.rows() > 0) {
mblock.merge(materialize_block(&child_block));
mblock.merge(materialize_block(&child_block, _child_idx));
}
// It shouldn't be the case that we reached the limit because we shouldn't have
// incremented '_num_rows_returned' yet.
@ -190,7 +192,8 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
mem_reuse ? MutableBlock::build_mutable_block(block)
: MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
_row_descriptor)));
for (; _const_expr_list_idx < _const_expr_lists.size(); ++_const_expr_list_idx) {
for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() <= state->batch_size();
++_const_expr_list_idx) {
Block tmp_block;
tmp_block.insert({vectorized::ColumnUInt8::create(1),
std::make_shared<vectorized::DataTypeUInt8>(), ""});
@ -203,6 +206,7 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
tmp_block.erase_not_in(result_list);
if (tmp_block.rows() > 0) {
mblock.merge(tmp_block);
tmp_block.clear();
}
}
@ -220,6 +224,27 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
return Status::OK();
}
// Entry point used by the pipeline operators.
//
// Evaluates the expression list of child `child_id` over `input_block`
// (via materialize_block) and merges the result into a MutableBlock derived
// from `output_block`. `state` is not used by this function.
//
// If `output_block` does not report mem_reuse(), the merged result is swapped
// into it. If it does, no swap is performed (presumably the MutableBlock
// built from `output_block` aliases its columns -- TODO confirm).
// An empty `input_block` leaves `output_block` untouched. Always returns OK.
Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id,
                                           vectorized::Block* input_block,
                                           vectorized::Block* output_block) {
    DCHECK_LT(child_id, _children.size());
    DCHECK(!is_child_passthrough(child_id));

    const bool reuse_output = output_block->mem_reuse();
    MutableBlock merged =
            reuse_output ? MutableBlock::build_mutable_block(output_block)
                         : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
                                   _row_descriptor)));

    // Nothing to materialize for an empty input.
    if (input_block->rows() <= 0) {
        return Status::OK();
    }

    merged.merge(materialize_block(input_block, child_id));
    if (reuse_output) {
        return Status::OK();
    }
    output_block->swap(merged.to_block());
    return Status::OK();
}
Status VUnionNode::get_next(RuntimeState* state, Block* block, bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VUnionNode::get_next");
SCOPED_TIMER(_runtime_profile->total_time_counter());
@ -286,8 +311,8 @@ void VUnionNode::debug_string(int indentation_level, std::stringstream* out) con
*out << ")" << std::endl;
}
Block VUnionNode::materialize_block(Block* src_block) {
const std::vector<VExprContext*>& child_exprs = _child_expr_lists[_child_idx];
Block VUnionNode::materialize_block(Block* src_block, int child_idx) {
const std::vector<VExprContext*>& child_exprs = _child_expr_lists[child_idx];
ColumnsWithTypeAndName colunms;
for (size_t i = 0; i < child_exprs.size(); ++i) {
int result_column_id = -1;

View File

@ -36,9 +36,22 @@ public:
Status alloc_resource(RuntimeState* state) override;
void release_resource(RuntimeState* state) override;
/// Used by the pipeline operators: materializes the rows of `input_block`
/// through the expression list of child `child_id` and merges them into
/// `output_block`. The child must not be a passthrough child (DCHECKed in
/// the definition).
Status materialize_child_block(RuntimeState* state, int child_id,
vectorized::Block* input_block, vectorized::Block* output_block);
/// Number of child nodes, i.e. the size of _children.
size_t children_count() const { return _children.size(); }
/// Returns _first_materialized_child_idx.
int get_first_materialized_child_idx() const { return _first_materialized_child_idx; }
/// Tells whether constant-expression rows remain to be returned. Only the
/// fragment instance with index 0 ever reports true; on that instance the
/// answer is whether _const_expr_list_idx has not yet reached the end of
/// _const_expr_lists.
bool has_more_const(const RuntimeState* state) const {
    if (state->per_fragment_instance_idx() != 0) {
        return false;
    }
    return _const_expr_list_idx < _const_expr_lists.size();
}
/// GetNext() for the constant expression case.
Status get_next_const(RuntimeState* state, Block* block);
private:
/// Const exprs materialized by this node. These exprs don't refer to any children.
/// Only materialized by the first fragment instance to avoid duplication.
@ -76,13 +89,10 @@ private:
/// non-passthrough child.
Status get_next_materialized(RuntimeState* state, Block* block);
/// GetNext() for the constant expression case.
Status get_next_const(RuntimeState* state, Block* block);
/// Evaluates exprs for the current child and materializes the results into 'tuple_buf',
/// which is attached to 'dst_block'. Runs until 'dst_block' is at capacity, or all rows
/// have been consumed from the current child block. Updates '_child_row_idx'.
Block materialize_block(Block* dst_block);
Block materialize_block(Block* dst_block, int child_idx);
Status get_error_msg(const std::vector<VExprContext*>& exprs);
@ -101,12 +111,6 @@ private:
return _first_materialized_child_idx != _children.size() && _child_idx < _children.size();
}
/// Returns true if there are still rows to be returned from constant expressions.
bool has_more_const(const RuntimeState* state) const {
return state->per_fragment_instance_idx() == 0 &&
_const_expr_list_idx < _const_expr_lists.size();
}
void debug_string(int indentation_level, std::stringstream* out) const override;
};