[pipelineX](refactor) add repeat node in pipelineX (#23750)

This commit is contained in:
Mryange
2023-09-04 15:55:09 +08:00
committed by GitHub
parent 301a1d97e1
commit 422159bd94
7 changed files with 679 additions and 2 deletions

View File

@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.
#include "repeat_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include <memory>
#include "common/logging.h"
#include "pipeline/exec/operator.h"
#include "vec/core/block.h"
#include "vec/exec/vrepeat_node.h"
@ -42,4 +43,231 @@ Status RepeatOperator::close(doris::RuntimeState* state) {
return StatefulOperator::close(state);
}
RepeatLocalState::RepeatLocalState(RuntimeState* state, OperatorXBase* parent)
: Base(state, parent),
_child_block(vectorized::Block::create_unique()),
_child_source_state(SourceState::DEPEND_ON_SOURCE),
_child_eos(false),
_repeat_id_idx(0) {}
// Initializes the base local state, then sizes this state's expression-context list to
// match the parent RepeatOperatorX and calls clone() on each parent context so that
// the result lands in the slot with the same index.
// Returns the first non-OK status produced by Base::init or by a clone() call.
Status RepeatLocalState::init(RuntimeState* state, LocalStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));

    auto& repeat_op = _parent->cast<Parent>();
    _expr_ctxs.resize(repeat_op._expr_ctxs.size());

    size_t idx = 0;
    for (auto& parent_ctx : repeat_op._expr_ctxs) {
        RETURN_IF_ERROR(parent_ctx->clone(state, _expr_ctxs[idx]));
        ++idx;
    }
    return Status::OK();
}
// Runs the base operator initialization and then builds the expression trees for the
// repeat node's expressions (tnode.repeat_node.exprs) into _expr_ctxs.
Status RepeatOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
    RETURN_IF_ERROR(OperatorXBase::init(tnode, state));

    const auto& repeat_exprs = tnode.repeat_node.exprs;
    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(repeat_exprs, _expr_ctxs));

    return Status::OK();
}
// Prepares the operator for execution:
//   1. prepares the base operator,
//   2. resolves the output tuple descriptor from the runtime state's descriptor table,
//   3. prepares the repeat expressions against the child's row descriptor,
//   4. appends every slot of the output tuple to _output_slots.
// Returns InternalError when the output tuple descriptor cannot be found, or the first
// non-OK status reported by the base class / expression preparation.
Status RepeatOperatorX::prepare(RuntimeState* state) {
    // Log under this class' own name so the trace is not confused with VRepeatNode.
    VLOG_CRITICAL << "RepeatOperatorX::prepare";
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    RETURN_IF_ERROR(OperatorXBase::prepare(state));

    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
    if (_output_tuple_desc == nullptr) {
        return Status::InternalError("Failed to get tuple descriptor.");
    }

    RETURN_IF_ERROR(vectorized::VExpr::prepare(_expr_ctxs, state, _child_x->row_desc()));

    // Single range insert instead of a per-element push_back loop.
    const auto& slots = _output_tuple_desc->slots();
    _output_slots.insert(_output_slots.end(), slots.begin(), slots.end());
    return Status::OK();
}
// Opens the base operator and then the repeat expressions.
// Returns the first non-OK status reported by either step.
Status RepeatOperatorX::open(RuntimeState* state) {
    // Log under this class' own name so the trace is not confused with VRepeatNode.
    VLOG_CRITICAL << "RepeatOperatorX::open";
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    RETURN_IF_ERROR(OperatorXBase::open(state));
    RETURN_IF_ERROR(vectorized::VExpr::open(_expr_ctxs, state));
    return Status::OK();
}
// Copies the repeat-node parameters out of the thrift plan node: the per-repeat slot id
// sets, the set of all repeated slot ids, the repeat id list, the grouping value lists
// and the id of the output tuple. The output tuple descriptor itself is resolved later,
// in prepare().
RepeatOperatorX::RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode,
                                 const DescriptorTbl& descs)
        : Base(pool, tnode, descs),
          _slot_id_set_list(tnode.repeat_node.slot_id_set_list),
          _all_slot_ids(tnode.repeat_node.all_slot_ids),
          _repeat_id_list(tnode.repeat_node.repeat_id_list),
          _grouping_list(tnode.repeat_node.grouping_list),
          _output_tuple_id(tnode.repeat_node.output_tuple_id) {}
