doris/be/src/vec/exec/scan/vfile_scanner.cpp
Mingyu Chen 7c0bcbdca1 [enhance](parquet-reader) cache file meta of parquet to speed up query (#18074)
Problem:
1. FE splits a parquet file into splits, so one file can have several splits.
2. BE scans each split and reads the footer of the parquet file.
3. If 2 splits belong to the same parquet file, the footer of this file is read twice.

This PR mainly changes:
1. Use a kv cache to cache the footer of the parquet file.
2. The kv cache belongs to a scan node, so all parquet readers belonging to this scan node share the same kv cache.
3. In the cache, the key is "meta_file_path" and the value is the parsed thrift footer.

The KV cache is sharded into multiple sub caches,
so that different files can use different sub caches and avoid blocking each other.

In my test, a query with 26 splits reduced the footer parse time from 4s to 1s.
2023-03-25 23:22:57 +08:00
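
For reference, the sharding idea reads roughly like the sketch below. This is a minimal illustration, assuming a fixed shard count and hash-based shard selection; the names (FooterCache, Shard, ParsedFooter) are illustrative, not the actual ShardedKVCache API used in the code, and LRU eviction is omitted.

#include <array>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Stand-in for the parsed thrift footer (FileMetaData in the real code).
struct ParsedFooter {};

class FooterCache {
public:
    static constexpr size_t kNumShards = 16;

    // Return the cached footer for this file, or null if absent.
    std::shared_ptr<ParsedFooter> lookup(const std::string& file_path) {
        Shard& shard = _pick_shard(file_path);
        std::lock_guard<std::mutex> lock(shard.mu);
        auto it = shard.map.find(file_path);
        return it == shard.map.end() ? nullptr : it->second;
    }

    // Insert (or overwrite) the parsed footer for this file.
    void insert(const std::string& file_path, std::shared_ptr<ParsedFooter> footer) {
        Shard& shard = _pick_shard(file_path);
        std::lock_guard<std::mutex> lock(shard.mu);
        shard.map[file_path] = std::move(footer);
    }

private:
    struct Shard {
        std::mutex mu;
        std::unordered_map<std::string, std::shared_ptr<ParsedFooter>> map;
    };

    // Hash the key so that different files usually land in different shards
    // and do not contend on the same mutex.
    Shard& _pick_shard(const std::string& key) {
        return _shards[std::hash<std::string>{}(key) % kNumShards];
    }

    std::array<Shard, kNumShards> _shards;
};

Splits of the same file hash to the same shard and hit the same cached footer, while splits of different files rarely block one another on a shard mutex.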


// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/scan/vfile_scanner.h"
#include <fmt/format.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <vec/data_types/data_type_factory.hpp>
#include "common/logging.h"
#include "common/utils.h"
#include "exec/text_converter.hpp"
#include "io/cache/block/block_file_cache_profile.h"
#include "olap/iterators.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/iceberg_reader.h"
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/functions/simple_function_factory.h"
namespace doris::vectorized {
using namespace ErrorCode;
VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, RuntimeProfile* profile,
ShardedKVCache* kv_cache)
: VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
_params(scan_range.params),
_ranges(scan_range.ranges),
_next_range(0),
_cur_reader(nullptr),
_cur_reader_eof(false),
_kv_cache(kv_cache),
_strict_mode(false) {
if (scan_range.params.__isset.strict_mode) {
_strict_mode = scan_range.params.strict_mode;
}
}
Status VFileScanner::prepare(
VExprContext** vconjunct_ctx_ptr,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const std::unordered_map<std::string, int>* colname_to_slot_id) {
RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
_colname_to_value_range = colname_to_value_range;
_col_name_to_slot_id = colname_to_slot_id;
_get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime");
_cast_to_input_block_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerCastInputBlockTime");
_fill_path_columns_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerFillPathColumnTime");
_fill_missing_columns_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerFillMissingColumnTime");
_pre_filter_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerPreFilterTimer");
_convert_to_output_block_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
_empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT);
_file_cache_statistics.reset(new io::FileCacheStatistics());
_io_ctx.reset(new io::IOContext());
_io_ctx->file_cache_stats = _file_cache_statistics.get();
_io_ctx->query_id = &_state->query_id();
_io_ctx->enable_file_cache = _state->query_options().enable_file_cache;
if (_is_load) {
_src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_input_tuple_desc->id()}),
std::vector<bool>({false})));
// prepare pre filters
if (_params.__isset.pre_filter_exprs) {
_pre_conjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(
_state->obj_pool(), _params.pre_filter_exprs, _pre_conjunct_ctx_ptr.get()));
RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->prepare(_state, *_src_row_desc));
RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->open(_state));
}
}
_default_val_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_real_tuple_desc->id()}),
std::vector<bool>({false})));
return Status::OK();
}
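// Recursively walk the conjunct tree: AND nodes are descended into, and every
// leaf conjunct is wrapped in its own VExprContext. Conjuncts referencing
// exactly one slot are grouped by slot id in _slot_id_to_filter_conjuncts so
// they can be pushed down per column; all other conjuncts go to
// _not_single_slot_filter_conjuncts.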
Status VFileScanner::_split_conjuncts(VExpr* conjunct_expr_root) {
static constexpr auto is_leaf = [](VExpr* expr) { return !expr->is_and_expr(); };
if (conjunct_expr_root != nullptr) {
if (is_leaf(conjunct_expr_root)) {
auto impl = conjunct_expr_root->get_impl();
            // If impl is not null, this conjunct comes from a runtime filter.
VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) : conjunct_expr_root;
VExprContext* new_ctx = _state->obj_pool()->add(new VExprContext(cur_expr));
_vconjunct_ctx->clone_fn_contexts(new_ctx);
RETURN_IF_ERROR(new_ctx->prepare(_state, *_default_val_row_desc));
RETURN_IF_ERROR(new_ctx->open(_state));
std::vector<int> slot_ids;
_get_slot_ids(cur_expr, &slot_ids);
if (slot_ids.size() == 0) {
_not_single_slot_filter_conjuncts.emplace_back(new_ctx);
return Status::OK();
}
bool single_slot = true;
for (int i = 1; i < slot_ids.size(); i++) {
if (slot_ids[i] != slot_ids[0]) {
single_slot = false;
break;
}
}
if (single_slot) {
SlotId slot_id = slot_ids[0];
if (_slot_id_to_filter_conjuncts.find(slot_id) ==
_slot_id_to_filter_conjuncts.end()) {
_slot_id_to_filter_conjuncts.insert({slot_id, std::vector<VExprContext*>()});
}
_slot_id_to_filter_conjuncts[slot_id].emplace_back(new_ctx);
} else {
_not_single_slot_filter_conjuncts.emplace_back(new_ctx);
}
} else {
RETURN_IF_ERROR(_split_conjuncts(conjunct_expr_root->children()[0]));
RETURN_IF_ERROR(_split_conjuncts(conjunct_expr_root->children()[1]));
}
}
return Status::OK();
}
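// Recursively collect the slot ids of all VSlotRef nodes under `expr`.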
void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
for (VExpr* child_expr : expr->children()) {
if (child_expr->is_slot_ref()) {
VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr);
slot_ids->emplace_back(slot_ref->slot_id());
}
_get_slot_ids(child_expr, slot_ids);
}
}
Status VFileScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(VScanner::open(state));
RETURN_IF_ERROR(_init_expr_ctxes());
return Status::OK();
}
// For query:
//                            [exist cols]      [non-exist cols]  [col from path]
//                            A     B     C     D                 E                input  output
// _init_src_block            x     x     x     x                 x                -      x
// get_next_block             x     x     x     -                 -                -      x
// _cast_to_input_block       -     -     -     -                 -                -      -
// _fill_columns_from_path    -     -     -     -                 x                -      x
// _fill_missing_columns      -     -     -     x                 -                -      x
// _convert_to_output_block   -     -     -     -                 -                -      -
//
// For load:
//                            [exist cols]      [non-exist cols]  [col from path]
//                            A     B     C     D                 E                input  output
// _init_src_block            x     x     x     x                 x                x      -
// get_next_block             x     x     x     -                 -                x      -
// _cast_to_input_block       x     x     x     -                 -                x      -
// _fill_columns_from_path    -     -     -     -                 x                x      -
// _fill_missing_columns      -     -     -     x                 -                x      -
// _convert_to_output_block   -     -     -     -                 -                -      x
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
do {
if (_cur_reader == nullptr || _cur_reader_eof) {
RETURN_IF_ERROR(_get_next_reader());
}
if (_scanner_eof) {
*eof = true;
return Status::OK();
}
// Init src block for load job based on the data file schema (e.g. parquet)
// For query job, simply set _src_block_ptr to block.
size_t read_rows = 0;
RETURN_IF_ERROR(_init_src_block(block));
{
SCOPED_TIMER(_get_block_timer);
// Read next block.
            // Some columns in the block may not be filled (they do not exist in the file).
RETURN_IF_ERROR(
_cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
}
        // Use read_rows instead of _src_block_ptr->rows(), because the first column of
        // _src_block_ptr may not be filled after calling `get_next_block()`, so
        // _src_block_ptr->rows() may return a wrong result.
if (read_rows > 0) {
// Convert the src block columns type to string in-place.
RETURN_IF_ERROR(_cast_to_input_block(block));
// FileReader can fill partition and missing columns itself
if (!_cur_reader->fill_all_columns()) {
// Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
// Fill columns not exist in file with null or default value
RETURN_IF_ERROR(_fill_missing_columns(read_rows));
}
// Apply _pre_conjunct_ctx_ptr to filter src block.
RETURN_IF_ERROR(_pre_filter_src_block());
// Convert src block to output block (dest block), string to dest data type and apply filters.
RETURN_IF_ERROR(_convert_to_output_block(block));
break;
}
} while (true);
// Update filtered rows and unselected rows for load, reset counter.
// {
// state->update_num_rows_load_filtered(_counter.num_rows_filtered);
// state->update_num_rows_load_unselected(_counter.num_rows_unselected);
// _reset_counter();
// }
return Status::OK();
}
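// For query, the src block is the output block itself. For load, build a
// separate _src_block whose column types follow the file schema where
// possible, so data can be read as-is and converted to the dest schema later.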
Status VFileScanner::_init_src_block(Block* block) {
if (!_is_load) {
_src_block_ptr = block;
return Status::OK();
}
// if (_src_block_init) {
// _src_block.clear_column_data();
// _src_block_ptr = &_src_block;
// return Status::OK();
// }
_src_block.clear();
size_t idx = 0;
    // Slots in _input_tuple_desc contain all slots described in the load statement, e.g.:
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
    // _input_tuple_desc will contain: k1, k2, tmp1.
    // Some of them come from the file, such as k1 and k2, and some may not exist
    // in the file, such as tmp1. _input_tuple_desc also contains columns from the path.
for (auto& slot : _input_tuple_desc->slots()) {
DataTypePtr data_type;
auto it = _name_to_col_type.find(slot->col_name());
if (it == _name_to_col_type.end() || _is_dynamic_schema) {
            // not in the file; use the type from _input_tuple_desc
data_type =
DataTypeFactory::instance().create_data_type(slot->type(), slot->is_nullable());
} else {
data_type = DataTypeFactory::instance().create_data_type(it->second, true);
}
if (data_type == nullptr) {
return Status::NotSupported("Not support data type {} for column {}",
it == _name_to_col_type.end() ? slot->type().debug_string()
: it->second.debug_string(),
slot->col_name());
}
MutableColumnPtr data_column = data_type->create_column();
_src_block.insert(
ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
_src_block_name_to_idx.emplace(slot->col_name(), idx++);
}
_src_block_ptr = &_src_block;
_src_block_init = true;
return Status::OK();
}
Status VFileScanner::_cast_to_input_block(Block* block) {
if (!_is_load) {
return Status::OK();
}
if (_is_dynamic_schema) {
return Status::OK();
}
SCOPED_TIMER(_cast_to_input_block_timer);
// cast primitive type(PT0) to primitive type(PT1)
size_t idx = 0;
for (auto& slot_desc : _input_tuple_desc->slots()) {
if (_name_to_col_type.find(slot_desc->col_name()) == _name_to_col_type.end()) {
            // skip columns which do not exist in the file
continue;
}
if (slot_desc->type().is_variant_type()) {
// skip variant type
continue;
}
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
        // remove nullable here and let get_function decide whether the result is nullable
auto return_type = slot_desc->get_data_type_ptr();
ColumnsWithTypeAndName arguments {
arg,
{DataTypeString().create_column_const(
arg.column->size(), remove_nullable(return_type)->get_family_name()),
std::make_shared<DataTypeString>(), ""}};
auto func_cast =
SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type);
idx = _src_block_name_to_idx[slot_desc->col_name()];
RETURN_IF_ERROR(
func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
}
return Status::OK();
}
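// Fill partition columns whose values are parsed from the file path (e.g.
// Hive-style partition directories) into the src block, repeating each value
// `rows` times.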
Status VFileScanner::_fill_columns_from_path(size_t rows) {
const TFileRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
SCOPED_TIMER(_fill_path_columns_timer);
for (const auto& slot_desc : _partition_slot_descs) {
if (slot_desc == nullptr) continue;
auto it = _partition_slot_index_map.find(slot_desc->id());
if (it == std::end(_partition_slot_index_map)) {
std::stringstream ss;
ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id();
return Status::InternalError(ss.str());
}
const std::string& column_from_path = range.columns_from_path[it->second];
auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column;
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
if (!_text_converter->write_vec_column(slot_desc, col_ptr,
const_cast<char*>(column_from_path.c_str()),
column_from_path.size(), true, false, rows)) {
return Status::InternalError("Failed to fill partition column: {}={}",
slot_desc->col_name(), column_from_path);
}
}
}
return Status::OK();
}
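// For slots that exist in the table schema but not in the file, fill the
// column with nulls when there is no default value expr, otherwise with the
// value produced by the default value expr.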
Status VFileScanner::_fill_missing_columns(size_t rows) {
if (_missing_cols.empty()) {
return Status::OK();
}
SCOPED_TIMER(_fill_missing_columns_timer);
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
continue;
}
auto it = _col_default_value_ctx.find(slot_desc->col_name());
if (it == _col_default_value_ctx.end()) {
return Status::InternalError("failed to find default value expr for slot: {}",
slot_desc->col_name());
}
if (it->second == nullptr) {
// no default column, fill with null
auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(_src_block_ptr->get_by_name(slot_desc->col_name()).column))
.mutate()
.get());
nullable_column->insert_many_defaults(rows);
} else {
// fill with default value
auto* ctx = it->second;
auto origin_column_num = _src_block_ptr->columns();
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
bool is_origin_column = result_column_id < origin_column_num;
if (!is_origin_column) {
                // Call resize because the first column of _src_block_ptr may not be filled
                // by the reader, so _src_block_ptr->rows() may return a wrong result, and
                // the column created by `ctx->execute()` has only one row.
std::move(*_src_block_ptr->get_by_position(result_column_id).column)
.mutate()
->resize(rows);
auto result_column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
                // result_column_ptr may be a ColumnConst; convert it to a normal column
result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
auto origin_column_type = _src_block_ptr->get_by_name(slot_desc->col_name()).type;
bool is_nullable = origin_column_type->is_nullable();
_src_block_ptr->replace_by_position(
_src_block_ptr->get_position_by_name(slot_desc->col_name()),
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
_src_block_ptr->erase(result_column_id);
}
}
}
return Status::OK();
}
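// Apply the load job's pre-filter conjuncts to the src block before it is
// converted to the dest schema; rows removed here are counted as unselected.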
Status VFileScanner::_pre_filter_src_block() {
if (!_is_load) {
return Status::OK();
}
if (_pre_conjunct_ctx_ptr) {
SCOPED_TIMER(_pre_filter_timer);
auto origin_column_num = _src_block_ptr->columns();
auto old_rows = _src_block_ptr->rows();
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr,
_src_block_ptr, origin_column_num));
_counter.num_rows_unselected += old_rows - _src_block.rows();
}
return Status::OK();
}
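// Execute the dest slot exprs over the src block to build the output block in
// the dest schema. Rows whose converted value is null while the dest slot is
// non-nullable, or that lose a non-null src value in strict mode, are reported
// as errors and filtered out.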
Status VFileScanner::_convert_to_output_block(Block* block) {
if (!_is_load) {
return Status::OK();
}
SCOPED_TIMER(_convert_to_output_block_timer);
    // The block is taken from the scanner context's free blocks and is initialized
    // with the src columns, but for a load job it should be filled with the dest
    // columns, so it needs to be cleared first.
block->clear();
int ctx_idx = 0;
size_t rows = _src_block.rows();
auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
auto& filter_map = filter_column->get_data();
for (auto slot_desc : _output_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
int dest_index = ctx_idx++;
vectorized::ColumnPtr column_ptr;
auto* ctx = _dest_vexpr_ctx[dest_index];
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
column_ptr = _src_block.get_by_position(result_column_id).column;
        // column_ptr may be a ColumnConst; convert it to a normal column
column_ptr = column_ptr->convert_to_full_column_if_const();
DCHECK(column_ptr != nullptr);
        // Because the src slot desc is always nullable, the column_ptr produced by the
        // dest expr is likely to be nullable as well.
if (LIKELY(column_ptr->is_nullable())) {
const ColumnNullable* nullable_column =
reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
for (int i = 0; i < rows; ++i) {
if (filter_map[i] && nullable_column->is_null_at(i)) {
if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
!_src_block.get_by_position(_dest_slot_to_src_slot_index[dest_index])
.column->is_null_at(i)) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block.dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
auto raw_value =
_src_block.get_by_position(ctx_idx).column->get_data_at(
i);
std::string raw_string = raw_value.to_string();
fmt::memory_buffer error_msg;
fmt::format_to(error_msg,
"column({}) value is incorrect while strict "
"mode is {}, "
"src value is {}",
slot_desc->col_name(), _strict_mode, raw_string);
return fmt::to_string(error_msg);
},
&_scanner_eof));
filter_map[i] = false;
} else if (!slot_desc->is_nullable()) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block.dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg,
"column({}) values is null while columns is not "
"nullable",
slot_desc->col_name());
return fmt::to_string(error_msg);
},
&_scanner_eof));
filter_map[i] = false;
}
}
}
if (!slot_desc->is_nullable()) {
column_ptr = remove_nullable(column_ptr);
}
} else if (slot_desc->is_nullable()) {
column_ptr = make_nullable(column_ptr);
}
block->insert(dest_index, vectorized::ColumnWithTypeAndName(std::move(column_ptr),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
    // After the dest block insert operation, clear _src_block to drop the references to the origin columns.
_src_block.clear();
size_t dest_size = block->columns();
// do filter
block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column),
std::make_shared<vectorized::DataTypeUInt8>(),
"filter column"));
RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size, dest_size));
_counter.num_rows_filtered += rows - block->rows();
return Status::OK();
}
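// Advance to the next range and create a format-specific reader for it. Files
// that report END_OF_FILE on init (e.g. empty files) are counted and skipped.
// For parquet, the shared _kv_cache lets readers of different splits of the
// same file reuse the parsed footer.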
Status VFileScanner::_get_next_reader() {
while (true) {
_cur_reader.reset(nullptr);
_src_block_init = false;
if (_next_range >= _ranges.size()) {
_scanner_eof = true;
return Status::OK();
}
const TFileRangeDesc& range = _ranges[_next_range++];
// create reader for specific format
// TODO: add json, avro
Status init_status;
// TODO: use data lake type
switch (_params.format_type) {
case TFileFormatType::FORMAT_PARQUET: {
ParquetReader* parquet_reader =
new ParquetReader(_profile, _params, range, _state->query_options().batch_size,
const_cast<cctz::time_zone*>(&_state->timezone_obj()),
_io_ctx.get(), _state, _kv_cache);
RETURN_IF_ERROR(parquet_reader->open());
if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx != nullptr) {
RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr));
_discard_conjuncts();
}
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
IcebergTableReader* iceberg_reader =
new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state,
_params, range, _kv_cache, _io_ctx.get());
init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range, _push_down_expr,
_real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
_cur_reader.reset((GenericReader*)iceberg_reader);
} else {
std::vector<std::string> place_holder;
init_status = parquet_reader->init_reader(
_file_col_names, place_holder, _colname_to_value_range, _push_down_expr,
_real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
_cur_reader.reset((GenericReader*)parquet_reader);
}
break;
}
case TFileFormatType::FORMAT_ORC: {
_cur_reader.reset(new OrcReader(_profile, _params, range, _file_col_names,
_state->query_options().batch_size, _state->timezone(),
_io_ctx.get()));
init_status = ((OrcReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range);
break;
}
case TFileFormatType::FORMAT_CSV_PLAIN:
case TFileFormatType::FORMAT_CSV_GZ:
case TFileFormatType::FORMAT_CSV_BZ2:
case TFileFormatType::FORMAT_CSV_LZ4FRAME:
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE:
case TFileFormatType::FORMAT_PROTO: {
_cur_reader.reset(new CsvReader(_state, _profile, &_counter, _params, range,
_file_slot_descs, _io_ctx.get()));
init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
break;
}
case TFileFormatType::FORMAT_JSON: {
_cur_reader.reset(new NewJsonReader(_state, _profile, &_counter, _params, range,
_file_slot_descs, &_scanner_eof, _io_ctx.get(),
_is_dynamic_schema));
init_status = ((NewJsonReader*)(_cur_reader.get()))->init_reader();
break;
}
default:
return Status::InternalError("Not supported file format: {}", _params.format_type);
}
if (init_status.is<END_OF_FILE>()) {
COUNTER_UPDATE(_empty_file_counter, 1);
continue;
} else if (!init_status.ok()) {
if (init_status.is<ErrorCode::NOT_FOUND>()) {
COUNTER_UPDATE(_empty_file_counter, 1);
LOG(INFO) << "failed to find file: " << range.path;
return init_status;
}
return Status::InternalError("failed to init reader for file {}, err: {}", range.path,
init_status.to_string());
}
_name_to_col_type.clear();
_missing_cols.clear();
_cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
RETURN_IF_ERROR(_generate_fill_columns());
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
for (auto& col : _missing_cols) {
fmt::format_to(col_buf, " {}", col);
}
VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf),
range.path);
}
_cur_reader_eof = false;
break;
}
return Status::OK();
}
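// Collect the partition columns (name -> value parsed from the path) and the
// missing columns (name -> default value expr) of the current range, and hand
// them to the reader, which may fill them itself.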
Status VFileScanner::_generate_fill_columns() {
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
partition_columns;
std::unordered_map<std::string, VExprContext*> missing_columns;
const TFileRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
if (slot_desc) {
auto it = _partition_slot_index_map.find(slot_desc->id());
if (it == std::end(_partition_slot_index_map)) {
return Status::InternalError("Unknown source slot descriptor, slot_id={}",
slot_desc->id());
}
const std::string& column_from_path = range.columns_from_path[it->second];
partition_columns.emplace(slot_desc->col_name(),
std::make_tuple(column_from_path, slot_desc));
}
}
}
if (!_missing_cols.empty()) {
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
continue;
}
auto it = _col_default_value_ctx.find(slot_desc->col_name());
if (it == _col_default_value_ctx.end()) {
return Status::InternalError("failed to find default value expr for slot: {}",
slot_desc->col_name());
}
missing_columns.emplace(slot_desc->col_name(), it->second);
}
}
return _cur_reader->set_fill_columns(partition_columns, missing_columns);
}
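// Build the slot and expr mappings used during scanning: file slots vs.
// partition slots, the default value expr of each column, and (for load) the
// dest exprs plus the src slot order needed for strict-mode checks.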
Status VFileScanner::_init_expr_ctxes() {
DCHECK(!_ranges.empty());
std::map<SlotId, int> full_src_index_map;
std::map<SlotId, SlotDescriptor*> full_src_slot_map;
std::map<std::string, int> partition_name_to_key_index_map;
int index = 0;
for (const auto& slot_desc : _real_tuple_desc->slots()) {
full_src_slot_map.emplace(slot_desc->id(), slot_desc);
full_src_index_map.emplace(slot_desc->id(), index++);
}
    // For an external table query, find the index of each column in the path.
    // A query does not always select all columns of a table, and the order of the
    // selected columns is arbitrary. All ranges in the _ranges vector should have
    // identical columns_from_path_keys because they are all file splits of the same
    // external table, so the first element of _ranges is used to fill
    // partition_name_to_key_index_map.
if (_ranges[0].__isset.columns_from_path_keys) {
std::vector<std::string> key_map = _ranges[0].columns_from_path_keys;
if (!key_map.empty()) {
for (size_t i = 0; i < key_map.size(); i++) {
partition_name_to_key_index_map.emplace(key_map[i], i);
}
}
}
_num_of_columns_from_file = _params.num_of_columns_from_file;
for (const auto& slot_info : _params.required_slots) {
auto slot_id = slot_info.slot_id;
auto it = full_src_slot_map.find(slot_id);
if (it == std::end(full_src_slot_map)) {
std::stringstream ss;
ss << "Unknown source slot descriptor, slot_id=" << slot_id;
return Status::InternalError(ss.str());
}
if (slot_info.is_file_slot) {
_file_slot_descs.emplace_back(it->second);
_file_col_names.push_back(it->second->col_name());
if (it->second->col_unique_id() > 0) {
_col_id_name_map.emplace(it->second->col_unique_id(), it->second->col_name());
}
} else {
_partition_slot_descs.emplace_back(it->second);
if (_is_load) {
auto iti = full_src_index_map.find(slot_id);
_partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
} else {
auto kit = partition_name_to_key_index_map.find(it->second->col_name());
_partition_slot_index_map.emplace(slot_id, kit->second);
}
}
}
    // Build the column name -> default value expr map.
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
vectorized::VExprContext* ctx = nullptr;
auto it = _params.default_value_of_src_slot.find(slot_desc->id());
if (it != std::end(_params.default_value_of_src_slot)) {
if (!it->second.nodes.empty()) {
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
RETURN_IF_ERROR(ctx->open(_state));
}
// if expr is empty, the default value will be null
_col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
}
}
if (_is_load) {
        // The following dest expr map is only for load tasks.
bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans;
int idx = 0;
for (auto slot_desc : _output_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
auto it = _params.expr_of_dest_slot.find(slot_desc->id());
if (it == std::end(_params.expr_of_dest_slot)) {
return Status::InternalError("No expr for dest slot, id={}, name={}",
slot_desc->id(), slot_desc->col_name());
}
vectorized::VExprContext* ctx = nullptr;
if (!it->second.nodes.empty()) {
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
RETURN_IF_ERROR(ctx->open(_state));
}
_dest_vexpr_ctx.emplace_back(ctx);
_dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
if (has_slot_id_map) {
auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) {
_src_slot_descs_order_by_dest.emplace_back(nullptr);
} else {
auto _src_slot_it = full_src_slot_map.find(it1->second);
if (_src_slot_it == std::end(full_src_slot_map)) {
return Status::InternalError("No src slot {} in src slot descs",
it1->second);
}
_dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
full_src_index_map[_src_slot_it->first]);
_src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
}
}
}
}
    // If the last slot is a variant type from the stream plan, the table has a dynamic schema.
_is_dynamic_schema =
_output_tuple_desc && _output_tuple_desc->slots().back()->type().is_variant_type();
    // TODO: This could be moved to the scan node to process.
if (_vconjunct_ctx && _vconjunct_ctx->root()) {
        RETURN_IF_ERROR(_split_conjuncts(_vconjunct_ctx->root()));
}
return Status::OK();
}
Status VFileScanner::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK();
}
for (auto ctx : _dest_vexpr_ctx) {
if (ctx != nullptr) {
ctx->close(state);
}
}
for (auto it : _col_default_value_ctx) {
if (it.second != nullptr) {
it.second->close(state);
}
}
if (_pre_conjunct_ctx_ptr) {
(*_pre_conjunct_ctx_ptr)->close(state);
}
if (_push_down_expr) {
_push_down_expr->close(state);
}
for (auto& [k, v] : _slot_id_to_filter_conjuncts) {
for (auto& ctx : v) {
if (ctx != nullptr) {
ctx->close(state);
}
}
}
for (auto* ctx : _not_single_slot_filter_conjuncts) {
if (ctx != nullptr) {
ctx->close(state);
}
}
if (config::enable_file_cache) {
io::FileCacheProfileReporter cache_profile(_profile);
cache_profile.update(_file_cache_statistics.get());
}
RETURN_IF_ERROR(VScanner::close(state));
return Status::OK();
}
} // namespace doris::vectorized