132 lines
4.8 KiB
C++
132 lines
4.8 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
|
|
|
|
#pragma once
|
|
|
|
#include "exec/exec_node.h"
|
|
#include "vec/exprs/table_function/table_function.h"
|
|
#include "vec/exprs/vexpr.h"
|
|
|
|
namespace doris::vectorized {
|
|
|
|
class VTableFunctionNode final : public ExecNode {
|
|
public:
|
|
VTableFunctionNode(doris::ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
|
~VTableFunctionNode() override = default;
|
|
|
|
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
|
|
Status prepare(RuntimeState* state) override;
|
|
Status open(RuntimeState* state) override {
|
|
START_AND_SCOPE_SPAN(state->get_tracer(), span, "TableFunctionNode::open");
|
|
RETURN_IF_ERROR(alloc_resource(state));
|
|
return _children[0]->open(state);
|
|
}
|
|
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
|
|
bool need_more_input_data() const { return !_child_block.rows() && !_child_eos; }
|
|
|
|
void release_resource(doris::RuntimeState* state) override {
|
|
Expr::close(_fn_ctxs, state);
|
|
vectorized::VExpr::close(_vfn_ctxs, state);
|
|
|
|
if (_num_rows_filtered_counter != nullptr) {
|
|
COUNTER_SET(_num_rows_filtered_counter, static_cast<int64_t>(_num_rows_filtered));
|
|
}
|
|
ExecNode::release_resource(state);
|
|
}
|
|
|
|
Status push(RuntimeState*, vectorized::Block* input_block, bool eos) override {
|
|
_child_eos = eos;
|
|
if (input_block->rows() == 0) {
|
|
return Status::OK();
|
|
}
|
|
|
|
for (TableFunction* fn : _fns) {
|
|
RETURN_IF_ERROR(fn->process_init(input_block));
|
|
}
|
|
RETURN_IF_ERROR(_process_next_child_row());
|
|
return Status::OK();
|
|
}
|
|
|
|
Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override {
|
|
RETURN_IF_ERROR(get_expanded_block(state, output_block, eos));
|
|
reached_limit(output_block, eos);
|
|
return Status::OK();
|
|
}
|
|
|
|
Block* get_child_block() { return &_child_block; }
|
|
|
|
private:
|
|
Status _prepare_output_slot_ids(const TPlanNode& tnode);
|
|
bool _is_inner_and_empty();
|
|
|
|
// return:
|
|
// 0: all fns are eos
|
|
// -1: all fns are not eos
|
|
// >0: some of fns are eos
|
|
int _find_last_fn_eos_idx();
|
|
|
|
bool _roll_table_functions(int last_eos_idx);
|
|
|
|
Status _process_next_child_row();
|
|
|
|
/* Now the output tuples for table function node is base_table_tuple + tf1 + tf2 + ...
|
|
But not all slots are used, the real used slots are inside table_function_node.outputSlotIds.
|
|
For case like explode_bitmap:
|
|
SELECT a2,count(*) as a3 FROM A WHERE a1 IN
|
|
(SELECT c1 FROM B LATERAL VIEW explode_bitmap(b1) C as c1)
|
|
GROUP BY a2 ORDER BY a3;
|
|
Actually we only need to output column c1, no need to output columns in bitmap table B.
|
|
Copy large bitmap columns are very expensive and slow.
|
|
|
|
Here we check if the slot is really used, otherwise we avoid copy it and just insert a default value.
|
|
|
|
A better solution is:
|
|
1. FE: create a new output tuple based on the real output slots;
|
|
2. BE: refractor (V)TableFunctionNode output rows based no the new tuple;
|
|
*/
|
|
inline bool slot_need_copy(SlotId slot_id) const {
|
|
auto id = _output_slots[slot_id]->id();
|
|
return (id < _output_slot_ids.size()) && (_output_slot_ids[id]);
|
|
}
|
|
|
|
Status get_expanded_block(RuntimeState* state, Block* output_block, bool* eos);
|
|
|
|
Block _child_block;
|
|
std::vector<SlotDescriptor*> _child_slots;
|
|
std::vector<SlotDescriptor*> _output_slots;
|
|
int64_t _cur_child_offset = 0;
|
|
|
|
std::vector<ExprContext*> _fn_ctxs;
|
|
std::vector<vectorized::VExprContext*> _vfn_ctxs;
|
|
|
|
std::vector<TableFunction*> _fns;
|
|
std::vector<void*> _fn_values;
|
|
std::vector<int64_t> _fn_value_lengths;
|
|
int _fn_num = 0;
|
|
|
|
std::vector<bool> _output_slot_ids;
|
|
|
|
std::vector<int> _child_slot_sizes;
|
|
// indicate if child node reach the end
|
|
bool _child_eos = false;
|
|
|
|
RuntimeProfile::Counter* _num_rows_filtered_counter = nullptr;
|
|
uint64_t _num_rows_filtered = 0;
|
|
};
|
|
|
|
} // namespace doris::vectorized
|