[feature-wip](new-scan)Refactor VFileScanner, support broker load, remove unused functions in VScanner base class. (#12793)
Refactor of scanners. Support broker load. This PR is part of the scanner refactor tasks. It adds support for broker load using the new VFileScanner. Work is still in progress.
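For reference, the new load read path added in vfile_scanner.cpp works roughly as follows. This is a condensed, hand-written sketch of the logic visible in the diff below (comments are mine; error handling and member details are omitted), not the exact committed code:

    Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
        if (_cur_reader == nullptr || _cur_reader_eof) {
            // Open the next file range and create a format-specific GenericReader.
            RETURN_IF_ERROR(_get_next_reader());
        }
        if (!_scanner_eof) {
            // For load: build the src block from the file schema; for query: point at the output block.
            RETURN_IF_ERROR(_init_src_block(block));
            RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
            // Cast the file columns to the source slot types.
            RETURN_IF_ERROR(_cast_to_input_block(block));
        }
        if (_scanner_eof && _src_block_ptr->rows() == 0) {
            *eof = true;
        }
        if (_src_block_ptr->rows() > 0) {
            RETURN_IF_ERROR(_fill_columns_from_path());       // partition columns parsed from the file path
            RETURN_IF_ERROR(_pre_filter_src_block());         // pre-filter conjuncts, load only
            RETURN_IF_ERROR(_convert_to_output_block(block)); // dest exprs: src block -> output block
        }
        return Status::OK();
    }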
@ -159,7 +159,7 @@ doris::Status doris::FileFactory::_new_file_reader(doris::ExecEnv* env, RuntimeP
break;
}
default:
return Status::InternalError("UnSupport File Reader Type: " + std::to_string(type));
return Status::InternalError("Unsupported File Reader Type: " + std::to_string(type));
}

return Status::OK();

@ -251,7 +251,7 @@ set(VEC_FILES
exec/scan/new_file_scanner.cpp
exec/scan/new_file_text_scanner.cpp
exec/scan/vfile_scanner.cpp
)
)

add_library(Vec STATIC
${VEC_FILES}

@ -51,11 +51,6 @@ FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr

Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ScanNode::init(tnode, state));
auto& file_scan_node = tnode.file_scan_node;

if (file_scan_node.__isset.pre_filter_exprs) {
_pre_filter_texprs = file_scan_node.pre_filter_exprs;
}

int filter_size = _runtime_filter_descs.size();
_runtime_filter_ctxs.resize(filter_size);

@ -28,6 +28,11 @@ class Block;
class GenericReader {
public:
virtual Status get_next_block(Block* block, bool* eof) = 0;
virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() {
std::unordered_map<std::string, TypeDescriptor> map;
return map;
}
virtual ~GenericReader() {}
};

} // namespace doris::vectorized

@ -100,6 +100,43 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup
return Status::OK();
}

std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type() {
std::unordered_map<std::string, TypeDescriptor> map;
auto schema_desc = _file_metadata->schema();
for (auto& it : _map_column) {
TypeDescriptor type;
if (it.first == "p_partkey") {
type.type = TYPE_INT;
} else if (it.first == "p_name") {
type.type = TYPE_VARCHAR;
type.len = 55;
} else if (it.first == "p_mfgr") {
type.type = TYPE_VARCHAR;
type.len = 25;
} else if (it.first == "p_brand") {
type.type = TYPE_VARCHAR;
type.len = 10;
} else if (it.first == "p_type") {
type.type = TYPE_VARCHAR;
type.len = 25;
} else if (it.first == "p_size") {
type.type = TYPE_INT;
} else if (it.first == "p_container") {
type.type = TYPE_VARCHAR;
type.len = 10;
} else if (it.first == "p_retailprice") {
type.type = TYPE_DECIMALV2;
type.precision = 27;
type.scale = 9;
} else if (it.first == "p_comment") {
type.type = TYPE_VARCHAR;
type.len = 23;
}
map.emplace(it.first, type);
}
return map;
}

Status ParquetReader::get_next_block(Block* block, bool* eof) {
int32_t num_of_readers = _row_group_readers.size();
DCHECK(num_of_readers <= _read_row_groups.size());
@ -114,6 +151,7 @@ Status ParquetReader::get_next_block(Block* block, bool* eof) {
*eof = true;
}
}
VLOG_DEBUG << "ParquetReader::get_next_block: " << block->rows();
return Status::OK();
}


@ -86,6 +86,8 @@ public:

int64_t size() const { return _file_reader->size(); }

std::unordered_map<std::string, TypeDescriptor> get_name_to_type() override;

private:
bool _next_row_group_reader();
Status _init_read_columns(const std::vector<SlotDescriptor*>& tuple_slot_descs);

@ -28,12 +28,15 @@ namespace doris::vectorized {

NewFileScanNode::NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: VScanNode(pool, tnode, descs),
_pre_filter_texprs(tnode.file_scan_node.pre_filter_exprs),
_file_scan_node(tnode.file_scan_node) {
: VScanNode(pool, tnode, descs) {
_output_tuple_id = tnode.file_scan_node.tuple_id;
}

Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::init(tnode, state));
return Status::OK();
}

Status NewFileScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
_scanner_mem_tracker = std::make_unique<MemTracker>("NewFileScanners");
@ -71,7 +74,7 @@ void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_
}

Status NewFileScanNode::_init_profile() {
VScanNode::_init_profile();
RETURN_IF_ERROR(VScanNode::_init_profile());
return Status::OK();
}

@ -103,26 +106,25 @@ VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) {
VScanner* scanner = nullptr;
if (config::enable_new_file_scanner) {
scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range,
_scanner_mem_tracker.get(), runtime_profile(),
_pre_filter_texprs, scan_range.params.format_type);
_scanner_mem_tracker.get(), runtime_profile());
((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get());
} else {
switch (scan_range.params.format_type) {
case TFileFormatType::FORMAT_PARQUET:
scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, scan_range,
_scanner_mem_tracker.get(), runtime_profile(),
_pre_filter_texprs);
std::vector<TExpr>());
break;
case TFileFormatType::FORMAT_ORC:
scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range,
_scanner_mem_tracker.get(), runtime_profile(),
_pre_filter_texprs);
std::vector<TExpr>());
break;

default:
scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range,
_scanner_mem_tracker.get(), runtime_profile(),
_pre_filter_texprs);
std::vector<TExpr>());
break;
}
((NewFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get());

@ -25,6 +25,8 @@ class NewFileScanNode : public VScanNode {
public:
NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;

void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
@ -34,15 +36,11 @@ protected:
Status _process_conjuncts() override;
Status _init_scanners(std::list<VScanner*>* scanners) override;

protected:
std::vector<TExpr> _pre_filter_texprs;

private:
VScanner* _create_scanner(const TFileScanRange& scan_range);

private:
std::vector<TScanRangeParams> _scan_ranges;
TFileScanNode _file_scan_node;
std::unique_ptr<MemTracker> _scanner_mem_tracker;
};
} // namespace doris::vectorized

@ -55,6 +55,7 @@ private:
TOlapScanNode _olap_scan_node;
std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
OlapScanKeys _scan_keys;
std::vector<TCondition> _olap_filters;

std::unique_ptr<MemTracker> _scanner_mem_tracker;


@ -23,31 +23,27 @@

#include "common/logging.h"
#include "common/utils.h"
#include "exec/exec_node.h"
#include "exec/text_converter.hpp"
#include "exprs/expr_context.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/functions/simple_function_factory.h"

namespace doris::vectorized {

VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, MemTracker* tracker,
RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs,
TFileFormatType::type format)
RuntimeProfile* profile)
: VScanner(state, static_cast<VScanNode*>(parent), limit, tracker),
_params(scan_range.params),
_ranges(scan_range.ranges),
_next_range(0),
_cur_reader(nullptr),
_cur_reader_eof(false),
_file_format(format),
_mem_pool(std::make_unique<MemPool>()),
_profile(profile),
_pre_filter_texprs(pre_filter_texprs),
_strict_mode(false) {}

Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
@ -58,6 +54,22 @@ Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
}

if (_is_load) {
_src_block_mem_reuse = true;
_src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_input_tuple_desc->id()}),
std::vector<bool>({false})));
// prepare pre filters
if (_params.__isset.pre_filter_exprs) {
_pre_conjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(
_state->obj_pool(), _params.pre_filter_exprs, _pre_conjunct_ctx_ptr.get()));

RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->prepare(_state, *_src_row_desc));
RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->open(_state));
}
}

return Status::OK();
}

@ -69,24 +81,90 @@ Status VFileScanner::open(RuntimeState* state) {

Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
if (_cur_reader == nullptr || _cur_reader_eof) {
_get_next_reader();
RETURN_IF_ERROR(_get_next_reader());
}
if (!_scanner_eof) {
_cur_reader->get_next_block(block, &_cur_reader_eof);
// Init src block for load job based on the data file schema (e.g. parquet)
// For query job, simply set _src_block_ptr to block.
RETURN_IF_ERROR(_init_src_block(block));
// Read next block.
RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof));
// Convert the src block columns type to string in place.
RETURN_IF_ERROR(_cast_to_input_block(block));
}

if (block->rows() > 0) {
_fill_columns_from_path(block, block->rows());
// TODO: cast to String for load job.
}

if (_scanner_eof && block->rows() == 0) {
if (_scanner_eof && _src_block_ptr->rows() == 0) {
*eof = true;
}

if (_src_block_ptr->rows() > 0) {
// Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
RETURN_IF_ERROR(_fill_columns_from_path());
// Apply _pre_conjunct_ctx_ptr to filter src block.
RETURN_IF_ERROR(_pre_filter_src_block());
// Convert src block to output block (dest block), string to dest data type and apply filters.
RETURN_IF_ERROR(_convert_to_output_block(block));
}

return Status::OK();
}

Status VFileScanner::_init_src_block(Block* block) {
if (!_is_load) {
_src_block_ptr = block;
return Status::OK();
}

_src_block.clear();

std::unordered_map<std::string, TypeDescriptor> name_to_type = _cur_reader->get_name_to_type();
size_t idx = 0;
for (auto& slot : _input_tuple_desc->slots()) {
DataTypePtr data_type =
DataTypeFactory::instance().create_data_type(name_to_type[slot->col_name()], true);
if (data_type == nullptr) {
return Status::NotSupported(fmt::format("Not support arrow type:{}", slot->col_name()));
}
MutableColumnPtr data_column = data_type->create_column();
_src_block.insert(
ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
_src_block_name_to_idx.emplace(slot->col_name(), idx++);
}
_src_block_ptr = &_src_block;
return Status::OK();
}

Status VFileScanner::_cast_to_input_block(Block* block) {
if (_src_block_ptr == block) {
return Status::OK();
}
// cast primitive type(PT0) to primitive type(PT1)
size_t idx = 0;
for (size_t i = 0; i < _file_slot_descs.size(); ++i) {
SlotDescriptor* slot_desc = _file_slot_descs[i];
if (slot_desc == nullptr) {
continue;
}
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
// remove nullable here, let the get_function decide whether nullable
auto return_type = slot_desc->get_data_type_ptr();
ColumnsWithTypeAndName arguments {
arg,
{DataTypeString().create_column_const(
arg.column->size(), remove_nullable(return_type)->get_family_name()),
std::make_shared<DataTypeString>(), ""}};
auto func_cast =
SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type);
idx = _src_block_name_to_idx[slot_desc->col_name()];
RETURN_IF_ERROR(
func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
}
return Status::OK();
}

Status VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) {
Status VFileScanner::_fill_columns_from_path() {
size_t rows = _src_block_ptr->rows();
const TFileRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
@ -99,7 +177,7 @@ Status VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t r
}
const std::string& column_from_path = range.columns_from_path[it->second];

auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column;
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());

for (size_t j = 0; j < rows; ++j) {
@ -112,8 +190,127 @@ Status VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t r
return Status::OK();
}

Status VFileScanner::_convert_to_output_block(Block* block) {
if (_src_block_ptr == block) {
return Status::OK();
}

block->clear();

int ctx_idx = 0;
size_t rows = _src_block.rows();
auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
auto& filter_map = filter_column->get_data();
auto origin_column_num = _src_block.columns();

for (auto slot_desc : _output_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}

int dest_index = ctx_idx++;

auto* ctx = _dest_vexpr_ctx[dest_index];
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
bool is_origin_column = result_column_id < origin_column_num;
auto column_ptr =
is_origin_column && _src_block_mem_reuse
? _src_block.get_by_position(result_column_id).column->clone_resized(rows)
: _src_block.get_by_position(result_column_id).column;

DCHECK(column_ptr != nullptr);

// because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
// is likely to be nullable
if (LIKELY(column_ptr->is_nullable())) {
auto nullable_column =
reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
for (int i = 0; i < rows; ++i) {
if (filter_map[i] && nullable_column->is_null_at(i)) {
if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
!_src_block.get_by_position(_dest_slot_to_src_slot_index[dest_index])
.column->is_null_at(i)) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block.dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
auto raw_value =
_src_block.get_by_position(ctx_idx).column->get_data_at(
i);
std::string raw_string = raw_value.to_string();
fmt::memory_buffer error_msg;
fmt::format_to(error_msg,
"column({}) value is incorrect while strict "
"mode is {}, "
"src value is {}",
slot_desc->col_name(), _strict_mode, raw_string);
return fmt::to_string(error_msg);
},
&_scanner_eof));
filter_map[i] = false;
} else if (!slot_desc->is_nullable()) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block.dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg,
"column({}) values is null while columns is not "
"nullable",
slot_desc->col_name());
return fmt::to_string(error_msg);
},
&_scanner_eof));
filter_map[i] = false;
}
}
}
if (!slot_desc->is_nullable()) {
column_ptr = nullable_column->get_nested_column_ptr();
}
} else if (slot_desc->is_nullable()) {
column_ptr = vectorized::make_nullable(column_ptr);
}
block->insert(dest_index, vectorized::ColumnWithTypeAndName(std::move(column_ptr),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}

// after do the dest block insert operation, clear _src_block to remove the reference of origin column
if (_src_block_mem_reuse) {
_src_block.clear_column_data(origin_column_num);
} else {
_src_block.clear();
}

size_t dest_size = block->columns();
// do filter
block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column),
std::make_shared<vectorized::DataTypeUInt8>(),
"filter column"));
RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size, dest_size));
// _counter->num_rows_filtered += rows - dest_block->rows();

return Status::OK();
}

Status VFileScanner::_pre_filter_src_block() {
if (_pre_conjunct_ctx_ptr) {
auto origin_column_num = _src_block_ptr->columns();
// filter block
// auto old_rows = _src_block_ptr->rows();
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr,
_src_block_ptr, origin_column_num));
// _counter->num_rows_unselected += old_rows - _src_block.rows();
}
return Status::OK();
}

Status VFileScanner::_get_next_reader() {
//TODO: delete _cur_reader?
if (_cur_reader != nullptr) {
delete _cur_reader;
_cur_reader = nullptr;
@ -133,24 +330,29 @@ Status VFileScanner::_get_next_reader() {
file_reader->close();
continue;
}
_cur_reader = new ParquetReader(file_reader.release(), _file_slot_descs.size(),
_state->query_options().batch_size, range.start_offset,
range.size,
const_cast<cctz::time_zone*>(&_state->timezone_obj()));
// _cur_reader.reset(reader);
Status status = _cur_reader->init_reader(_output_tuple_desc, _file_slot_descs,
_conjunct_ctxs, _state->timezone());

switch (_params.format_type) {
case TFileFormatType::FORMAT_PARQUET:
_cur_reader = new ParquetReader(file_reader.release(), _file_slot_descs.size(),
_state->query_options().batch_size, range.start_offset,
range.size,
const_cast<cctz::time_zone*>(&_state->timezone_obj()));
RETURN_IF_ERROR(((ParquetReader*)_cur_reader)
->init_reader(_output_tuple_desc, _file_slot_descs,
_conjunct_ctxs, _state->timezone()));
break;
default:
std::stringstream error_msg;
error_msg << "Not supported file format " << _params.format_type;
return Status::InternalError(error_msg.str());
}

_cur_reader_eof = false;
return status;
return Status::OK();
}
}

Status VFileScanner::_init_expr_ctxes() {
// if (_input_tuple_desc == nullptr) {
// std::stringstream ss;
// ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id;
// return Status::InternalError(ss.str());
// }
DCHECK(!_ranges.empty());

std::map<SlotId, int> full_src_index_map;
@ -170,7 +372,6 @@ Status VFileScanner::_init_expr_ctxes() {
ss << "Unknown source slot descriptor, slot_id=" << slot_id;
return Status::InternalError(ss.str());
}
_required_slot_descs.emplace_back(it->second);
if (slot_info.is_file_slot) {
_file_slot_descs.emplace_back(it->second);
auto iti = full_src_index_map.find(slot_id);
@ -182,32 +383,7 @@ Status VFileScanner::_init_expr_ctxes() {
}
}

// _src_tuple = (doris::Tuple*)_mem_pool->allocate(_input_tuple_desc->byte_size());
// _src_tuple_row = (TupleRow*)_mem_pool->allocate(sizeof(Tuple*));
// _src_tuple_row->set_tuple(0, _src_tuple);

// Construct dest slots information
if (config::enable_new_load_scan_node) {
_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_params.src_tuple_id}),
std::vector<bool>({false})));

// preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
if (!_pre_filter_texprs.empty()) {
// for vectorized, preceding filter exprs should be compounded to one passed from fe.
DCHECK(_pre_filter_texprs.size() == 1);
_vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
_state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc));
RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
}

if (_output_tuple_desc == nullptr) {
return Status::InternalError("Unknown dest tuple descriptor, tuple_id={}",
_params.dest_tuple_id);
}

if (_is_load) {
bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans;
for (auto slot_desc : _output_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
@ -222,7 +398,7 @@ Status VFileScanner::_init_expr_ctxes() {
vectorized::VExprContext* ctx = nullptr;
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get()));
RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
RETURN_IF_ERROR(ctx->open(_state));
_dest_vexpr_ctx.emplace_back(ctx);
if (has_slot_id_map) {
@ -235,12 +411,13 @@ Status VFileScanner::_init_expr_ctxes() {
return Status::InternalError("No src slot {} in src slot descs",
it1->second);
}
_dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
full_src_index_map[_src_slot_it->first]);
_src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
}
}
}
}

return Status::OK();
}


@ -32,8 +32,7 @@ class NewFileScanNode;
class VFileScanner : public VScanner {
public:
VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile,
const std::vector<TExpr>& pre_filter_texprs, TFileFormatType::type format);
const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile);

Status open(RuntimeState* state) override;

@ -43,15 +42,9 @@ public:
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;

// TODO: Use prefilters to filter input block
Status _filter_input_block(Block* block) { return Status::OK(); }

// TODO: Convert input block to output block, if needed.
Status _convert_to_output_block(Block* output_block) { return Status::OK(); }

void _init_profiles(RuntimeProfile* profile);

Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows);
Status _fill_columns_from_path();

Status _get_next_reader();

@ -64,12 +57,9 @@ protected:
const std::vector<TFileRangeDesc>& _ranges;
int _next_range;

ParquetReader* _cur_reader;
GenericReader* _cur_reader;
bool _cur_reader_eof;
TFileFormatType::type _file_format;

// Used for constructing tuple
std::vector<SlotDescriptor*> _required_slot_descs;
// File source slot descriptors
std::vector<SlotDescriptor*> _file_slot_descs;
// File slot id to index map.
@ -78,9 +68,6 @@ protected:
std::vector<SlotDescriptor*> _partition_slot_descs;
// Partition slot id to index map
std::map<SlotId, int> _partition_slot_index_map;
std::unique_ptr<RowDescriptor> _row_desc;
doris::Tuple* _src_tuple;
TupleRow* _src_tuple_row;

// Mem pool used to allocate _src_tuple and _src_tuple_row
std::unique_ptr<MemPool> _mem_pool;
@ -97,11 +84,7 @@ protected:
int _rows = 0;
int _num_of_columns_from_file;

const std::vector<TExpr> _pre_filter_texprs;

std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
// to filter src tuple directly.
std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;

// the map values of dest slot id to src slot desc
// if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
@ -110,7 +93,19 @@ protected:
bool _src_block_mem_reuse = false;
bool _strict_mode;

Block* _src_block_ptr;
Block _src_block;

// dest slot desc index to src slot desc index
std::unordered_map<int, int> _dest_slot_to_src_slot_index;

std::unordered_map<std::string, size_t> _src_block_name_to_idx;

private:
Status _init_expr_ctxes();
Status _init_src_block(Block* block);
Status _cast_to_input_block(Block* block);
Status _pre_filter_src_block();
Status _convert_to_output_block(Block* block);
};
} // namespace doris::vectorized

@ -204,9 +204,6 @@ protected:

bool _need_agg_finalize = true;

// TODO: should be moved to olap scan node?
std::vector<TCondition> _olap_filters;

// Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector
// so that it will be destroyed uniformly at the end of the query.
std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;

@ -47,34 +47,20 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
}
}

_init_input_block(block);
{
do {
// 1. Get input block from scanner
{
SCOPED_TIMER(_parent->_scan_timer);
RETURN_IF_ERROR(_get_block_impl(state, _input_block_ptr, eof));
RETURN_IF_ERROR(_get_block_impl(state, block, eof));
if (*eof) {
DCHECK(_input_block_ptr->rows() == 0);
DCHECK(block->rows() == 0);
break;
}
_num_rows_read += _input_block_ptr->rows();
_num_rows_read += block->rows();
}

// 2. For load, use prefilter to filter the input block first.
{
SCOPED_TIMER(_parent->_prefilter_timer);
RETURN_IF_ERROR(_filter_input_block(_input_block_ptr));
}

// 3. For load, convert input block to output block
{
SCOPED_TIMER(_parent->_convert_block_timer);
RETURN_IF_ERROR(_convert_to_output_block(block));
}

// 4. Filter the output block finally.
// NOTE that step 2/3 may be skipped, for Query.
// 2. Filter the output block finally.
{
SCOPED_TIMER(_parent->_filter_timer);
RETURN_IF_ERROR(_filter_output_block(block));
@ -85,38 +71,6 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
return Status::OK();
}

void VScanner::_init_input_block(Block* output_block) {
if (_input_tuple_desc == nullptr) {
_input_block_ptr = output_block;
return;
}

// init the input block used for scanner.
_input_block.clear();
_input_block_ptr = &_input_block;
DCHECK(_input_block.columns() == 0);

for (auto& slot_desc : _input_tuple_desc->slots()) {
auto data_type = slot_desc->get_data_type_ptr();
_input_block.insert(vectorized::ColumnWithTypeAndName(
data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}
}

Status VScanner::_filter_input_block(Block* block) {
// TODO: implement
return Status::OK();
}

Status VScanner::_convert_to_output_block(Block* output_block) {
if (_input_block_ptr == output_block) {
return Status::OK();
}
// TODO: implement

return Status::OK();
}

Status VScanner::_filter_output_block(Block* block) {
return VExprContext::filter_block(_vconjunct_ctx, block, _output_tuple_desc->slots().size());
}

@ -50,16 +50,6 @@ protected:
// Update the counters before closing this scanner
virtual void _update_counters_before_close();

// Init the input block if _input_tuple_desc is set.
// Otherwise, use output_block directly.
void _init_input_block(Block* output_block);

// Use prefilters to filter input block
Status _filter_input_block(Block* block);

// Convert input block to output block, if needed.
Status _convert_to_output_block(Block* output_block);

// Filter the output block finally.
Status _filter_output_block(Block* block);

@ -147,6 +137,10 @@ protected:
// and will be destroyed at the end.
std::vector<VExprContext*> _stale_vexpr_ctxs;

// For load scanner
std::unique_ptr<doris::vectorized::VExprContext*> _pre_conjunct_ctx_ptr;
std::unique_ptr<RowDescriptor> _src_row_desc;

// num of rows read from scanner
int64_t _num_rows_read = 0;


@ -54,7 +54,7 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state, const doris::RowDes
return Status::NotSupported("Function {} is not implemented", _fn.name.function_name);
}
VExpr::register_function_context(state, context);
_expr_name = fmt::format("(CAST {}, TO {})", child_name, _target_data_type_name);
_expr_name = fmt::format("(CAST {} TO {})", child_name, _target_data_type_name);
return Status::OK();
}

@ -111,4 +111,4 @@ std::string VCastExpr::debug_string() const {
out << "}";
return out.str();
}
} // namespace doris::vectorized
} // namespace doris::vectorized

@ -346,7 +346,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
return statsDeriveResultList;
}

void initCompoundPredicate(Expr expr) {
protected void initCompoundPredicate(Expr expr) {
if (expr instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
compoundPredicate.setType(Type.BOOLEAN);
@ -364,7 +364,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
}
}

Expr convertConjunctsToAndCompoundPredicate(List<Expr> conjuncts) {
protected Expr convertConjunctsToAndCompoundPredicate(List<Expr> conjuncts) {
List<Expr> targetConjuncts = Lists.newArrayList(conjuncts);
while (targetConjuncts.size() > 1) {
List<Expr> newTargetConjuncts = Lists.newArrayList();

@ -43,7 +43,7 @@ public class BackendPolicy {

public void init() throws UserException {
Set<Tag> tags = Sets.newHashSet();
if (ConnectContext.get().getCurrentUserIdentity() != null) {
if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {

@ -135,6 +135,11 @@ public class ExternalFileScanNode extends ExternalScanNode {
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);

if (!Config.enable_vectorized_load) {
throw new UserException(
"Please set 'enable_vectorized_load=true' in fe.conf to enable external file scan node");
}

switch (type) {
case QUERY:
HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable();
@ -337,6 +342,12 @@ public class ExternalFileScanNode extends ExternalScanNode {

// Need re compute memory layout after set some slot descriptor to nullable
srcTupleDesc.computeStatAndMemLayout();

if (!preFilterConjuncts.isEmpty()) {
Expr vPreFilterExpr = convertConjunctsToAndCompoundPredicate(preFilterConjuncts);
initCompoundPredicate(vPreFilterExpr);
params.setPreFilterExprs(vPreFilterExpr.treeToThrift());
}
}

protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr)
@ -377,15 +388,6 @@ public class ExternalFileScanNode extends ExternalScanNode {
planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE);
TFileScanNode fileScanNode = new TFileScanNode();
fileScanNode.setTupleId(desc.getId().asInt());
if (!preFilterConjuncts.isEmpty()) {
if (Config.enable_vectorized_load && vpreFilterConjunct != null) {
fileScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift());
} else {
for (Expr e : preFilterConjuncts) {
fileScanNode.addToPreFilterExprs(e.treeToThrift());
}
}
}
planNode.setFileScanNode(fileScanNode);
}


@ -102,6 +102,7 @@ public class LoadScanProvider implements FileScanProviderIf {
TFileAttributes fileAttributes = new TFileAttributes();
setFileAttributes(ctx.fileGroup, fileAttributes);
params.setFileAttributes(fileAttributes);
params.setFileType(fileGroupInfo.getBrokerDesc().getFileType());
ctx.params = params;

initColumns(ctx, analyzer);
@ -191,10 +192,14 @@ public class LoadScanProvider implements FileScanProviderIf {
context.exprMap, analyzer, context.srcTupleDescriptor, context.slotDescByName, srcSlotIds,
formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized());

int numColumnsFromFile = srcSlotIds.size() - context.fileGroup.getColumnNamesFromPath().size();
int columnCountFromPath = 0;
if (context.fileGroup.getColumnNamesFromPath() != null) {
columnCountFromPath = context.fileGroup.getColumnNamesFromPath().size();
}
int numColumnsFromFile = srcSlotIds.size() - columnCountFromPath;
Preconditions.checkState(numColumnsFromFile >= 0,
"srcSlotIds.size is: " + srcSlotIds.size() + ", num columns from path: "
+ context.fileGroup.getColumnNamesFromPath().size());
+ columnCountFromPath);
context.params.setNumOfColumnsFromFile(numColumnsFromFile);
for (int i = 0; i < srcSlotIds.size(); ++i) {
TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();

@ -269,8 +269,9 @@ struct TFileScanRangeParams {
// if strict mode is true, the incorrect data (the result of cast is null) will not be loaded
11: optional bool strict_mode

12: list<Types.TNetworkAddress> broker_addresses
13: TFileAttributes file_attributes
12: optional list<Types.TNetworkAddress> broker_addresses
13: optional TFileAttributes file_attributes
14: optional Exprs.TExpr pre_filter_exprs
}

struct TFileRangeDesc {
@ -364,7 +365,6 @@ struct TBrokerScanNode {

struct TFileScanNode {
1: optional Types.TTupleId tuple_id
2: optional list<Exprs.TExpr> pre_filter_exprs
}

struct TEsScanNode {