[fix](parquet) Fix a data race in which multiple threads modified the same object when the Parquet reader reads files by column index (#50161) (#50496)
bp #50161
This commit is contained in:
@ -55,7 +55,7 @@ Status AvroJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor
|
||||
}
|
||||
|
||||
Status AvroJNIReader::init_fetch_table_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
_colname_to_value_range = colname_to_value_range;
|
||||
std::ostringstream required_fields;
|
||||
std::ostringstream columns_types;
|
||||
|
||||
@ -71,7 +71,7 @@ public:
|
||||
std::unordered_set<std::string>* missing_cols) override;
|
||||
|
||||
Status init_fetch_table_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
|
||||
TFileType::type get_file_type();
|
||||
|
||||
@ -85,7 +85,7 @@ public:
|
||||
private:
|
||||
const TFileScanRangeParams _params;
|
||||
const TFileRangeDesc _range;
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -79,7 +79,7 @@ Status MockJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor
|
||||
}
|
||||
|
||||
Status MockJniReader::init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
_colname_to_value_range = colname_to_value_range;
|
||||
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
|
||||
return _jni_connector->open(_state, _profile);
|
||||
|
||||
@ -83,7 +83,7 @@ public:
|
||||
std::unordered_set<std::string>* missing_cols) override;
|
||||
|
||||
Status init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
|
||||
Status close() override {
|
||||
if (_jni_connector) {
|
||||
@ -100,7 +100,7 @@ protected:
|
||||
}
|
||||
|
||||
private:
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -279,7 +279,7 @@ Status OrcReader::_create_file_reader() {
|
||||
|
||||
Status OrcReader::init_reader(
|
||||
const std::vector<std::string>* column_names,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
|
||||
const RowDescriptor* row_descriptor,
|
||||
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
|
||||
@ -694,7 +694,7 @@ bool static build_search_argument(std::vector<OrcPredicate>& predicates, int ind
|
||||
}
|
||||
|
||||
bool OrcReader::_init_search_argument(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
if ((!_enable_filter_by_min_max) || colname_to_value_range->empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -142,7 +142,7 @@ public:
|
||||
//If you want to read the file by index instead of column name, set hive_use_column_names to false.
|
||||
Status init_reader(
|
||||
const std::vector<std::string>* column_names,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const VExprContextSPtrs& conjuncts, bool is_acid,
|
||||
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
|
||||
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
|
||||
@ -291,7 +291,7 @@ private:
|
||||
static bool _check_acid_schema(const orc::Type& type);
|
||||
static const orc::Type& _remove_acid(const orc::Type& type);
|
||||
bool _init_search_argument(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
void _init_bloom_filter(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
void _init_system_properties();
|
||||
@ -598,7 +598,7 @@ private:
|
||||
std::vector<DecimalScaleParams> _decimal_scale_params;
|
||||
size_t _decimal_scale_params_index;
|
||||
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
|
||||
bool _is_acid = false;
|
||||
std::unique_ptr<IColumn::Filter> _filter;
|
||||
LazyReadContext _lazy_read_ctx;
|
||||
|
||||
@ -57,7 +57,7 @@ Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
|
||||
}
|
||||
|
||||
Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index,
|
||||
ColumnValueRangeType& col_val_range,
|
||||
const ColumnValueRangeType& col_val_range,
|
||||
const FieldSchema* col_schema,
|
||||
std::vector<int>& skipped_ranges,
|
||||
const cctz::time_zone& ctz) {
|
||||
|
||||
@ -47,7 +47,7 @@ public:
|
||||
Status create_skipped_row_range(tparquet::OffsetIndex& offset_index, int total_rows_of_group,
|
||||
int page_idx, RowRange* row_range);
|
||||
Status collect_skipped_page_range(tparquet::ColumnIndex* column_index,
|
||||
ColumnValueRangeType& col_val_range,
|
||||
const ColumnValueRangeType& col_val_range,
|
||||
const FieldSchema* col_schema,
|
||||
std::vector<int>& skipped_ranges, const cctz::time_zone& ctz);
|
||||
bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns);
|
||||
|
||||
@ -295,7 +295,7 @@ void ParquetReader::iceberg_sanitize(const std::vector<std::string>& read_column
|
||||
Status ParquetReader::init_reader(
|
||||
const std::vector<std::string>& all_column_names,
|
||||
const std::vector<std::string>& missing_column_names,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
|
||||
const RowDescriptor* row_descriptor,
|
||||
const std::unordered_map<std::string, int>* colname_to_slot_id,
|
||||
@ -346,7 +346,6 @@ Status ParquetReader::init_reader(
|
||||
_missing_cols.emplace_back(name);
|
||||
}
|
||||
} else {
|
||||
std::unordered_map<std::string, ColumnValueRangeType> new_colname_to_value_range;
|
||||
const auto& table_column_idxs = _scan_params.column_idxs;
|
||||
std::map<int, int> table_col_id_to_idx;
|
||||
for (int i = 0; i < table_column_idxs.size(); i++) {
|
||||
@ -360,21 +359,15 @@ Status ParquetReader::init_reader(
|
||||
auto& table_col = all_column_names[idx];
|
||||
auto file_col = schema_desc.get_column(id)->name;
|
||||
_read_columns.emplace_back(file_col);
|
||||
_table_col_to_file_col[table_col] = file_col;
|
||||
|
||||
if (table_col != file_col) {
|
||||
_table_col_to_file_col[table_col] = file_col;
|
||||
auto iter = _colname_to_value_range->find(table_col);
|
||||
if (iter != _colname_to_value_range->end()) {
|
||||
continue;
|
||||
}
|
||||
new_colname_to_value_range[file_col] = iter->second;
|
||||
_colname_to_value_range->erase(iter->first);
|
||||
auto iter = _colname_to_value_range->find(table_col);
|
||||
if (iter != _colname_to_value_range->end()) {
|
||||
_colname_to_value_range_index_read.emplace(file_col, iter->second);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto it : new_colname_to_value_range) {
|
||||
_colname_to_value_range->emplace(it.first, std::move(it.second));
|
||||
}
|
||||
_colname_to_value_range = &_colname_to_value_range_index_read;
|
||||
}
|
||||
// build column predicates for column lazy read
|
||||
_lazy_read_ctx.conjuncts = conjuncts;
|
||||
|
||||
@ -111,7 +111,7 @@ public:
|
||||
Status init_reader(
|
||||
const std::vector<std::string>& all_column_names,
|
||||
const std::vector<std::string>& missing_column_names,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
|
||||
const RowDescriptor* row_descriptor,
|
||||
const std::unordered_map<std::string, int>* colname_to_slot_id,
|
||||
@ -251,7 +251,12 @@ private:
|
||||
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
|
||||
// table column name to file column name map. For iceberg schema evolution.
|
||||
std::unordered_map<std::string, std::string> _table_col_to_file_col;
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
|
||||
|
||||
// During initialization, multiple vfile_scanner's _colname_to_value_range will point to the same object,
|
||||
// so the content in the object cannot be modified (there is a multi-threading problem).
|
||||
// _colname_to_value_range_index_read used when _hive_use_column_names = false.
|
||||
std::unordered_map<std::string, ColumnValueRangeType> _colname_to_value_range_index_read;
|
||||
std::vector<std::string> _read_columns;
|
||||
RowRange _whole_range = RowRange(0, 0);
|
||||
const std::vector<int64_t>* _delete_rows = nullptr;
|
||||
|
||||
@ -95,7 +95,7 @@ Status HudiJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor
|
||||
}
|
||||
|
||||
Status HudiJniReader::init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
_colname_to_value_range = colname_to_value_range;
|
||||
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
|
||||
return _jni_connector->open(_state, _profile);
|
||||
|
||||
@ -58,12 +58,12 @@ public:
|
||||
std::unordered_set<std::string>* missing_cols) override;
|
||||
|
||||
Status init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
|
||||
private:
|
||||
const TFileScanRangeParams& _scan_params;
|
||||
const THudiFileDesc& _hudi_params;
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -530,7 +530,7 @@ void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFil
|
||||
Status IcebergParquetReader::init_reader(
|
||||
const std::vector<std::string>& file_col_names,
|
||||
const std::unordered_map<int, std::string>& col_id_name_map,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
|
||||
const RowDescriptor* row_descriptor,
|
||||
const std::unordered_map<std::string, int>* colname_to_slot_id,
|
||||
@ -603,7 +603,7 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d
|
||||
Status IcebergOrcReader::init_reader(
|
||||
const std::vector<std::string>& file_col_names,
|
||||
const std::unordered_map<int, std::string>& col_id_name_map,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
|
||||
const RowDescriptor* row_descriptor,
|
||||
const std::unordered_map<std::string, int>* colname_to_slot_id,
|
||||
|
||||
@ -150,7 +150,7 @@ protected:
|
||||
std::unordered_map<std::string, std::string> _file_col_to_table_col;
|
||||
// table column name to file column name map. For iceberg schema evolution.
|
||||
std::unordered_map<std::string, std::string> _table_col_to_file_col;
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
|
||||
// copy from _colname_to_value_range with new column name that is in parquet/orc file, to support schema evolution.
|
||||
std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;
|
||||
// column id to name map. Collect from FE slot descriptor.
|
||||
@ -205,7 +205,7 @@ public:
|
||||
Status init_reader(
|
||||
const std::vector<std::string>& file_col_names,
|
||||
const std::unordered_map<int, std::string>& col_id_name_map,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
|
||||
const RowDescriptor* row_descriptor,
|
||||
const std::unordered_map<std::string, int>* colname_to_slot_id,
|
||||
@ -251,7 +251,7 @@ public:
|
||||
Status init_reader(
|
||||
const std::vector<std::string>& file_col_names,
|
||||
const std::unordered_map<int, std::string>& col_id_name_map,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
|
||||
const RowDescriptor* row_descriptor,
|
||||
const std::unordered_map<std::string, int>* colname_to_slot_id,
|
||||
|
||||
@ -104,7 +104,7 @@ Status MaxComputeJniReader::get_columns(
|
||||
}
|
||||
|
||||
Status MaxComputeJniReader::init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
_colname_to_value_range = colname_to_value_range;
|
||||
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
|
||||
return _jni_connector->open(_state, _profile);
|
||||
|
||||
@ -65,13 +65,13 @@ public:
|
||||
std::unordered_set<std::string>* missing_cols) override;
|
||||
|
||||
Status init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
|
||||
private:
|
||||
const MaxComputeTableDescriptor* _table_desc = nullptr;
|
||||
const TMaxComputeFileDesc& _max_compute_params;
|
||||
const TFileRangeDesc& _range;
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -96,7 +96,7 @@ Status PaimonJniReader::get_columns(std::unordered_map<std::string, TypeDescript
|
||||
}
|
||||
|
||||
Status PaimonJniReader::init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
_colname_to_value_range = colname_to_value_range;
|
||||
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
|
||||
return _jni_connector->open(_state, _profile);
|
||||
|
||||
@ -64,10 +64,10 @@ public:
|
||||
std::unordered_set<std::string>* missing_cols) override;
|
||||
|
||||
Status init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
|
||||
private:
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -56,7 +56,7 @@ TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader>
|
||||
|
||||
Status TransactionalHiveReader::init_reader(
|
||||
const std::vector<std::string>& column_names,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
|
||||
const RowDescriptor* row_descriptor,
|
||||
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
|
||||
|
||||
@ -98,7 +98,7 @@ public:
|
||||
|
||||
Status init_reader(
|
||||
const std::vector<std::string>& column_names,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
|
||||
const RowDescriptor* row_descriptor,
|
||||
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
|
||||
|
||||
@ -91,7 +91,7 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
|
||||
}
|
||||
|
||||
Status JniConnector::init(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
// TODO: This logic need to be changed.
|
||||
// See the comment of "predicates" field in JniScanner.java
|
||||
|
||||
@ -408,7 +408,7 @@ Status JniConnector::_fill_struct_column(TableMetaAddress& address, MutableColum
|
||||
}
|
||||
|
||||
void JniConnector::_generate_predicates(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
if (colname_to_value_range == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -222,7 +222,8 @@ public:
|
||||
* number_filters(4) | length(4) | column_name | op(4) | scale(4) | num_values(4) | value_length(4) | value | ...
|
||||
* Then, pass the byte array address in configuration map, like "push_down_predicates=${address}"
|
||||
*/
|
||||
Status init(std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
Status init(
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
|
||||
/**
|
||||
* Call java side function JniScanner.getNextBatchMeta. The columns information are stored as long array:
|
||||
@ -353,7 +354,7 @@ private:
|
||||
}
|
||||
|
||||
void _generate_predicates(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
|
||||
template <PrimitiveType primitive_type>
|
||||
void _parse_value_range(const ColumnValueRange<primitive_type>& col_val_range,
|
||||
|
||||
@ -107,7 +107,7 @@ protected:
|
||||
|
||||
std::unique_ptr<GenericReader> _cur_reader;
|
||||
bool _cur_reader_eof;
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
|
||||
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
|
||||
// File source slot descriptors
|
||||
std::vector<SlotDescriptor*> _file_slot_descs;
|
||||
// col names from _file_slot_descs
|
||||
|
||||
@ -56,10 +56,9 @@ public:
|
||||
ParquetReaderTest() {}
|
||||
};
|
||||
|
||||
TEST_F(ParquetReaderTest, normal) {
|
||||
TDescriptorTable t_desc_table;
|
||||
TTableDescriptor t_table_desc;
|
||||
|
||||
static void create_table_desc(TDescriptorTable& t_desc_table, TTableDescriptor& t_table_desc,
|
||||
std::vector<std::string> table_column_names,
|
||||
std::vector<TPrimitiveType::type> types) {
|
||||
t_table_desc.id = 0;
|
||||
t_table_desc.tableType = TTableType::OLAP_TABLE;
|
||||
t_table_desc.numCols = 0;
|
||||
@ -68,10 +67,7 @@ TEST_F(ParquetReaderTest, normal) {
|
||||
t_desc_table.__isset.tableDescriptors = true;
|
||||
|
||||
// init boolean and numeric slot
|
||||
std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col", "smallint_col",
|
||||
"int_col", "bigint_col", "float_col",
|
||||
"double_col"};
|
||||
for (int i = 0; i < numeric_types.size(); i++) {
|
||||
for (int i = 0; i < table_column_names.size(); i++) {
|
||||
TSlotDescriptor tslot_desc;
|
||||
{
|
||||
tslot_desc.id = i;
|
||||
@ -81,7 +77,7 @@ TEST_F(ParquetReaderTest, normal) {
|
||||
TTypeNode node;
|
||||
node.__set_type(TTypeNodeType::SCALAR);
|
||||
TScalarType scalar_type;
|
||||
scalar_type.__set_type(TPrimitiveType::type(i + 2));
|
||||
scalar_type.__set_type(types[i]);
|
||||
node.__set_scalar_type(scalar_type);
|
||||
type.types.push_back(node);
|
||||
}
|
||||
@ -90,7 +86,7 @@ TEST_F(ParquetReaderTest, normal) {
|
||||
tslot_desc.byteOffset = 0;
|
||||
tslot_desc.nullIndicatorByte = 0;
|
||||
tslot_desc.nullIndicatorBit = -1;
|
||||
tslot_desc.colName = numeric_types[i];
|
||||
tslot_desc.colName = table_column_names[i];
|
||||
tslot_desc.slotIdx = 0;
|
||||
tslot_desc.isMaterialized = true;
|
||||
t_desc_table.slotDescriptors.push_back(tslot_desc);
|
||||
@ -108,6 +104,19 @@ TEST_F(ParquetReaderTest, normal) {
|
||||
t_tuple_desc.__isset.tableId = true;
|
||||
t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(ParquetReaderTest, normal) {
|
||||
TDescriptorTable t_desc_table;
|
||||
TTableDescriptor t_table_desc;
|
||||
std::vector<std::string> table_column_names = {"boolean_col", "tinyint_col", "smallint_col",
|
||||
"int_col", "bigint_col", "float_col",
|
||||
"double_col"};
|
||||
std::vector<TPrimitiveType::type> table_column_types = {
|
||||
TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT, TPrimitiveType::SMALLINT,
|
||||
TPrimitiveType::INT, TPrimitiveType::BIGINT, TPrimitiveType::FLOAT,
|
||||
TPrimitiveType::DOUBLE};
|
||||
create_table_desc(t_desc_table, t_table_desc, table_column_names, table_column_types);
|
||||
DescriptorTbl* desc_tbl;
|
||||
ObjectPool obj_pool;
|
||||
static_cast<void>(DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl));
|
||||
@ -164,5 +173,163 @@ TEST_F(ParquetReaderTest, normal) {
|
||||
delete p_reader;
|
||||
}
|
||||
|
||||
static ParquetReader* create_parquet_reader(TFileScanRangeParams& scan_params,
|
||||
std::vector<std::string> table_column_names,
|
||||
std::vector<TPrimitiveType::type> types) {
|
||||
TDescriptorTable t_desc_table;
|
||||
TTableDescriptor t_table_desc;
|
||||
|
||||
create_table_desc(t_desc_table, t_table_desc, table_column_names, types);
|
||||
DescriptorTbl* desc_tbl;
|
||||
ObjectPool obj_pool;
|
||||
static_cast<void>(DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl));
|
||||
|
||||
auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
|
||||
auto local_fs = io::global_local_filesystem();
|
||||
io::FileReaderSPtr reader;
|
||||
static_cast<void>(local_fs->open_file(
|
||||
"./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", &reader));
|
||||
|
||||
cctz::time_zone ctz;
|
||||
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
|
||||
std::vector<std::string> column_names;
|
||||
std::vector<std::string> missing_column_names;
|
||||
for (int i = 0; i < slot_descs.size(); i++) {
|
||||
column_names.push_back(slot_descs[i]->col_name());
|
||||
}
|
||||
TFileRangeDesc scan_range;
|
||||
{
|
||||
scan_range.start_offset = 0;
|
||||
scan_range.size = 1000;
|
||||
}
|
||||
auto p_reader =
|
||||
new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, nullptr, nullptr);
|
||||
p_reader->set_file_reader(reader);
|
||||
return p_reader;
|
||||
}
|
||||
|
||||
TEST_F(ParquetReaderTest, use_column_name) {
|
||||
bool use_column_name = true;
|
||||
|
||||
std::vector<std::string> table_column_names = {"boolean_col", "tinyint_col", "smallint_col",
|
||||
"int_col", "bigint_col", "float_col",
|
||||
"double_col"};
|
||||
std::vector<TPrimitiveType::type> table_column_types = {
|
||||
TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT, TPrimitiveType::SMALLINT,
|
||||
TPrimitiveType::INT, TPrimitiveType::BIGINT, TPrimitiveType::FLOAT,
|
||||
TPrimitiveType::DOUBLE};
|
||||
TFileScanRangeParams scan_params;
|
||||
|
||||
auto p_reader = create_parquet_reader(scan_params, table_column_names, table_column_types);
|
||||
std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
|
||||
colname_to_value_range.emplace("boolean_col", ColumnValueRange<TYPE_BOOLEAN>("boolean_col"));
|
||||
colname_to_value_range.emplace("tinyint_col", ColumnValueRange<TYPE_TINYINT>("tinyint_col"));
|
||||
colname_to_value_range.emplace("smallint_col", ColumnValueRange<TYPE_SMALLINT>("smallint_col"));
|
||||
colname_to_value_range.emplace("int_col", ColumnValueRange<TYPE_INT>("int_col"));
|
||||
|
||||
static_cast<void>(p_reader->open());
|
||||
static_cast<void>(p_reader->init_reader(table_column_names, {}, &colname_to_value_range, {},
|
||||
nullptr, nullptr, nullptr, nullptr, nullptr, false,
|
||||
use_column_name));
|
||||
|
||||
std::vector<std::string> read_columns_ans = {"tinyint_col", "smallint_col", "int_col",
|
||||
"bigint_col", "boolean_col", "float_col",
|
||||
"double_col"};
|
||||
EXPECT_EQ(p_reader->_read_columns, read_columns_ans);
|
||||
|
||||
std::vector<std::string> miss_columns_ans = {};
|
||||
EXPECT_EQ(p_reader->_missing_cols, miss_columns_ans);
|
||||
std::vector<std::string> colname_to_value_range_names_ans = {"tinyint_col", "smallint_col",
|
||||
"int_col", "boolean_col"};
|
||||
for (auto col : colname_to_value_range_names_ans) {
|
||||
EXPECT_TRUE(p_reader->_colname_to_value_range->contains(col));
|
||||
}
|
||||
EXPECT_EQ(p_reader->_colname_to_value_range->size(), colname_to_value_range_names_ans.size());
|
||||
delete p_reader;
|
||||
}
|
||||
|
||||
TEST_F(ParquetReaderTest, use_column_name2) {
|
||||
bool use_column_name = true;
|
||||
|
||||
std::vector<std::string> table_column_names = {"boolean_col", "tinyint_col", "smallint_col",
|
||||
"int_col", "bigint_col", "float_col",
|
||||
"test1", "double_col", "test2"};
|
||||
std::vector<TPrimitiveType::type> table_column_types = {
|
||||
TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT, TPrimitiveType::SMALLINT,
|
||||
TPrimitiveType::INT, TPrimitiveType::BIGINT, TPrimitiveType::FLOAT,
|
||||
TPrimitiveType::FLOAT, TPrimitiveType::DOUBLE, TPrimitiveType::DOUBLE};
|
||||
TFileScanRangeParams scan_params;
|
||||
|
||||
auto p_reader = create_parquet_reader(scan_params, table_column_names, table_column_types);
|
||||
std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
|
||||
colname_to_value_range.emplace("boolean_col", ColumnValueRange<TYPE_BOOLEAN>("boolean_col"));
|
||||
colname_to_value_range.emplace("tinyint_col", ColumnValueRange<TYPE_TINYINT>("tinyint_col"));
|
||||
colname_to_value_range.emplace("smallint_col", ColumnValueRange<TYPE_SMALLINT>("smallint_col"));
|
||||
colname_to_value_range.emplace("int_col", ColumnValueRange<TYPE_INT>("int_col"));
|
||||
|
||||
static_cast<void>(p_reader->open());
|
||||
static_cast<void>(p_reader->init_reader(table_column_names, {"boolean_col"},
|
||||
&colname_to_value_range, {}, nullptr, nullptr, nullptr,
|
||||
nullptr, nullptr, false, use_column_name));
|
||||
|
||||
std::vector<std::string> read_columns_ans = {"tinyint_col", "smallint_col", "int_col",
|
||||
"bigint_col", "float_col", "double_col"};
|
||||
EXPECT_EQ(p_reader->_read_columns, read_columns_ans);
|
||||
|
||||
std::vector<std::string> miss_columns_ans = {"boolean_col", "test1", "test2"};
|
||||
EXPECT_EQ(p_reader->_missing_cols, miss_columns_ans);
|
||||
std::vector<std::string> colname_to_value_range_names_ans = {"tinyint_col", "smallint_col",
|
||||
"int_col", "boolean_col"};
|
||||
for (auto col : colname_to_value_range_names_ans) {
|
||||
EXPECT_TRUE(p_reader->_colname_to_value_range->contains(col));
|
||||
}
|
||||
EXPECT_EQ(p_reader->_colname_to_value_range->size(), colname_to_value_range_names_ans.size());
|
||||
delete p_reader;
|
||||
}
|
||||
|
||||
TEST_F(ParquetReaderTest, use_column_idx) {
|
||||
bool use_column_name = false;
|
||||
|
||||
std::vector<std::string> table_column_names = {"col0", "col1", "col3",
|
||||
"col7", "col100", "col102"};
|
||||
std::vector<TPrimitiveType::type> table_column_types = {
|
||||
TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT, TPrimitiveType::SMALLINT,
|
||||
TPrimitiveType::INT, TPrimitiveType::BIGINT, TPrimitiveType::BIGINT};
|
||||
TFileScanRangeParams scan_params;
|
||||
scan_params.column_idxs.emplace_back(0);
|
||||
scan_params.column_idxs.emplace_back(1);
|
||||
scan_params.column_idxs.emplace_back(3);
|
||||
scan_params.column_idxs.emplace_back(7);
|
||||
scan_params.column_idxs.emplace_back(100);
|
||||
scan_params.column_idxs.emplace_back(102);
|
||||
|
||||
auto p_reader = create_parquet_reader(scan_params, table_column_names, table_column_types);
|
||||
std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
|
||||
colname_to_value_range.emplace("col0", ColumnValueRange<TYPE_BOOLEAN>("col0"));
|
||||
colname_to_value_range.emplace("col1", ColumnValueRange<TYPE_TINYINT>("col1"));
|
||||
colname_to_value_range.emplace("col3", ColumnValueRange<TYPE_SMALLINT>("col3"));
|
||||
colname_to_value_range.emplace("col102", ColumnValueRange<TYPE_SMALLINT>("col102"));
|
||||
|
||||
static_cast<void>(p_reader->open());
|
||||
static_cast<void>(p_reader->init_reader(table_column_names, {}, &colname_to_value_range, {},
|
||||
nullptr, nullptr, nullptr, nullptr, nullptr, false,
|
||||
use_column_name));
|
||||
|
||||
std::vector<std::string> read_columns_ans = {"tinyint_col", "smallint_col", "bigint_col",
|
||||
"string_col"};
|
||||
EXPECT_EQ(p_reader->_read_columns, read_columns_ans);
|
||||
|
||||
std::vector<std::string> miss_columns_ans = {"col100", "col102"};
|
||||
EXPECT_EQ(p_reader->_missing_cols, miss_columns_ans);
|
||||
|
||||
std::vector<std::string> colname_to_value_range_names_ans = {"tinyint_col", "smallint_col",
|
||||
"bigint_col"};
|
||||
for (auto col : colname_to_value_range_names_ans) {
|
||||
EXPECT_TRUE(p_reader->_colname_to_value_range->contains(col));
|
||||
}
|
||||
EXPECT_EQ(p_reader->_colname_to_value_range->size(), colname_to_value_range_names_ans.size());
|
||||
delete p_reader;
|
||||
}
|
||||
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user