The file list is obtained from the external meta cache, so a file may already have been removed from storage. We should ignore files that are not found and let the query continue.
1224 lines
58 KiB
C++
1224 lines
58 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
#include "vec/exec/scan/vfile_scanner.h"
|
|
|
|
#include <fmt/format.h>
|
|
#include <gen_cpp/Exprs_types.h>
|
|
#include <gen_cpp/Metrics_types.h>
|
|
#include <gen_cpp/PaloInternalService_types.h>
|
|
#include <gen_cpp/PlanNodes_types.h>
|
|
|
|
#include <algorithm>
|
|
#include <boost/iterator/iterator_facade.hpp>
|
|
#include <iterator>
|
|
#include <map>
|
|
#include <ostream>
|
|
#include <tuple>
|
|
#include <utility>
|
|
|
|
#include "common/compiler_util.h" // IWYU pragma: keep
|
|
#include "common/config.h"
|
|
#include "common/logging.h"
|
|
#include "common/object_pool.h"
|
|
#include "io/cache/block/block_file_cache_profile.h"
|
|
#include "runtime/descriptors.h"
|
|
#include "runtime/runtime_state.h"
|
|
#include "runtime/types.h"
|
|
#include "vec/aggregate_functions/aggregate_function.h"
|
|
#include "vec/columns/column.h"
|
|
#include "vec/columns/column_nullable.h"
|
|
#include "vec/columns/column_vector.h"
|
|
#include "vec/columns/columns_number.h"
|
|
#include "vec/common/string_ref.h"
|
|
#include "vec/core/column_with_type_and_name.h"
|
|
#include "vec/core/columns_with_type_and_name.h"
|
|
#include "vec/core/field.h"
|
|
#include "vec/data_types/data_type.h"
|
|
#include "vec/data_types/data_type_factory.hpp"
|
|
#include "vec/data_types/data_type_nullable.h"
|
|
#include "vec/data_types/data_type_number.h"
|
|
#include "vec/data_types/data_type_string.h"
|
|
#include "vec/exec/format/arrow/arrow_stream_reader.h"
|
|
#include "vec/exec/format/avro/avro_jni_reader.h"
|
|
#include "vec/exec/format/csv/csv_reader.h"
|
|
#include "vec/exec/format/json/new_json_reader.h"
|
|
#include "vec/exec/format/orc/vorc_reader.h"
|
|
#include "vec/exec/format/parquet/vparquet_reader.h"
|
|
#include "vec/exec/format/table/hudi_jni_reader.h"
|
|
#include "vec/exec/format/table/iceberg_reader.h"
|
|
#include "vec/exec/format/table/max_compute_jni_reader.h"
|
|
#include "vec/exec/format/table/paimon_jni_reader.h"
|
|
#include "vec/exec/format/table/paimon_reader.h"
|
|
#include "vec/exec/format/table/transactional_hive_reader.h"
|
|
#include "vec/exec/format/wal/wal_reader.h"
|
|
#include "vec/exec/scan/new_file_scan_node.h"
|
|
#include "vec/exec/scan/vscan_node.h"
|
|
#include "vec/exprs/vexpr.h"
|
|
#include "vec/exprs/vexpr_context.h"
|
|
#include "vec/exprs/vslot_ref.h"
|
|
#include "vec/functions/function.h"
|
|
#include "vec/functions/function_string.h"
|
|
#include "vec/functions/simple_function_factory.h"
|
|
|
|
// Forward declarations of types that this file only refers to by name.
namespace cctz {
class time_zone;
} // namespace cctz