// Tells whether a new block has to be fetched from the child: true only while the
// buffered child block is empty and the child has not reported end-of-stream.
bool RepeatOperatorX::need_more_input_data(RuntimeState* state) const {
    auto& local_state = state->get_local_state(id())->cast<RepeatLocalState>();
    const bool buffer_is_empty = local_state._child_block->rows() == 0;
    if (!buffer_is_empty) {
        return false;
    }
    return !local_state._child_eos;
}
// Produces the next output block.
//
// When no child rows are buffered (and the child is not finished) a block is fetched
// from the child; if it is empty and the child is not finished the call returns without
// output, otherwise the block is handed to push(). When buffered data (or end-of-stream)
// is available, pull() emits one repeated block and source_state is adjusted:
//   - FINISHED is left untouched,
//   - MORE_DATA is reported while the buffered child block still has repeats pending,
//   - a leftover MORE_DATA is replaced by the child's last reported state otherwise.
Status RepeatOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
                                  SourceState& source_state) {
    auto& local_state = state->get_local_state(id())->cast<RepeatLocalState>();
    auto* child_block = local_state._child_block.get();
    auto& child_state = local_state._child_source_state;

    if (need_more_input_data(state)) {
        child_block->clear_column_data();
        RETURN_IF_ERROR(_child_x->get_next_after_projects(state, child_block, child_state));
        source_state = child_state;

        // Nothing arrived yet and the child may still produce data: try again later.
        const bool nothing_yet =
                child_block->rows() == 0 && child_state != SourceState::FINISHED;
        if (nothing_yet) {
            return Status::OK();
        }
        RETURN_IF_ERROR(push(state, child_block, child_state));
    }

    if (need_more_input_data(state)) {
        return Status::OK();
    }

    RETURN_IF_ERROR(pull(state, block, source_state));
    if (source_state == SourceState::FINISHED) {
        return Status::OK();
    }

    if (!need_more_input_data(state)) {
        // The buffered child block still has repeat ids left to emit.
        source_state = SourceState::MORE_DATA;
    } else if (source_state == SourceState::MORE_DATA) {
        // Buffer exhausted: fall back to whatever the child reported last.
        source_state = child_state;
    }
    return Status::OK();
}
// Materializes one "repeat" of child_block into output_block.
//
// @param child_block    block holding the evaluated repeat expressions (one column per
//                       expression); must not be null.
// @param repeat_id_idx  index of the repeat to produce; indexes _slot_id_set_list and every
//                       entry of _grouping_list.
// @param output_block   destination block; must be empty on entry (rows() == 0).
// @return Status::OK() (preconditions are enforced with DCHECKs only).
//
// The first child_block->columns() output columns are filled from the child columns. A
// column whose slot id is in _all_slot_ids is a repeat slot: it is filled entirely with
// NULLs when the slot id is absent from _slot_id_set_list[repeat_id_idx], otherwise the
// child values are copied. Non-repeat columns are copied as they are. The remaining
// output columns receive, for each entry of _grouping_list, the constant
// _grouping_list[k][repeat_id_idx] repeated once per child row.
Status RepeatOperatorX::get_repeated_block(vectorized::Block* child_block, int repeat_id_idx,
                                           vectorized::Block* output_block) {
    DCHECK(child_block != nullptr);
    DCHECK_EQ(output_block->rows(), 0);
    DCHECK_GE(repeat_id_idx, 0);
    DCHECK_LT(static_cast<size_t>(repeat_id_idx), _slot_id_set_list.size());

    const size_t child_column_size = child_block->columns();
    const size_t column_size = _output_slots.size();
    DCHECK_LT(child_column_size, column_size);

    vectorized::MutableBlock m_block =
            vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots);
    vectorized::MutableColumns& columns = m_block.mutable_columns();

    // The slot id set of this repeat does not change per column, so look it up once.
    const std::set<SlotId>& repeat_ids = _slot_id_set_list[repeat_id_idx];

    /* Fill all slots according to child, for example:select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2));
     * insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1);
     * slot_id_set_list=[[0],[1]],repeat_id_idx=0,
     * child_block 1,2,1 | 1,3,1 | 2,1,1 | 3,1,1
     * output_block 1,null,1,1 | 1,null,1,1 | 2,null,1,1 | 3,null,1,1
     */
    size_t cur_col = 0;
    for (size_t i = 0; i < child_column_size; i++) {
        const vectorized::ColumnWithTypeAndName& src_column = child_block->get_by_position(i);
        const auto slot_id = _output_slots[cur_col]->id();
        const bool is_repeat_slot = _all_slot_ids.find(slot_id) != _all_slot_ids.end();
        const bool is_set_null_slot = repeat_ids.find(slot_id) == repeat_ids.end();
        const auto row_size = src_column.column->size();

        if (is_repeat_slot) {
            DCHECK(_output_slots[cur_col]->is_nullable());
            auto* nullable_column =
                    assert_cast<vectorized::ColumnNullable*>(columns[cur_col].get());
            if (is_set_null_slot) {
                // Slot is not part of this repeat: emit row_size NULLs.
                nullable_column->resize(row_size);
                memset(nullable_column->get_null_map_data().data(), 1,
                       sizeof(vectorized::UInt8) * row_size);
            } else {
                auto* column_ptr = columns[cur_col].get();
                if (!src_column.type->is_nullable()) {
                    // Source is not nullable: mark every row as non-null and copy the
                    // values straight into the nested column.
                    auto& null_map = nullable_column->get_null_map_data();
                    for (size_t j = 0; j < row_size; ++j) {
                        null_map.push_back(0);
                    }
                    column_ptr = &nullable_column->get_nested_column();
                }
                column_ptr->insert_range_from(*src_column.column, 0, row_size);
            }
        } else {
            columns[cur_col]->insert_range_from(*src_column.column, 0, row_size);
        }
        cur_col++;
    }

    // Fill grouping ID to block
    const size_t child_rows = child_block->rows();
    for (size_t slot_idx = 0; slot_idx < _grouping_list.size(); slot_idx++) {
        // cur_col (not slot_idx) is the index used below, so that is what must be in range.
        DCHECK_LT(cur_col, _output_tuple_desc->slots().size());
        const SlotDescriptor* virtual_slot_desc = _output_tuple_desc->slots()[cur_col];
        DCHECK_EQ(virtual_slot_desc->type().type, _output_slots[cur_col]->type().type);
        DCHECK_EQ(virtual_slot_desc->col_name(), _output_slots[cur_col]->col_name());
        DCHECK_LT(static_cast<size_t>(repeat_id_idx), _grouping_list[slot_idx].size());
        const int64_t val = _grouping_list[slot_idx][repeat_id_idx];

        auto* column_ptr = columns[cur_col].get();
        DCHECK(!_output_slots[cur_col]->is_nullable());
        auto* col = assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(column_ptr);
        for (size_t i = 0; i < child_rows; ++i) {
            col->insert_value(val);
        }
        cur_col++;
    }

    DCHECK_EQ(cur_col, column_size);
    return Status::OK();
}
// Accepts one block from the child.
//
// Records whether the child reported FINISHED, and, when input_block has rows, evaluates
// every expression of the local state on it. Each result column is converted to a full
// (non-const) column in place inside input_block and then inserted into a freshly created
// intermediate block, one column per expression.
// Returns the first non-OK status reported by an expression, otherwise OK.
Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block,
                             SourceState& source_state) {
    auto& local_state = state->get_local_state(id())->cast<RepeatLocalState>();
    local_state._child_eos = (source_state == SourceState::FINISHED);

    auto& staged_block = local_state._intermediate_block;
    auto& local_exprs = local_state._expr_ctxs;
    DCHECK(!staged_block || staged_block->rows() == 0);
    DCHECK(!local_exprs.empty());

    if (input_block->rows() == 0) {
        return Status::OK();
    }

    staged_block = vectorized::Block::create_unique();
    for (auto& expr : local_exprs) {
        int result_column_id = -1;
        RETURN_IF_ERROR(expr->execute(input_block, &result_column_id));
        DCHECK(result_column_id != -1);

        auto& result_entry = input_block->get_by_position(result_column_id);
        result_entry.column = result_entry.column->convert_to_full_column_if_const();
        staged_block->insert(result_entry);
    }
    DCHECK_EQ(local_exprs.size(), staged_block->columns());
    return Status::OK();
}
// Emits the next repeated block into output_block.
//
// When the intermediate block holds rows, one repeat (selected by the local repeat index)
// is generated and the index advances; after the last repeat id the intermediate block is
// cleared, the buffered child block is released and the index restarts at 0. The operator's
// conjuncts are then applied to output_block. source_state becomes FINISHED once the child
// reached end-of-stream and the buffered child block is empty; the limit bookkeeping of the
// local state and the rows-returned counter are updated last.
Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block,
                             SourceState& source_state) {
    auto& local_state = state->get_local_state(id())->cast<RepeatLocalState>();
    auto& repeat_idx = local_state._repeat_id_idx;
    auto& buffered_child = *local_state._child_block;
    auto& staged_block = local_state._intermediate_block;

    RETURN_IF_CANCELLED(state);

    DCHECK(repeat_idx >= 0);
    for (const auto& grouping_values : _grouping_list) {
        DCHECK(repeat_idx <= static_cast<int>(grouping_values.size()));
    }
    DCHECK(output_block->rows() == 0);

    const bool has_staged_rows = staged_block && staged_block->rows() > 0;
    if (has_staged_rows) {
        RETURN_IF_ERROR(get_repeated_block(staged_block.get(), repeat_idx, output_block));
        ++repeat_idx;

        const int repeat_count = static_cast<int>(_repeat_id_list.size());
        if (repeat_idx >= repeat_count) {
            // Every repeat of the current input has been produced: drop it and start over.
            staged_block->clear();
            release_block_memory(buffered_child);
            repeat_idx = 0;
        }
    }

    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
                                                           output_block->columns()));

    const bool input_exhausted = local_state._child_eos && buffered_child.rows() == 0;
    if (input_exhausted) {
        source_state = SourceState::FINISHED;
    }

    local_state.reached_limit(output_block, source_state);
    COUNTER_SET(local_state._rows_returned_counter, local_state._num_rows_returned);
    return Status::OK();
}
} // namespace doris::pipeline

