Refactoring the filtering conditions in the current ExecNode from an expression tree to an array can simplify the process of adding runtime filters. It eliminates the need for complex merge operations and removes the requirement for the frontend to combine expressions into a single entity. By representing the filtering conditions as an array, each condition can be treated individually, making it easier to add runtime filters without the need for complex merging logic. The array can store the individual conditions, and the runtime filter logic can iterate through the array to apply the filters as needed. This refactoring simplifies the codebase, improves readability, and reduces the complexity associated with handling filtering conditions and adding runtime filters. It separates the conditions into discrete entities, enabling more straightforward manipulation and management within the execution node.
305 lines
11 KiB
C++
305 lines
11 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
#include "vec/exec/vschema_scan_node.h"
|
|
|
|
#include <gen_cpp/FrontendService_types.h>
|
|
#include <gen_cpp/PlanNodes_types.h>
|
|
#include <gen_cpp/Types_types.h>
|
|
#include <opentelemetry/nostd/shared_ptr.h>
|
|
|
|
#include <boost/algorithm/string/predicate.hpp>
|
|
#include <ostream>
|
|
#include <utility>
|
|
|
|
#include "common/logging.h"
|
|
#include "common/object_pool.h"
|
|
#include "common/status.h"
|
|
#include "exec/exec_node.h"
|
|
#include "runtime/descriptors.h"
|
|
#include "runtime/runtime_state.h"
|
|
#include "runtime/types.h"
|
|
#include "util/runtime_profile.h"
|
|
#include "util/telemetry/telemetry.h"
|
|
#include "vec/columns/column.h"
|
|
#include "vec/core/block.h"
|
|
#include "vec/core/column_with_type_and_name.h"
|
|
#include "vec/data_types/data_type.h"
|
|
#include "vec/data_types/data_type_factory.hpp"
|
|
#include "vec/exprs/vexpr_context.h"
|
|
|
|
namespace doris {
|
|
class TScanRangeParams;
|
|
} // namespace doris
|
|
|
|
namespace doris::vectorized {
|
|
|
|
VSchemaScanNode::VSchemaScanNode(ObjectPool* pool, const TPlanNode& tnode,
|
|
const DescriptorTbl& descs)
|
|
: ScanNode(pool, tnode, descs),
|
|
_is_init(false),
|
|
_table_name(tnode.schema_scan_node.table_name),
|
|
_tuple_id(tnode.schema_scan_node.tuple_id),
|
|
_dest_tuple_desc(nullptr),
|
|
_tuple_idx(0),
|
|
_slot_num(0),
|
|
_schema_scanner(nullptr) {}
|
|
|
|
VSchemaScanNode::~VSchemaScanNode() {}
|
|
|
|
Status VSchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
|
RETURN_IF_ERROR(ExecNode::init(tnode, state));
|
|
if (tnode.schema_scan_node.__isset.db) {
|
|
_scanner_param.db = _pool->add(new std::string(tnode.schema_scan_node.db));
|
|
}
|
|
|
|
if (tnode.schema_scan_node.__isset.table) {
|
|
_scanner_param.table = _pool->add(new std::string(tnode.schema_scan_node.table));
|
|
}
|
|
|
|
if (tnode.schema_scan_node.__isset.wild) {
|
|
_scanner_param.wild = _pool->add(new std::string(tnode.schema_scan_node.wild));
|
|
}
|
|
|
|
if (tnode.schema_scan_node.__isset.current_user_ident) {
|
|
_scanner_param.current_user_ident =
|
|
_pool->add(new TUserIdentity(tnode.schema_scan_node.current_user_ident));
|
|
} else {
|
|
if (tnode.schema_scan_node.__isset.user) {
|
|
_scanner_param.user = _pool->add(new std::string(tnode.schema_scan_node.user));
|
|
}
|
|
if (tnode.schema_scan_node.__isset.user_ip) {
|
|
_scanner_param.user_ip = _pool->add(new std::string(tnode.schema_scan_node.user_ip));
|
|
}
|
|
}
|
|
|
|
if (tnode.schema_scan_node.__isset.ip) {
|
|
_scanner_param.ip = _pool->add(new std::string(tnode.schema_scan_node.ip));
|
|
}
|
|
if (tnode.schema_scan_node.__isset.port) {
|
|
_scanner_param.port = tnode.schema_scan_node.port;
|
|
}
|
|
|
|
if (tnode.schema_scan_node.__isset.thread_id) {
|
|
_scanner_param.thread_id = tnode.schema_scan_node.thread_id;
|
|
}
|
|
|
|
if (tnode.schema_scan_node.__isset.catalog) {
|
|
_scanner_param.catalog = _pool->add(new std::string(tnode.schema_scan_node.catalog));
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VSchemaScanNode::open(RuntimeState* state) {
|
|
if (nullptr == state) {
|
|
return Status::InternalError("input pointer is nullptr.");
|
|
}
|
|
|
|
if (!_is_init) {
|
|
return Status::InternalError("Open before Init.");
|
|
}
|
|
|
|
if (nullptr == state) {
|
|
return Status::InternalError("input pointer is nullptr.");
|
|
}
|
|
|
|
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
|
RETURN_IF_CANCELLED(state);
|
|
RETURN_IF_ERROR(ExecNode::open(state));
|
|
|
|
if (_scanner_param.user) {
|
|
TSetSessionParams param;
|
|
param.__set_user(*_scanner_param.user);
|
|
//TStatus t_status;
|
|
//RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status));
|
|
//RETURN_IF_ERROR(Status(t_status));
|
|
}
|
|
|
|
return _schema_scanner->start(state);
|
|
}
|
|
|
|
Status VSchemaScanNode::prepare(RuntimeState* state) {
|
|
if (_is_init) {
|
|
return Status::OK();
|
|
}
|
|
|
|
if (nullptr == state) {
|
|
return Status::InternalError("state pointer is nullptr.");
|
|
}
|
|
RETURN_IF_ERROR(ScanNode::prepare(state));
|
|
|
|
// get dest tuple desc
|
|
_dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
|
|
|
|
if (nullptr == _dest_tuple_desc) {
|
|
return Status::InternalError("Failed to get tuple descriptor.");
|
|
}
|
|
|
|
_slot_num = _dest_tuple_desc->slots().size();
|
|
// get src tuple desc
|
|
const SchemaTableDescriptor* schema_table =
|
|
static_cast<const SchemaTableDescriptor*>(_dest_tuple_desc->table_desc());
|
|
|
|
if (nullptr == schema_table) {
|
|
return Status::InternalError("Failed to get schema table descriptor.");
|
|
}
|
|
|
|
// init schema scanner profile
|
|
_scanner_param.profile.reset(new RuntimeProfile("SchemaScanner"));
|
|
_runtime_profile->add_child(_scanner_param.profile.get(), true, nullptr);
|
|
|
|
// new one scanner
|
|
_schema_scanner = SchemaScanner::create(schema_table->schema_table_type());
|
|
|
|
if (nullptr == _schema_scanner) {
|
|
return Status::InternalError("schema scanner get nullptr pointer.");
|
|
}
|
|
|
|
RETURN_IF_ERROR(_schema_scanner->init(&_scanner_param, _pool));
|
|
const std::vector<SchemaScanner::ColumnDesc>& columns_desc(_schema_scanner->get_column_desc());
|
|
|
|
// if src columns size is zero, it's the dummy slots.
|
|
if (0 == columns_desc.size()) {
|
|
_slot_num = 0;
|
|
}
|
|
|
|
// check if type is ok.
|
|
for (int i = 0; i < _slot_num; ++i) {
|
|
// TODO(zhaochun): Is this slow?
|
|
int j = 0;
|
|
for (; j < columns_desc.size(); ++j) {
|
|
if (boost::iequals(_dest_tuple_desc->slots()[i]->col_name(), columns_desc[j].name)) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (j >= columns_desc.size()) {
|
|
LOG(WARNING) << "no match column for this column("
|
|
<< _dest_tuple_desc->slots()[i]->col_name() << ")";
|
|
return Status::InternalError("no match column for this column.");
|
|
}
|
|
|
|
if (columns_desc[j].type != _dest_tuple_desc->slots()[i]->type().type) {
|
|
LOG(WARNING) << "schema not match. input is " << columns_desc[j].name << "("
|
|
<< columns_desc[j].type << ") and output is "
|
|
<< _dest_tuple_desc->slots()[i]->col_name() << "("
|
|
<< _dest_tuple_desc->slots()[i]->type() << ")";
|
|
return Status::InternalError("schema not match.");
|
|
}
|
|
}
|
|
|
|
// TODO(marcel): add int _tuple_idx indexed by TupleId somewhere in runtime_state.h
|
|
_tuple_idx = 0;
|
|
_is_init = true;
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VSchemaScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
|
|
if (state == nullptr || block == nullptr || eos == nullptr) {
|
|
return Status::InternalError("input is NULL pointer");
|
|
}
|
|
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
|
|
|
VLOG_CRITICAL << "VSchemaScanNode::GetNext";
|
|
if (!_is_init) {
|
|
return Status::InternalError("used before initialize.");
|
|
}
|
|
RETURN_IF_CANCELLED(state);
|
|
bool schema_eos = false;
|
|
|
|
const std::vector<SchemaScanner::ColumnDesc>& columns_desc(_schema_scanner->get_column_desc());
|
|
|
|
do {
|
|
block->clear();
|
|
for (int i = 0; i < _slot_num; ++i) {
|
|
auto dest_slot_desc = _dest_tuple_desc->slots()[i];
|
|
block->insert(ColumnWithTypeAndName(dest_slot_desc->get_empty_mutable_column(),
|
|
dest_slot_desc->get_data_type_ptr(),
|
|
dest_slot_desc->col_name()));
|
|
}
|
|
|
|
// src block columns desc is filled by schema_scanner->get_column_desc.
|
|
vectorized::Block src_block;
|
|
for (int i = 0; i < columns_desc.size(); ++i) {
|
|
TypeDescriptor descriptor(columns_desc[i].type);
|
|
auto data_type =
|
|
vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
|
|
src_block.insert(ColumnWithTypeAndName(data_type->create_column(), data_type,
|
|
columns_desc[i].name));
|
|
}
|
|
while (true) {
|
|
RETURN_IF_CANCELLED(state);
|
|
|
|
// get all slots from schema table.
|
|
RETURN_IF_ERROR(_schema_scanner->get_next_block(&src_block, &schema_eos));
|
|
|
|
if (schema_eos) {
|
|
*eos = true;
|
|
break;
|
|
}
|
|
|
|
if (src_block.rows() >= state->batch_size()) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (src_block.rows()) {
|
|
// block->check_number_of_rows();
|
|
for (int i = 0; i < _slot_num; ++i) {
|
|
auto dest_slot_desc = _dest_tuple_desc->slots()[i];
|
|
vectorized::MutableColumnPtr column_ptr =
|
|
std::move(*block->get_by_position(i).column).mutate();
|
|
column_ptr->insert_range_from(
|
|
*src_block.get_by_name(dest_slot_desc->col_name()).column, 0,
|
|
src_block.rows());
|
|
}
|
|
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, block,
|
|
_dest_tuple_desc->slots().size()));
|
|
VLOG_ROW << "VSchemaScanNode output rows: " << src_block.rows();
|
|
src_block.clear();
|
|
}
|
|
} while (block->rows() == 0 && !(*eos));
|
|
|
|
reached_limit(block, eos);
|
|
return Status::OK();
|
|
}
|
|
|
|
Status VSchemaScanNode::close(RuntimeState* state) {
|
|
if (is_closed()) {
|
|
return Status::OK();
|
|
}
|
|
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
|
return ExecNode::close(state);
|
|
}
|
|
|
|
void VSchemaScanNode::debug_string(int indentation_level, std::stringstream* out) const {
|
|
*out << string(indentation_level * 2, ' ');
|
|
*out << "SchemaScanNode(tupleid=" << _tuple_id << " table=" << _table_name;
|
|
*out << ")" << std::endl;
|
|
|
|
for (int i = 0; i < _children.size(); ++i) {
|
|
_children[i]->debug_string(indentation_level + 1, out);
|
|
}
|
|
}
|
|
|
|
Status VSchemaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
|
|
return Status::OK();
|
|
}
|
|
|
|
} // namespace doris::vectorized
|