namespace doris::vectorized {
class ShardedKVCache;
} // namespace doris::vectorized
|
|
|
|
namespace doris::vectorized {
|
|
using namespace ErrorCode;
|
|
|
|
// Constructs a scanner that is owned by a NewFileScanNode.
//
// The scan range params are looked up in the query context by the parent node id;
// when the query context has no entry (old FE thrift protocol) they are taken from
// the split source instead.
VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
                           std::shared_ptr<vectorized::SplitSourceConnector> split_source,
                           RuntimeProfile* profile, ShardedKVCache* kv_cache)
        : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
          _split_source(split_source),
          _cur_reader(nullptr),
          _cur_reader_eof(false),
          _kv_cache(kv_cache),
          _strict_mode(false) {
    auto query_ctx = state->get_query_ctx();
    const bool params_in_query_ctx =
            query_ctx != nullptr &&
            query_ctx->file_scan_range_params_map.count(parent->id()) > 0;
    if (params_in_query_ctx) {
        _params = &(query_ctx->file_scan_range_params_map[parent->id()]);
    } else {
        // old fe thrift protocol
        _params = _split_source->get_params();
    }

    // _strict_mode stays false unless the params explicitly carry the field.
    if (_params->__isset.strict_mode) {
        _strict_mode = _params->strict_mode;
    }

    // For load scanner, there are input and output tuple.
    // For query scanner, there is only output tuple
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
    _is_load = (_input_tuple_desc != nullptr);
    _real_tuple_desc = _is_load ? _input_tuple_desc : _output_tuple_desc;
}
|
|
|
|
// Constructs a scanner that is owned by a pipeline FileScanLocalState.
//
// The scan range params are looked up in the query context by the local state's
// parent id; when the query context has no entry (old FE thrift protocol) they are
// taken from the split source instead.
VFileScanner::VFileScanner(RuntimeState* state, pipeline::FileScanLocalState* local_state,
                           int64_t limit,
                           std::shared_ptr<vectorized::SplitSourceConnector> split_source,
                           RuntimeProfile* profile, ShardedKVCache* kv_cache)
        : VScanner(state, local_state, limit, profile),
          _split_source(split_source),
          _cur_reader(nullptr),
          _cur_reader_eof(false),
          _kv_cache(kv_cache),
          _strict_mode(false) {
    auto query_ctx = state->get_query_ctx();
    const bool params_in_query_ctx =
            query_ctx != nullptr &&
            query_ctx->file_scan_range_params_map.count(local_state->parent_id()) > 0;
    if (params_in_query_ctx) {
        _params = &(query_ctx->file_scan_range_params_map[local_state->parent_id()]);
    } else {
        // old fe thrift protocol
        _params = _split_source->get_params();
    }

    // _strict_mode stays false unless the params explicitly carry the field.
    if (_params->__isset.strict_mode) {
        _strict_mode = _params->strict_mode;
    }

    // For load scanner, there are input and output tuple.
    // For query scanner, there is only output tuple
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
    _is_load = (_input_tuple_desc != nullptr);
    _real_tuple_desc = _is_load ? _input_tuple_desc : _output_tuple_desc;
}
|
|
|
|
// Prepares the scanner before it is opened:
//   1. runs the base-class prepare and remembers the column range / slot id maps,
//   2. registers the scanner's timers and counters on the owning profile,
//   3. sets up the IO context and file cache statistics,
//   4. for load jobs, builds the source/dest row descriptors and opens the pre-filters,
//   5. builds the row descriptor used for default values.
Status VFileScanner::prepare(
        const VExprContextSPtrs& conjuncts,
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
        const std::unordered_map<std::string, int>* colname_to_slot_id) {
    RETURN_IF_ERROR(VScanner::prepare(_state, conjuncts));
    _colname_to_value_range = colname_to_value_range;
    _col_name_to_slot_id = colname_to_slot_id;

    // The same set of counters is registered whichever owner provides the profile;
    // the generic parameter lets both profile handle types go through one code path.
    auto register_counters = [this](auto&& scanner_profile) {
        _get_block_timer = ADD_TIMER(scanner_profile, "FileScannerGetBlockTime");
        _open_reader_timer = ADD_TIMER(scanner_profile, "FileScannerOpenReaderTime");
        _cast_to_input_block_timer = ADD_TIMER(scanner_profile, "FileScannerCastInputBlockTime");
        _fill_path_columns_timer = ADD_TIMER(scanner_profile, "FileScannerFillPathColumnTime");
        _fill_missing_columns_timer =
                ADD_TIMER(scanner_profile, "FileScannerFillMissingColumnTime");
        _pre_filter_timer = ADD_TIMER(scanner_profile, "FileScannerPreFilterTimer");
        _convert_to_output_block_timer =
                ADD_TIMER(scanner_profile, "FileScannerConvertOuputBlockTime");
        _empty_file_counter = ADD_COUNTER(scanner_profile, "EmptyFileNum", TUnit::UNIT);
        _file_counter = ADD_COUNTER(scanner_profile, "FileNumber", TUnit::UNIT);
        _has_fully_rf_file_counter =
                ADD_COUNTER(scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT);
    };
    if (get_parent() != nullptr) {
        register_counters(_parent->_scanner_profile);
    } else {
        register_counters(_local_state->scanner_profile());
    }

    _file_cache_statistics.reset(new io::FileCacheStatistics());
    _io_ctx.reset(new io::IOContext());
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->query_id = &_state->query_id();

    // Builds a row descriptor holding exactly one, non-nullable tuple.
    auto new_single_tuple_row_desc = [this](TupleId tuple_id) {
        return new RowDescriptor(_state->desc_tbl(), std::vector<TupleId>({tuple_id}),
                                 std::vector<bool>({false}));
    };

    if (_is_load) {
        _src_row_desc.reset(new_single_tuple_row_desc(_input_tuple_desc->id()));

        // prepare pre filters
        if (_params->__isset.pre_filter_exprs_list) {
            RETURN_IF_ERROR(
                    VExpr::create_expr_trees(_params->pre_filter_exprs_list, _pre_conjunct_ctxs));
        } else if (_params->__isset.pre_filter_exprs) {
            VExprContextSPtr context;
            RETURN_IF_ERROR(VExpr::create_expr_tree(_params->pre_filter_exprs, context));
            _pre_conjunct_ctxs.emplace_back(context);
        }

        for (auto& pre_conjunct : _pre_conjunct_ctxs) {
            RETURN_IF_ERROR(pre_conjunct->prepare(_state, *_src_row_desc));
            RETURN_IF_ERROR(pre_conjunct->open(_state));
        }

        _dest_row_desc.reset(new_single_tuple_row_desc(_output_tuple_desc->id()));
    }

    _default_val_row_desc.reset(new_single_tuple_row_desc(_real_tuple_desc->id()));

    return Status::OK();
}
|
|
|
|
// Classifies every push-down conjunct by the slots it references:
//   - conjuncts whose slot refs all point at the same slot are grouped per slot id in
//     _slot_id_to_filter_conjuncts;
//   - every other conjunct (no slot ref at all, or more than one distinct slot) goes
//     into _not_single_slot_filter_conjuncts.
// Both containers are rebuilt from scratch on each call. Always returns Status::OK().
Status VFileScanner::_process_conjuncts_for_dict_filter() {
    _slot_id_to_filter_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    for (auto& conjunct : _push_down_conjuncts) {
        auto impl = conjunct->root()->get_impl();
        // If impl is not null, which means this a conjuncts from runtime filter.
        auto cur_expr = impl ? impl : conjunct->root();

        std::vector<int> slot_ids;
        _get_slot_ids(cur_expr.get(), &slot_ids);
        if (slot_ids.empty()) {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
            // Keep going: returning here would leave all remaining conjuncts
            // unclassified.
            continue;
        }

        const bool single_slot =
                std::all_of(slot_ids.begin(), slot_ids.end(),
                            [first = slot_ids.front()](int id) { return id == first; });
        if (single_slot) {
            SlotId slot_id = slot_ids[0];
            _slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
        } else {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
        }
    }
    return Status::OK();
}
|
|
|
|
// Refreshes the push-down conjuncts when _conjuncts has grown past the set that was
// already pushed down: every conjunct is re-cloned, the dict-filter classification is
// rebuilt and _discard_conjuncts() is invoked. Independently of that, the
// "HasFullyRfFileNumber" counter is bumped when all runtime filters have been applied.
Status VFileScanner::_process_late_arrival_conjuncts() {
    const size_t conjunct_count = _conjuncts.size();
    if (_push_down_conjuncts.size() < conjunct_count) {
        _push_down_conjuncts.clear();
        _push_down_conjuncts.resize(conjunct_count);
        for (size_t pos = 0; pos < conjunct_count; ++pos) {
            RETURN_IF_ERROR(_conjuncts[pos]->clone(_state, _push_down_conjuncts[pos]));
        }
        RETURN_IF_ERROR(_process_conjuncts_for_dict_filter());
        _discard_conjuncts();
    }

    const bool all_runtime_filters_applied = (_applied_rf_num == _total_rf_num);
    if (all_runtime_filters_applied) {
        COUNTER_UPDATE(_has_fully_rf_file_counter, 1);
    }
    return Status::OK();
}
|
|
|
|
// Walks the descendants of `expr` depth-first and appends the slot id of every
// slot-ref node found to `slot_ids` (duplicates are kept). Only children are
// inspected; `expr` itself is not tested.
void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
    for (auto& child : expr->children()) {
        auto* child_expr = child.get();
        if (child_expr->is_slot_ref()) {
            auto* slot_ref = reinterpret_cast<VSlotRef*>(child_expr);
            slot_ids->emplace_back(slot_ref->slot_id());
        }
        _get_slot_ids(child_expr, slot_ids);
    }
}
|
|
|
|
// Opens the scanner and fetches the first scan range from the split source.
// When the split source has no range at all, the scanner is marked as finished
// right away; otherwise the expression contexts are initialised.
Status VFileScanner::open(RuntimeState* state) {
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(VScanner::open(state));
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));

    if (!_first_scan_range) {
        // there's no scan range in split source. stop scanner directly.
        _scanner_eof = true;
        return Status::OK();
    }

    RETURN_IF_ERROR(_init_expr_ctxes());
    return Status::OK();
}
|
|
|
|
// Thin wrapper around _get_block_wrapped() that, on failure, prefixes the error
// message with the path of the scan range currently being read.
Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    Status status = _get_block_wrapped(state, block, eof);
    if (status.ok()) {
        return status;
    }
    // add cur path in error msg for easy debugging
    return std::move(status.prepend("cur path: " + get_current_scan_range_name() + ". "));
}
|
|
|
|
// For query:
|
|
// [exist cols] [non-exist cols] [col from path] input output
|
|
// A B C D E
|
|
// _init_src_block x x x x x - x
|
|
// get_next_block x x x - - - x
|
|
// _cast_to_input_block - - - - - - -
|
|
// _fill_columns_from_path - - - - x - x
|
|
// _fill_missing_columns - - - x - - x
|
|
// _convert_to_output_block - - - - - - -
|
|
//
|
|
// For load:
|
|
// [exist cols] [non-exist cols] [col from path] input output
|
|
// A B C D E
|
|
// _init_src_block x x x x x x -
|
|
// get_next_block x x x - - x -
|
|
// _cast_to_input_block x x x - - x -
|
|
// _fill_columns_from_path - - - - x x -
|
|
// _fill_missing_columns - - - x - x -
|
|
// _convert_to_output_block - - - - - - x
|
|
Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
|
|
do {
|
|
RETURN_IF_CANCELLED(state);
|
|
if (_cur_reader == nullptr || _cur_reader_eof) {
|
|
// The file may not exist because the file list is got from meta cache,
|
|
// And the file may already be removed from storage.
|
|
// Just ignore not found files.
|
|
Status st = _get_next_reader();
|
|
if (st.is<ErrorCode::NOT_FOUND>()) {
|
|
_cur_reader_eof = true;
|
|
COUNTER_UPDATE(_empty_file_counter, 1);
|
|
continue;
|
|
} else if (!st) {
|
|
return st;
|
|
}
|
|
}
|
|
|
|
if (_scanner_eof) {
|
|
*eof = true;
|
|
return Status::OK();
|
|
}
|
|
|
|
// Init src block for load job based on the data file schema (e.g. parquet)
|
|
// For query job, simply set _src_block_ptr to block.
|
|
size_t read_rows = 0;
|
|
RETURN_IF_ERROR(_init_src_block(block));
|
|
{
|
|
SCOPED_TIMER(_get_block_timer);
|
|
|
|
// Read next block.
|
|
// Some of column in block may not be filled (column not exist in file)
|
|
RETURN_IF_ERROR(
|
|
_cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
|
|
}
|
|
// use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
|
|
// may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
|
|
if (read_rows > 0) {
|
|
// If the push_down_agg_type is COUNT, no need to do the rest,
|
|
// because we only save a number in block.
|
|
if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
|
|
// Convert the src block columns type to string in-place.
|
|
RETURN_IF_ERROR(_cast_to_input_block(block));
|
|
// FileReader can fill partition and missing columns itself
|
|
if (!_cur_reader->fill_all_columns()) {
|
|
// Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
|
|
RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
|
|
// Fill columns not exist in file with null or default value
|
|
RETURN_IF_ERROR(_fill_missing_columns(read_rows));
|
|
}
|
|
// Apply _pre_conjunct_ctxs to filter src block.
|
|
RETURN_IF_ERROR(_pre_filter_src_block());
|
|
// Convert src block to output block (dest block), string to dest data type and apply filters.
|
|
RETURN_IF_ERROR(_convert_to_output_block(block));
|
|
// Truncate char columns or varchar columns if size is smaller than file columns
|
|
// or not found in the file column schema.
|
|
RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
|
|
}
|
|
break;
|
|
}
|
|
} while (true);
|
|
|
|
// Update filtered rows and unselected rows for load, reset counter.
|
|
// {
|
|
// state->update_num_rows_load_filtered(_counter.num_rows_filtered);
|
|
// state->update_num_rows_load_unselected(_counter.num_rows_unselected);
|
|
// _reset_counter();
|
|
// }
|
|
return Status::OK();
|
|
}
|
|
|
|
/**
|
|
* Check whether there are complex types in parquet/orc reader in broker/stream load.
|
|
* Broker/stream load will cast any type as string type, and complex types will be casted wrong.
|
|
* This is a temporary method, and will be replaced by tvf.
|
|
*/
|
|
Status VFileScanner::_check_output_block_types() {
|
|
if (_is_load) {
|
|
TFileFormatType::type format_type = _params->format_type;
|
|
if (format_type == TFileFormatType::FORMAT_PARQUET ||
|
|
format_type == TFileFormatType::FORMAT_ORC) {
|
|
for (auto slot : _output_tuple_desc->slots()) {
|
|
if (slot->type().is_complex_type()) {
|
|
return Status::InternalError(
|
|
"Parquet/orc doesn't support complex types in broker/stream load, "
|
|
"please use tvf(table value function) to insert complex types.");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
// Selects the block the reader will write into.
//   - Query: the caller's `block` is used directly.
//   - Load: _src_block is rebuilt on every call with one empty column per slot of
//     _input_tuple_desc, and the column name -> position map is filled alongside.
Status VFileScanner::_init_src_block(Block* block) {
    if (!_is_load) {
        _src_block_ptr = block;
        return Status::OK();
    }
    RETURN_IF_ERROR(_check_output_block_types());

    _src_block.clear();
    size_t next_pos = 0;
    // slots in _input_tuple_desc contains all slots describe in load statement, eg:
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
    // _input_tuple_desc will contains: k1, k2, tmp1
    // and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
    // _input_tuple_desc also contains columns from path
    for (auto& slot : _input_tuple_desc->slots()) {
        DataTypePtr column_type;
        const auto file_col = _name_to_col_type.find(slot->col_name());
        if (file_col != _name_to_col_type.end()) {
            // exists in file: use the file's type, always nullable
            RETURN_IF_CATCH_EXCEPTION(
                    column_type = DataTypeFactory::instance().create_data_type(file_col->second,
                                                                               true));
        } else {
            // not exist in file, using type from _input_tuple_desc
            RETURN_IF_CATCH_EXCEPTION(column_type = DataTypeFactory::instance().create_data_type(
                                              slot->type(), slot->is_nullable()));
        }

        MutableColumnPtr empty_column = column_type->create_column();
        _src_block.insert(
                ColumnWithTypeAndName(std::move(empty_column), column_type, slot->col_name()));
        _src_block_name_to_idx.emplace(slot->col_name(), next_pos);
        ++next_pos;
    }

    _src_block_ptr = &_src_block;
    _src_block_init = true;
    return Status::OK();
}
|
|
|
|
// Load only: casts, in place, every source-block column that exists in the file to the
// data type of its input slot. Columns missing from the file and variant-typed slots
// are left untouched. Does nothing for queries.
Status VFileScanner::_cast_to_input_block(Block* block) {
    if (!_is_load) {
        return Status::OK();
    }
    SCOPED_TIMER(_cast_to_input_block_timer);

    // cast primitive type(PT0) to primitive type(PT1)
    for (auto& slot_desc : _input_tuple_desc->slots()) {
        const bool exists_in_file =
                _name_to_col_type.find(slot_desc->col_name()) != _name_to_col_type.end();
        // skip columns which does not exist in file, and skip variant type
        if (!exists_in_file || slot_desc->type().is_variant_type()) {
            continue;
        }

        auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
        auto return_type = slot_desc->get_data_type_ptr();
        // remove nullable here, let the get_function decide whether nullable
        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(
                remove_nullable(return_type)->get_type_as_type_descriptor());
        ColumnsWithTypeAndName arguments {
                arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
        auto func_cast =
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type);

        // The cast reads from and writes back to the same column position.
        const size_t col_pos = _src_block_name_to_idx[slot_desc->col_name()];
        RETURN_IF_ERROR(func_cast->execute(nullptr, *_src_block_ptr, {col_pos}, col_pos,
                                           arg.column->size()));
        _src_block_ptr->get_by_position(col_pos).type = std::move(return_type);
    }
    return Status::OK();
}
|
|
|
|
// Appends `rows` copies of each partition value (columns derived from the file path,
// e.g. Hive partition columns) to the matching column of the source block.
//
// The textual partition value is deserialized through the slot's serde. Returns an
// InternalError when deserialization fails or when the number of rows written differs
// from `rows`.
Status VFileScanner::_fill_columns_from_path(size_t rows) {
    DataTypeSerDe::FormatOptions _text_formatOptions;
    for (auto& kv : _partition_col_descs) {
        auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
        auto& [value, slot_desc] = kv.second;
        auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();

        // Every row receives the same value, so all slices point at the same bytes.
        std::vector<Slice> slices(rows, Slice(value.data(), value.size()));

        int num_deserialized = 0;
        const Status deserialize_status = _text_serde->deserialize_column_from_json_vector(
                *col_ptr, slices, &num_deserialized, _text_formatOptions);
        if (!deserialize_status.ok()) {
            return Status::InternalError("Failed to fill partition column: {}={}",
                                         slot_desc->col_name(), value);
        }
        if (static_cast<size_t>(num_deserialized) != rows) {
            // Argument order follows the message: expected count first, actual second.
            return Status::InternalError(
                    "Failed to fill partition column: {}={} ."
                    "Number of rows expected to be written : {}, number of rows actually written : "
                    "{}",
                    slot_desc->col_name(), value, rows, num_deserialized);
        }
    }
    return Status::OK();
}
|
|
|
|
// Fills the columns that do not exist in the file with `rows` values each: nulls
// (column defaults) when the column has no default expression, otherwise the result
// of evaluating that expression.
Status VFileScanner::_fill_missing_columns(size_t rows) {
    if (_missing_cols.empty()) {
        return Status::OK();
    }

    SCOPED_TIMER(_fill_missing_columns_timer);
    for (auto& [col_name, default_ctx] : _missing_col_descs) {
        if (default_ctx == nullptr) {
            // no default column, fill with null
            auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
                    (*std::move(_src_block_ptr->get_by_name(col_name).column)).mutate().get());
            nullable_column->insert_many_defaults(rows);
            continue;
        }

        // fill with default value
        auto origin_column_num = _src_block_ptr->columns();
        int result_column_id = -1;
        // PT1 => dest primitive type
        RETURN_IF_ERROR(default_ctx->execute(_src_block_ptr, &result_column_id));
        if (result_column_id < origin_column_num) {
            // The expression resolved to a column that was already in the block;
            // nothing has to be moved.
            continue;
        }

        // call resize because the first column of _src_block_ptr may not be filled by reader,
        // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
        // has only one row.
        std::move(*_src_block_ptr->get_by_position(result_column_id).column)
                .mutate()
                ->resize(rows);
        auto filled_column = _src_block_ptr->get_by_position(result_column_id).column;
        // result_column_ptr maybe a ColumnConst, convert it to a normal column
        filled_column = filled_column->convert_to_full_column_if_const();

        auto target_type = _src_block_ptr->get_by_name(col_name).type;
        const bool target_is_nullable = target_type->is_nullable();
        _src_block_ptr->replace_by_position(
                _src_block_ptr->get_position_by_name(col_name),
                target_is_nullable ? make_nullable(filled_column) : filled_column);
        // Drop the temporary result column appended by the expression.
        _src_block_ptr->erase(result_column_id);
    }
    return Status::OK();
}
|
|
|
|
// Load only: applies the pre-filter conjuncts to the source block and adds the number
// of rows they removed to _counter.num_rows_unselected. Does nothing for queries or
// when there are no pre-filters.
Status VFileScanner::_pre_filter_src_block() {
    if (!_is_load || _pre_conjunct_ctxs.empty()) {
        return Status::OK();
    }

    SCOPED_TIMER(_pre_filter_timer);
    const auto column_count = _src_block_ptr->columns();
    const auto rows_before = _src_block_ptr->rows();
    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr,
                                                           column_count));
    _counter.num_rows_unselected += rows_before - _src_block_ptr->rows();
    return Status::OK();
}
|
|
|
|
// Load only: evaluates the destination expression of every materialized output slot on
// the source block and appends the results to `block`.
//
// Rows whose converted value is NULL are reported through append_error_msg_to_file()
// and filtered out when either
//   - strict mode is on, the dest slot maps to a source slot, and the source value was
//     not NULL (i.e. the conversion itself produced the NULL), or
//   - the destination slot is not nullable.
// The number of filtered rows is added to _counter.num_rows_filtered.
Status VFileScanner::_convert_to_output_block(Block* block) {
    if (!_is_load) {
        return Status::OK();
    }

    SCOPED_TIMER(_convert_to_output_block_timer);
    // The block is passed from scanner context's free blocks,
    // which is initialized by output columns
    // so no need to clear it
    // block->clear();

    int ctx_idx = 0;
    size_t rows = _src_block_ptr->rows();
    // One flag per source row, initialised to 1; cleared for rows that must be dropped.
    auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
    auto& filter_map = filter_column->get_data();

    // After convert, the column_ptr should be copied into output block.
    // Can not use block->insert() because it may cause use_count() non-zero bug
    MutableBlock mutable_output_block =
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
    auto& mutable_output_columns = mutable_output_block.mutable_columns();

    for (int col = 0; col < mutable_output_columns.size(); ++col) {
        auto slot_desc = _output_tuple_desc->slots()[col];
        if (!slot_desc->is_materialized()) {
            continue;
        }

        const int dest_index = ctx_idx;
        auto& ctx = _dest_vexpr_ctx[dest_index];
        int result_column_id = -1;
        // PT1 => dest primitive type
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
        // column_ptr maybe a ColumnConst, convert it to a normal column
        vectorized::ColumnPtr column_ptr = _src_block_ptr->get_by_position(result_column_id)
                                                   .column->convert_to_full_column_if_const();
        DCHECK(column_ptr != nullptr);

        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
        // is likely to be nullable
        if (LIKELY(column_ptr->is_nullable())) {
            const auto* nullable_column =
                    reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
            for (int row = 0; row < rows; ++row) {
                // Only rows that are still kept and whose converted value is NULL matter.
                if (!filter_map[row] || !nullable_column->is_null_at(row)) {
                    continue;
                }

                // Renders the offending source row for the error file.
                auto dump_src_row = [&]() -> std::string {
                    return _src_block_ptr->dump_one_line(row, _num_of_columns_from_file);
                };

                if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
                    !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index])
                             .column->is_null_at(row)) {
                    RETURN_IF_ERROR(_state->append_error_msg_to_file(
                            dump_src_row,
                            [&]() -> std::string {
                                auto raw_value = _src_block_ptr->get_by_position(ctx_idx)
                                                         .column->get_data_at(row);
                                std::string raw_string = raw_value.to_string();
                                fmt::memory_buffer error_msg;
                                fmt::format_to(error_msg,
                                               "column({}) value is incorrect while strict "
                                               "mode is {}, "
                                               "src value is {}",
                                               slot_desc->col_name(), _strict_mode, raw_string);
                                return fmt::to_string(error_msg);
                            },
                            &_scanner_eof));
                    filter_map[row] = false;
                } else if (!slot_desc->is_nullable()) {
                    RETURN_IF_ERROR(_state->append_error_msg_to_file(
                            dump_src_row,
                            [&]() -> std::string {
                                fmt::memory_buffer error_msg;
                                fmt::format_to(error_msg,
                                               "column({}) values is null while columns is not "
                                               "nullable",
                                               slot_desc->col_name());
                                return fmt::to_string(error_msg);
                            },
                            &_scanner_eof));
                    filter_map[row] = false;
                }
            }
            if (!slot_desc->is_nullable()) {
                column_ptr = remove_nullable(column_ptr);
            }
        } else if (slot_desc->is_nullable()) {
            column_ptr = make_nullable(column_ptr);
        }

        mutable_output_columns[col]->insert_range_from(*column_ptr, 0, rows);
        ++ctx_idx;
    }

    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
    _src_block_ptr->clear();

    // do filter: append the flag column after the data columns, filter on it, and let
    // filter_block() keep only the first dest_size columns.
    const size_t dest_size = block->columns();
    block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column),
                                                    std::make_shared<vectorized::DataTypeUInt8>(),
                                                    "filter column"));
    RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size, dest_size));

    _counter.num_rows_filtered += rows - block->rows();
    return Status::OK();
}
|
|
|
|
// When the query option truncate_char_or_varchar_columns is enabled, truncates the
// CHAR/VARCHAR columns of `block` to the length declared by their slot.
//
// A column is truncated when it is not found in the file column schema, or when the
// slot length is positive and the file type's length is larger than it or negative.
// `idx` tracks the block position and only advances for materialized slots.
Status VFileScanner::_truncate_char_or_varchar_columns(Block* block) {
    // Truncate char columns or varchar columns if size is smaller than file columns
    // or not found in the file column schema.
    if (!_state->query_options().truncate_char_or_varchar_columns) {
        return Status::OK();
    }

    int idx = 0;
    for (auto slot_desc : _real_tuple_desc->slots()) {
        if (!slot_desc->is_materialized()) {
            continue;
        }

        const TypeDescriptor& type_desc = slot_desc->type();
        const bool is_char_or_varchar =
                type_desc.type == TYPE_VARCHAR || type_desc.type == TYPE_CHAR;
        if (is_char_or_varchar) {
            auto iter = _source_file_col_name_types.find(slot_desc->col_name());
            if (iter == _source_file_col_name_types.end()) {
                // not found in the file column schema
                _truncate_char_or_varchar_column(block, idx, type_desc.len);
            } else {
                const TypeDescriptor* file_type_desc = iter->second;
                const bool file_column_is_wider =
                        type_desc.len < file_type_desc->len || file_type_desc->len < 0;
                if ((type_desc.len > 0) && file_column_is_wider) {
                    _truncate_char_or_varchar_column(block, idx, type_desc.len);
                }
            }
        }
        ++idx;
    }
    return Status::OK();
}
|
|
|
|
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
|
|
void VFileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
|
|
auto int_type = std::make_shared<DataTypeInt32>();
|
|
size_t num_columns_without_result = block->columns();
|
|
const ColumnNullable* col_nullable =
|
|
assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
|
|
const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr();
|
|
ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
|
|
block->replace_by_position(idx, std::move(string_column_ptr));
|
|
block->insert({int_type->create_column_const(block->rows(), to_field(1)), int_type,
|
|
"const 1"}); // pos is 1
|
|
block->insert({int_type->create_column_const(block->rows(), to_field(len)), int_type,
|
|
fmt::format("const {}", len)}); // len
|
|
block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
|
|
ColumnNumbers temp_arguments(3);
|
|
temp_arguments[0] = idx; // str column
|
|
temp_arguments[1] = num_columns_without_result; // pos
|
|
temp_arguments[2] = num_columns_without_result + 1; // len
|
|
size_t result_column_id = num_columns_without_result + 2;
|
|
|
|
SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
|
|
auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
|
|
null_map_column_ptr);
|
|
block->replace_by_position(idx, std::move(res));
|
|
Block::erase_useless_column(block, num_columns_without_result);
|
|
}
|
|
|
|
/// Advances the scanner to the next scan range and builds the reader for it.
///
/// Any reader left over from the previous range is profiled and closed first. The
/// function then loops over scan ranges until one of the following happens:
///   - no range is left, or `_should_stop` is set: `_scanner_eof` is set and OK is
///     returned;
///   - a reader is created and initialized successfully: it is kept in `_cur_reader`,
///     the per-file column metadata is rebuilt and OK is returned;
///   - reader creation or initialization fails with anything other than END_OF_FILE or
///     NOT_FOUND: an error is returned.
/// END_OF_FILE and NOT_FOUND reported by reader initialization are not failures: the
/// file is counted in `_empty_file_counter` and the next range is tried.
///
/// @return OK on success or end of scan, otherwise the first error encountered.
Status VFileScanner::_get_next_reader() {
    while (true) {
        if (_cur_reader) {
            _cur_reader->collect_profile_before_close();
            RETURN_IF_ERROR(_cur_reader->close());
            _state->update_num_finished_scan_range(1);
        }
        _cur_reader.reset(nullptr);
        _src_block_init = false;
        // While `_first_scan_range` is set, `_current_range` is used as it is; only the
        // following iterations pull a new range from the split source.
        bool has_next = _first_scan_range;
        if (!_first_scan_range) {
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
        }
        _first_scan_range = false;
        if (!has_next || _should_stop) {
            _scanner_eof = true;
            return Status::OK();
        }

        const TFileRangeDesc& range = _current_range;
        _current_range_path = range.path;

        // True when the range carries table format params of the given type.
        const auto is_table_format = [&range](const char* name) {
            return range.__isset.table_format_params &&
                   range.table_format_params.table_format_type == name;
        };

        // create reader for specific format
        Status init_status;
        TFileFormatType::type format_type = _params->format_type;
        // JNI reader can only push down column value range
        const bool push_down_predicates =
                !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
        if (format_type == TFileFormatType::FORMAT_JNI) {
            if (is_table_format("hudi") &&
                range.table_format_params.hudi_params.delta_logs.empty()) {
                // fall back to native reader if there is no log file
                format_type = TFileFormatType::FORMAT_PARQUET;
            } else if (is_table_format("paimon") &&
                       !range.table_format_params.paimon_params.__isset.paimon_split) {
                // use native reader
                const auto& format = range.table_format_params.paimon_params.file_format;
                if (format == "orc") {
                    format_type = TFileFormatType::FORMAT_ORC;
                } else if (format == "parquet") {
                    format_type = TFileFormatType::FORMAT_PARQUET;
                } else {
                    return Status::InternalError("Not supported paimon file format: {}", format);
                }
            }
        }

        bool need_to_get_parsed_schema = false;
        switch (format_type) {
        case TFileFormatType::FORMAT_JNI: {
            if (is_table_format("max_compute")) {
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
                        _real_tuple_desc->table_desc());
                auto mc_reader = MaxComputeJniReader::create_unique(
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
                        range, _state, _profile);
                init_status = mc_reader->init_reader(_colname_to_value_range);
                _cur_reader = std::move(mc_reader);
            } else if (is_table_format("paimon")) {
                auto paimon_reader =
                        PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, range);
                init_status = paimon_reader->init_reader(_colname_to_value_range);
                _cur_reader = std::move(paimon_reader);
            } else if (is_table_format("hudi")) {
                auto hudi_reader = HudiJniReader::create_unique(
                        *_params, range.table_format_params.hudi_params, _file_slot_descs,
                        _state, _profile);
                init_status = hudi_reader->init_reader(_colname_to_value_range);
                _cur_reader = std::move(hudi_reader);
            }
            // Any other table format leaves `_cur_reader` empty; this is rejected
            // right after the switch.
            break;
        }
        case TFileFormatType::FORMAT_PARQUET: {
            static const cctz::time_zone utc0 = cctz::utc_time_zone();
            cctz::time_zone* tz = nullptr;
            if (is_table_format("paimon")) {
                // The timestamp generated by paimon does not carry metadata information
                // (e.g., isAdjustToUTC, etc.), and the stored data is UTC0 by default, so
                // it is directly set to the UTC time zone.
                // In version 0.7, paimon fixed this issue and can remove the judgment here
                tz = const_cast<cctz::time_zone*>(&utc0);
            } else {
                tz = const_cast<cctz::time_zone*>(&_state->timezone_obj());
            }
            std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
                    _profile, *_params, range, _state->query_options().batch_size, tz,
                    _io_ctx.get(), _state,
                    _shoudl_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache()
                                                     : nullptr,
                    _state->query_options().enable_parquet_lazy_mat);
            {
                // NOTE(review): an error from open() is returned directly and never
                // reaches the END_OF_FILE / NOT_FOUND skip logic after the switch.
                // Confirm this is intended for files that vanished from storage.
                SCOPED_TIMER(_open_reader_timer);
                RETURN_IF_ERROR(parquet_reader->open());
            }
            if (push_down_predicates) {
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
            }
            if (is_table_format("iceberg")) {
                std::unique_ptr<IcebergParquetReader> iceberg_reader =
                        IcebergParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                            _state, *_params, range, _kv_cache,
                                                            _io_ctx.get(), _get_push_down_count());
                init_status = iceberg_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(iceberg_reader);
            } else if (is_table_format("paimon")) {
                std::vector<std::string> place_holder;
                init_status = parquet_reader->init_reader(
                        _file_col_names, place_holder, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                std::unique_ptr<PaimonParquetReader> paimon_reader =
                        PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                           _state, *_params);
                RETURN_IF_ERROR(paimon_reader->init_row_filters(range));
                _cur_reader = std::move(paimon_reader);
            } else {
                std::vector<std::string> place_holder;
                init_status = parquet_reader->init_reader(
                        _file_col_names, place_holder, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(parquet_reader);
            }
            need_to_get_parsed_schema = true;
            break;
        }
        case TFileFormatType::FORMAT_ORC: {
            std::vector<orc::TypeKind>* unsupported_pushdown_types = nullptr;
            if (is_table_format("paimon")) {
                static std::vector<orc::TypeKind> paimon_unsupport_type =
                        std::vector<orc::TypeKind> {orc::TypeKind::CHAR};
                unsupported_pushdown_types = &paimon_unsupport_type;
            }
            std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                    _profile, _state, *_params, range, _state->query_options().batch_size,
                    _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat,
                    unsupported_pushdown_types);
            if (push_down_predicates) {
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
            }
            if (is_table_format("transactional_hive")) {
                std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
                        TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
                                                               _state, *_params, range,
                                                               _io_ctx.get());
                init_status = tran_orc_reader->init_reader(
                        _file_col_names, _colname_to_value_range, _push_down_conjuncts,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range));
                _cur_reader = std::move(tran_orc_reader);
            } else if (is_table_format("iceberg")) {
                std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
                        std::move(orc_reader), _profile, _state, *_params, range, _kv_cache,
                        _io_ctx.get(), _get_push_down_count());

                init_status = iceberg_reader->init_reader(
                        _file_col_names, _col_id_name_map, _colname_to_value_range,
                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                        &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(iceberg_reader);
            } else if (is_table_format("paimon")) {
                init_status = orc_reader->init_reader(
                        &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
                        std::move(orc_reader), _profile, _state, *_params);
                RETURN_IF_ERROR(paimon_reader->init_row_filters(range));
                _cur_reader = std::move(paimon_reader);
            } else {
                init_status = orc_reader->init_reader(
                        &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                _cur_reader = std::move(orc_reader);
            }
            need_to_get_parsed_schema = true;
            break;
        }
        case TFileFormatType::FORMAT_CSV_PLAIN:
        case TFileFormatType::FORMAT_CSV_GZ:
        case TFileFormatType::FORMAT_CSV_BZ2:
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        case TFileFormatType::FORMAT_CSV_LZOP:
        case TFileFormatType::FORMAT_CSV_DEFLATE:
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        case TFileFormatType::FORMAT_PROTO: {
            auto csv_reader = CsvReader::create_unique(_state, _profile, &_counter, *_params,
                                                       range, _file_slot_descs, _io_ctx.get());
            init_status = csv_reader->init_reader(_is_load);
            _cur_reader = std::move(csv_reader);
            break;
        }
        case TFileFormatType::FORMAT_JSON: {
            auto json_reader =
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
            init_status = json_reader->init_reader(_col_default_value_ctx);
            _cur_reader = std::move(json_reader);
            break;
        }
        case TFileFormatType::FORMAT_AVRO: {
            auto avro_reader = AvroJNIReader::create_unique(_state, _profile, *_params,
                                                            _file_slot_descs, range);
            init_status = avro_reader->init_fetch_table_reader(_colname_to_value_range);
            _cur_reader = std::move(avro_reader);
            break;
        }
        case TFileFormatType::FORMAT_WAL: {
            std::unique_ptr<WalReader> wal_reader(new WalReader(_state));
            init_status = wal_reader->init_reader(_output_tuple_desc);
            _cur_reader = std::move(wal_reader);
            break;
        }
        case TFileFormatType::FORMAT_ARROW: {
            auto arrow_reader =
                    ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                     _file_slot_descs, _io_ctx.get());
            init_status = arrow_reader->init_reader();
            _cur_reader = std::move(arrow_reader);
            break;
        }
        default:
            return Status::InternalError("Not supported file format: {}", _params->format_type);
        }

        // The JNI case creates no reader for an unrecognised (or missing) table format.
        // Fail explicitly instead of dereferencing a null reader below.
        if (_cur_reader == nullptr) {
            return Status::InternalError("No reader created for file format: {}, path: {}",
                                         _params->format_type, range.path);
        }

        COUNTER_UPDATE(_file_counter, 1);
        if (init_status.is<END_OF_FILE>() || init_status.is<ErrorCode::NOT_FOUND>()) {
            // The VFileScanner for external table may try to open not exist files,
            // Because FE file cache for external table may out of date.
            // So, NOT_FOUND for VFileScanner is not a fail case.
            // Will remove this after file reader refactor.
            COUNTER_UPDATE(_empty_file_counter, 1);
            continue;
        } else if (!init_status.ok()) {
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
        }

        _name_to_col_type.clear();
        _missing_cols.clear();
        RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
        _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
        RETURN_IF_ERROR(_generate_fill_columns());
        if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
            fmt::memory_buffer col_buf;
            for (const auto& col : _missing_cols) {
                fmt::format_to(col_buf, " {}", col);
            }
            VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf),
                                       range.path);
        }

        _source_file_col_names.clear();
        _source_file_col_types.clear();
        _source_file_col_name_types.clear();
        if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
            Status status = _cur_reader->get_parsed_schema(&_source_file_col_names,
                                                           &_source_file_col_types);
            // A reader that does not implement get_parsed_schema is tolerated.
            if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
                return status;
            }
            DCHECK(_source_file_col_names.size() == _source_file_col_types.size());
            for (size_t i = 0; i < _source_file_col_names.size(); ++i) {
                _source_file_col_name_types[_source_file_col_names[i]] = &_source_file_col_types[i];
            }
        }
        _cur_reader_eof = false;
        break;
    }
    return Status::OK();
}
|
|
|
|
/// Rebuilds the per-range fill descriptors and hands them to the current reader.
///
/// Two maps are regenerated from scratch:
///   - `_partition_col_descs`: for every partition slot, the textual value taken from
///     `range.columns_from_path` (only when the range carries path columns);
///   - `_missing_col_descs`: for every materialized slot listed in `_missing_cols`, the
///     default value expression registered in `_col_default_value_ctx`.
///
/// @return the status of `_cur_reader->set_fill_columns`, or InternalError when a
///         partition slot has no (valid) path index or a missing column has no default
///         value expression.
Status VFileScanner::_generate_fill_columns() {
    _partition_col_descs.clear();
    _missing_col_descs.clear();

    const TFileRangeDesc& range = _current_range;
    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
        for (const auto& slot_desc : _partition_slot_descs) {
            if (!slot_desc) {
                continue;
            }
            auto it = _partition_slot_index_map.find(slot_desc->id());
            if (it == std::end(_partition_slot_index_map)) {
                return Status::InternalError("Unknown source slot descriptor, slot_id={}",
                                             slot_desc->id());
            }
            // A negative index wraps to a huge value here and is rejected as well.
            const auto path_idx = static_cast<size_t>(it->second);
            if (path_idx >= range.columns_from_path.size()) {
                return Status::InternalError(
                        "Invalid index of column from path, slot_id={}, index={}, size={}",
                        slot_desc->id(), it->second, range.columns_from_path.size());
            }
            const std::string& column_from_path = range.columns_from_path[path_idx];
            const char* data = column_from_path.c_str();
            // The literal path value "null" is replaced by the "\N" marker.
            if (column_from_path == "null") {
                data = "\\N";
            }
            _partition_col_descs.emplace(slot_desc->col_name(), std::make_tuple(data, slot_desc));
        }
    }

    if (!_missing_cols.empty()) {
        for (const auto& slot_desc : _real_tuple_desc->slots()) {
            if (!slot_desc->is_materialized()) {
                continue;
            }
            if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
                continue;
            }

            auto it = _col_default_value_ctx.find(slot_desc->col_name());
            if (it == _col_default_value_ctx.end()) {
                return Status::InternalError("failed to find default value expr for slot: {}",
                                             slot_desc->col_name());
            }
            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
        }
    }

    return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
}
|
|
|
|
/// Builds the slot and expression lookup tables used while scanning.
///
/// From `_real_tuple_desc`, `_params` and `_current_range` this fills:
///   - `_file_slot_descs`, `_file_col_names`, `_col_id_name_map` for slots read from file;
///   - `_partition_slot_descs`, `_partition_slot_index_map` for the remaining slots;
///   - `_col_default_value_ctx` with the prepared default value expression per column;
///   - for load tasks only: `_dest_vexpr_ctx`, `_dest_slot_name_to_idx`,
///     `_src_slot_descs_order_by_dest` and `_dest_slot_to_src_slot_index`.
///
/// @return OK, or InternalError when a required/dest slot cannot be resolved, or the
///         error of creating/preparing/opening an expression.
Status VFileScanner::_init_expr_ctxes() {
    std::map<SlotId, int> full_src_index_map;
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
    std::map<std::string, int> partition_name_to_key_index_map;
    int index = 0;
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
        full_src_index_map.emplace(slot_desc->id(), index++);
    }

    // For external table query, find the index of column in path.
    // Because query doesn't always search for all columns in a table
    // and the order of selected columns is random.
    // All ranges are expected to have identical columns_from_path_keys
    // because they are all file splits for the same external table.
    // So the current range is used to fill the partition_name_to_key_index_map.
    if (_current_range.__isset.columns_from_path_keys) {
        const std::vector<std::string>& path_keys = _current_range.columns_from_path_keys;
        for (size_t i = 0; i < path_keys.size(); i++) {
            partition_name_to_key_index_map.emplace(path_keys[i], static_cast<int>(i));
        }
    }

    _num_of_columns_from_file = _params->num_of_columns_from_file;
    for (const auto& slot_info : _params->required_slots) {
        auto slot_id = slot_info.slot_id;
        auto it = full_src_slot_map.find(slot_id);
        if (it == std::end(full_src_slot_map)) {
            return Status::InternalError("Unknown source slot descriptor, slot_id={}", slot_id);
        }
        SlotDescriptor* slot_desc = it->second;
        if (slot_info.is_file_slot) {
            _file_slot_descs.emplace_back(slot_desc);
            _file_col_names.push_back(slot_desc->col_name());
            if (slot_desc->col_unique_id() > 0) {
                _col_id_name_map.emplace(slot_desc->col_unique_id(), slot_desc->col_name());
            }
        } else {
            _partition_slot_descs.emplace_back(slot_desc);
            if (_is_load) {
                // full_src_index_map has the same keys as full_src_slot_map, so this
                // lookup succeeds whenever the lookup above did.
                auto iti = full_src_index_map.find(slot_id);
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
            } else {
                auto kit = partition_name_to_key_index_map.find(slot_desc->col_name());
                // Never dereference end(): a slot whose name is not among the path keys
                // is left unmapped, and _generate_fill_columns reports it as an unknown
                // slot if a range actually carries path columns.
                if (kit != std::end(partition_name_to_key_index_map)) {
                    _partition_slot_index_map.emplace(slot_id, kit->second);
                }
            }
        }
    }

    // set column name to default value expr map
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
        if (!slot_desc->is_materialized()) {
            continue;
        }
        vectorized::VExprContextSPtr ctx;
        auto it = _params->default_value_of_src_slot.find(slot_desc->id());
        if (it != std::end(_params->default_value_of_src_slot)) {
            if (!it->second.nodes.empty()) {
                RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx));
                RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
                RETURN_IF_ERROR(ctx->open(_state));
            }
            // if expr is empty, the default value will be null
            _col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
        }
    }

    if (_is_load) {
        // follow desc expr map is only for load task.
        const bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
        int idx = 0;
        for (const auto& slot_desc : _output_tuple_desc->slots()) {
            if (!slot_desc->is_materialized()) {
                continue;
            }
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
            if (it == std::end(_params->expr_of_dest_slot)) {
                return Status::InternalError("No expr for dest slot, id={}, name={}",
                                             slot_desc->id(), slot_desc->col_name());
            }

            // An empty expr leaves `ctx` null; it is still recorded to keep positions
            // aligned with `_dest_slot_name_to_idx`.
            vectorized::VExprContextSPtr ctx;
            if (!it->second.nodes.empty()) {
                RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx));
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
                RETURN_IF_ERROR(ctx->open(_state));
            }
            _dest_vexpr_ctx.emplace_back(ctx);
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;

            if (has_slot_id_map) {
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
                } else {
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
                    if (_src_slot_it == std::end(full_src_slot_map)) {
                        return Status::InternalError("No src slot {} in src slot descs",
                                                     it1->second);
                    }
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
                                                         full_src_index_map[_src_slot_it->first]);
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
                }
            }
        }
    }
    return Status::OK();
}
|
|
|
|
/// Closes the current reader (if any) and then the base scanner.
/// Does nothing and returns OK when `_is_closed` is already set.
///
/// @param state runtime state forwarded to `VScanner::close`.
/// @return OK, or the first error returned by the reader or the base scanner.
Status VFileScanner::close(RuntimeState* state) {
    if (!_is_closed) {
        if (_cur_reader != nullptr) {
            RETURN_IF_ERROR(_cur_reader->close());
        }
        RETURN_IF_ERROR(VScanner::close(state));
    }
    return Status::OK();
}
|
|
|
|
void VFileScanner::try_stop() {
|
|
VScanner::try_stop();
|
|
if (_io_ctx) {
|
|
_io_ctx->should_stop = true;
|
|
}
|
|
}
|
|
|
|
void VFileScanner::_collect_profile_before_close() {
|
|
VScanner::_collect_profile_before_close();
|
|
if (config::enable_file_cache && _state->query_options().enable_file_cache &&
|
|
_profile != nullptr) {
|
|
io::FileCacheProfileReporter cache_profile(_profile);
|
|
cache_profile.update(_file_cache_statistics.get());
|
|
}
|
|
|
|
if (_cur_reader != nullptr) {
|
|
_cur_reader->collect_profile_before_close();
|
|
}
|
|
}
|
|
|
|
} // namespace doris::vectorized
|