View File

@ -20,7 +20,7 @@
#include <stdint.h>
#include "common/status.h"
#include "operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/exec/vrepeat_node.h"
namespace doris {
@ -44,6 +44,59 @@ public:
Status close(RuntimeState* state) override;
};
class RepeatOperatorX;
// Per-task execution state of RepeatOperatorX. It buffers the block fetched from the
// child, the block of evaluated repeat expressions, and the position within the list
// of repeats that is currently being emitted.
class RepeatLocalState final : public PipelineXLocalState<FakeDependency> {
public:
ENABLE_FACTORY_CREATOR(RepeatLocalState);
using Parent = RepeatOperatorX;
using Base = PipelineXLocalState<FakeDependency>;
RepeatLocalState(RuntimeState* state, OperatorXBase* parent);
// Initializes the base state and clones the parent operator's expression contexts.
Status init(RuntimeState* state, LocalStateInfo& info) override;
private:
friend class RepeatOperatorX;
// Block most recently fetched from the child operator.
std::unique_ptr<vectorized::Block> _child_block;
// Source state reported by the child on the last fetch.
SourceState _child_source_state;
// Set by RepeatOperatorX::push() when the child's state is FINISHED.
bool _child_eos;
// Index of the next repeat to emit; advanced by pull() and reset to 0 after the last one.
int _repeat_id_idx;
// Result columns of the repeat expressions evaluated on the child block (filled by push()).
std::unique_ptr<vectorized::Block> _intermediate_block {};
// This state's own clones of the operator's expression contexts (created in init()).
vectorized::VExprContextSPtrs _expr_ctxs;
};
// PipelineX operator for the repeat plan node. For every block received from its child it
// evaluates the repeat expressions once and then emits the result once per repeat id,
// nulling the slots that do not take part in that repeat and appending the grouping
// value columns. Mutable execution state lives in RepeatLocalState.
class RepeatOperatorX final : public OperatorX<RepeatLocalState> {
public:
    using Base = OperatorX<RepeatLocalState>;

    RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);

    // Fetches from the child when needed and emits the next repeated block.
    Status get_block(RuntimeState* state, vectorized::Block* block,
                     SourceState& source_state) override;

    // Builds the expression trees of the repeat node.
    Status init(const TPlanNode& tnode, RuntimeState* state) override;
    // Resolves the output tuple descriptor and prepares the expressions.
    Status prepare(RuntimeState* state) override;
    // Opens the expressions.
    Status open(RuntimeState* state) override;

private:
    friend class RepeatLocalState;

    // Fills output_block with the repeat selected by repeat_id_idx, built from child_block.
    Status get_repeated_block(vectorized::Block* child_block, int repeat_id_idx,
                              vectorized::Block* output_block);

    // True while no child rows are buffered and the child has not reached end-of-stream.
    bool need_more_input_data(RuntimeState* state) const;

    // Emits one repeated block from the buffered input.
    Status pull(RuntimeState* state, vectorized::Block* output_block, SourceState& source_state);
    // Evaluates the repeat expressions on a block received from the child.
    Status push(RuntimeState* state, vectorized::Block* input_block, SourceState& source_state);

    // One slot id set per repeat. For the repeat at index i, a repeated slot keeps the
    // child's value when its id is in _slot_id_set_list[i] and is set to null otherwise.
    std::vector<std::set<SlotId>> _slot_id_set_list;
    // Ids of all slots that take part in the repeat; other slots are copied unchanged.
    std::set<SlotId> _all_slot_ids;
    // An integer bitmap list, it indicates the bit position of the exprs not null.
    // Its size is the number of repeats emitted per input block.
    std::vector<int64_t> _repeat_id_list;
    // _grouping_list[k][i] is the value written to the k-th grouping column for repeat i.
    std::vector<std::vector<int64_t>> _grouping_list;
    TupleId _output_tuple_id;
    // Resolved in prepare(); null until then.
    const TupleDescriptor* _output_tuple_desc = nullptr;
    // Slots of the output tuple, in order; filled in prepare().
    std::vector<SlotDescriptor*> _output_slots;
    // Expression contexts of the repeat expressions; cloned per local state.
    vectorized::VExprContextSPtrs _expr_ctxs;
};
} // namespace pipeline
} // namespace doris

View File

@ -28,6 +28,7 @@
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_sink_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/exec/sort_source_operator.h"
@ -157,6 +158,10 @@ Status OperatorXBase::get_next_after_projects(RuntimeState* state, vectorized::B
return get_block(state, block, source_state);
}
// Clears the column data of `block`, passing the number of materialized slots in the
// child's row descriptor to Block::clear_column_data().
void OperatorXBase::release_block_memory(vectorized::Block& block) {
    const auto child_slot_count = _child_x->row_desc().num_materialized_slots();
    block.clear_column_data(child_slot_count);
}
// Returns true when the parent operator has a limit (a limit of -1 means "no limit") and
// the number of rows returned so far has reached it.
bool PipelineXLocalStateBase::reached_limit() const {
    const auto limit = _parent->_limit;
    if (limit == -1) {
        return false;
    }
    return _num_rows_returned >= limit;
}
@ -231,6 +236,7 @@ DECLARE_OPERATOR_X(AnalyticLocalState)
DECLARE_OPERATOR_X(SortLocalState)
DECLARE_OPERATOR_X(AggLocalState)
DECLARE_OPERATOR_X(ExchangeLocalState)
DECLARE_OPERATOR_X(RepeatLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
#undef DECLARE_OPERATOR_X

View File

@ -228,6 +228,11 @@ public:
vectorized::Block* output_block) const;
protected:
/// Release the memory of a block that was obtained from the child:
// 1. clear the data of the valid columns got from the child, so the child can reuse the memory
// 2. delete and release the columns created by function calls or for other reasons
void release_block_memory(vectorized::Block& block);
template <typename Dependency>
friend class PipelineXLocalState;
friend class PipelineXLocalStateBase;

View File

@ -54,6 +54,7 @@
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_sink_operator.h"
#include "pipeline/exec/scan_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
@ -622,6 +623,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
break;
}
case TPlanNodeType::REPEAT_NODE: {
op.reset(new RepeatOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
break;
}
default:
return Status::InternalError("Unsupported exec type in pipelineX: {}",
print_plan_node_type(tnode.node_type));

View File

@ -0,0 +1,290 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !pipeline --
\N \N
\N 1
\N 2
\N 3
\N 4
\N 5
\N 6
\N 7
\N 8
\N 9
\N 10
\N 11
\N 12
\N 13
\N 14
\N 15
\N 16
\N 17
\N 18
\N 19
\N 20
1 \N
1 2
2 \N
2 13
3 \N
3 7
4 \N
4 15
5 \N
5 10
6 \N
6 20
7 \N
7 18
8 \N
8 12
9 \N
9 4
10 \N
10 1
11 \N
11 17
12 \N
12 9
13 \N
13 11
14 \N
14 5
15 \N
15 19
16 \N
16 3
17 \N
17 14
18 \N
18 8
19 \N
19 6
20 \N
20 16
-- !pipeline --
\N 1 \N
\N 1 7
\N 2 \N
\N 2 18
\N 3 \N
\N 3 13
\N 4 \N
\N 4 14
\N 5 \N
\N 5 12
\N 6 \N
\N 6 4
\N 7 \N
\N 7 11
\N 8 \N
\N 8 5
\N 9 \N
\N 9 2
\N 10 \N
\N 10 15
\N 11 \N
\N 11 3
\N 12 \N
\N 12 6
\N 13 \N
\N 13 1
\N 14 \N
\N 14 17
\N 15 \N
\N 15 10
\N 16 \N
\N 16 20
\N 17 \N
\N 17 19
\N 18 \N
\N 18 8
\N 19 \N
\N 19 9
\N 20 \N
\N 20 16
1 \N \N
1 2 18
2 \N \N
2 13 1
3 \N \N
3 7 11
4 \N \N
4 15 10
5 \N \N
5 10 15
6 \N \N
6 20 16
7 \N \N
7 18 8
8 \N \N
8 12 6
9 \N \N
9 4 14
10 \N \N
10 1 7
11 \N \N
11 17 19
12 \N \N
12 9 2
13 \N \N
13 11 3
14 \N \N
14 5 12
15 \N \N
15 19 9
16 \N \N
16 3 13
17 \N \N
17 14 17
18 \N \N
18 8 5
19 \N \N
19 6 4
20 \N \N
20 16 20
-- !pipelineX --
\N \N
\N 1
\N 2
\N 3
\N 4
\N 5
\N 6
\N 7
\N 8
\N 9
\N 10
\N 11
\N 12
\N 13
\N 14
\N 15
\N 16
\N 17
\N 18
\N 19
\N 20
1 \N
1 2
2 \N
2 13
3 \N
3 7
4 \N
4 15
5 \N
5 10
6 \N
6 20
7 \N
7 18
8 \N
8 12
9 \N
9 4
10 \N
10 1
11 \N
11 17
12 \N
12 9
13 \N
13 11
14 \N
14 5
15 \N
15 19
16 \N
16 3
17 \N
17 14
18 \N
18 8
19 \N
19 6
20 \N
20 16
-- !pipelineX --
\N 1 \N
\N 1 7
\N 2 \N
\N 2 18
\N 3 \N
\N 3 13
\N 4 \N
\N 4 14
\N 5 \N
\N 5 12
\N 6 \N
\N 6 4
\N 7 \N
\N 7 11
\N 8 \N
\N 8 5
\N 9 \N
\N 9 2
\N 10 \N
\N 10 15
\N 11 \N
\N 11 3
\N 12 \N
\N 12 6
\N 13 \N
\N 13 1
\N 14 \N
\N 14 17
\N 15 \N
\N 15 10
\N 16 \N
\N 16 20
\N 17 \N
\N 17 19
\N 18 \N
\N 18 8
\N 19 \N
\N 19 9
\N 20 \N
\N 20 16
1 \N \N
1 2 18
2 \N \N
2 13 1
3 \N \N
3 7 11
4 \N \N
4 15 10
5 \N \N
5 10 15
6 \N \N
6 20 16
7 \N \N
7 18 8
8 \N \N
8 12 6
9 \N \N
9 4 14
10 \N \N
10 1 7
11 \N \N
11 17 19
12 \N \N
12 9 2
13 \N \N
13 11 3
14 \N \N
14 5 12
15 \N \N
15 19 9
16 \N \N
16 3 13
17 \N \N
17 14 17
18 \N \N
18 8 5
19 \N \N
19 6 4
20 \N \N
20 16 20

View File

@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Regression suite for the repeat operator: runs the same two GROUPING SETS queries
// first with the pipeline engine enabled and then again after enabling the pipelineX
// engine. The results are recorded under the `pipeline` and `pipelineX` tags.
suite("test_repeat_operator") {
// Start from a clean table.
sql """ DROP TABLE IF EXISTS REPEATNODE """
sql """
CREATE TABLE IF NOT EXISTS REPEATNODE (
`k1` INT(11) NULL COMMENT "",
`k2` INT(11) NULL COMMENT "",
`k3` INT(11) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2"
);
"""
sql """ set forbid_unknown_col_stats = false """
// Load 20 rows; k1 takes each value from 1 to 20 exactly once.
sql """
INSERT INTO REPEATNODE (k1, k2, k3) VALUES
(5, 10, 15),
(8, 12, 6),
(3, 7, 11),
(9, 4, 14),
(2, 13, 1),
(6, 20, 16),
(11, 17, 19),
(7, 18, 8),
(12, 9, 2),
(4, 15, 10),
(16, 3, 13),
(10, 1, 7),
(14, 5, 12),
(19, 6, 4),
(1, 2, 18),
(13, 11, 3),
(18, 8, 5),
(15, 19, 9),
(17, 14, 17),
(20, 16, 20);
"""
// Pass 1: pipeline engine.
sql"""set enable_pipeline_engine = true; """
// Two columns, four grouping sets including the empty set.
qt_pipeline """
SELECT k1, k2
FROM REPEATNODE
GROUP BY GROUPING SETS ((k1, k2), (k2), (k1), ())
ORDER BY k1, k2;
"""
// Three columns, four grouping sets.
qt_pipeline """
SELECT k1, k2 , k3
FROM REPEATNODE
GROUP BY GROUPING SETS ((k1, k2 , k3), (k2 , k3), (k1), (k2))
ORDER BY k1, k2,k3;
"""
// Pass 2: same queries with the pipelineX engine switched on as well.
sql"""set experimental_enable_pipeline_x_engine=true; """
qt_pipelineX """
SELECT k1, k2
FROM REPEATNODE
GROUP BY GROUPING SETS ((k1, k2), (k2), (k1), ())
ORDER BY k1, k2;
"""
qt_pipelineX """
SELECT k1, k2 , k3
FROM REPEATNODE
GROUP BY GROUPING SETS ((k1, k2 , k3), (k2 , k3), (k1), (k2))
ORDER BY k1, k2,k3;
"""
}