bp #37040
@@ -1250,6 +1250,11 @@ DEFINE_Int64(min_row_group_size, "134217728");
 // The time out milliseconds for remote fetch schema RPC, default 60s
 DEFINE_mInt64(fetch_remote_schema_rpc_timeout_ms, "60000");

+// If set to false, the parquet reader will not use page index to filter data.
+// This is only for debug purpose, in case sometimes the page index
+// filter wrong data.
+DEFINE_mBool(enable_parquet_page_index, "true");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3

@@ -1334,6 +1334,8 @@ DECLARE_mInt64(fetch_remote_schema_rpc_timeout_ms);
 // The minimum row group size when exporting Parquet files.
 DECLARE_Int64(min_row_group_size);

+DECLARE_mBool(enable_parquet_page_index);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);

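Side note on the two config hunks above: the DEFINE_mBool/DECLARE_mBool pair declares a runtime-mutable BE flag, so page-index filtering can be switched off without restarting the backend. The stand-alone C++ sketch below only illustrates that gating pattern; the atomic flag and helper function are hypothetical stand-ins, not the real config macros or reader code.

#include <atomic>
#include <iostream>

// Hypothetical stand-in for the DEFINE_mBool-style runtime-mutable config.
namespace config {
std::atomic<bool> enable_parquet_page_index{true};
} // namespace config

// The reader consults the flag for every row group it processes, so a
// change takes effect for all row groups handled after the flip.
bool use_page_index(bool row_group_has_page_index) {
    return config::enable_parquet_page_index.load(std::memory_order_relaxed) &&
           row_group_has_page_index;
}

int main() {
    std::cout << use_page_index(true) << '\n'; // 1: filter with the page index
    config::enable_parquet_page_index = false; // e.g. while debugging a suspect filter
    std::cout << use_page_index(true) << '\n'; // 0: read the whole row group instead
}

This matches the stated intent of the new comment: the flag is a debug escape hatch for cases where the page index would filter the wrong data.
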
@@ -148,6 +148,10 @@ void ParquetReader::_init_profile() {
                 ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "FileNum", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.page_index_filter_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexFilterTime", parquet_profile, 1);
+        _parquet_profile.read_page_index_time =
+                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexReadTime", parquet_profile, 1);
+        _parquet_profile.parse_page_index_time =
+                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexParseTime", parquet_profile, 1);
         _parquet_profile.row_group_filter_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RowGroupFilterTime", parquet_profile, 1);

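Note on the hunk above: both new timers are registered with ADD_CHILD_TIMER_WITH_LEVEL under the same parquet_profile node as the existing PageIndexFilterTime, so PageIndexReadTime and PageIndexParseTime appear next to it in the query profile and break the page-index work into its read and parse phases.
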
@@ -747,25 +751,32 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
         return Status::OK();
     }
     PageIndex page_index;
-    if (!_has_page_index(row_group.columns, page_index)) {
+    if (!config::enable_parquet_page_index || !_has_page_index(row_group.columns, page_index)) {
         read_whole_row_group();
         return Status::OK();
     }
     uint8_t col_index_buff[page_index._column_index_size];
     size_t bytes_read = 0;
     Slice result(col_index_buff, page_index._column_index_size);
-    RETURN_IF_ERROR(
-            _file_reader->read_at(page_index._column_index_start, result, &bytes_read, _io_ctx));
+    {
+        SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
+        RETURN_IF_ERROR(_file_reader->read_at(page_index._column_index_start, result, &bytes_read,
+                                              _io_ctx));
+    }
     _column_statistics.read_bytes += bytes_read;
     auto& schema_desc = _file_metadata->schema();
     std::vector<RowRange> skipped_row_ranges;
     uint8_t off_index_buff[page_index._offset_index_size];
     Slice res(off_index_buff, page_index._offset_index_size);
-    RETURN_IF_ERROR(
-            _file_reader->read_at(page_index._offset_index_start, res, &bytes_read, _io_ctx));
+    {
+        SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
+        RETURN_IF_ERROR(
+                _file_reader->read_at(page_index._offset_index_start, res, &bytes_read, _io_ctx));
+    }
     _column_statistics.read_bytes += bytes_read;
     // read twice: parse column index & parse offset index
     _column_statistics.meta_read_calls += 2;
+    SCOPED_RAW_TIMER(&_statistics.parse_page_index_time);
     for (auto& read_col : _read_columns) {
         auto conjunct_iter = _colname_to_value_range->find(read_col);
         if (_colname_to_value_range->end() == conjunct_iter) {

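On the hunk above: each read_at() call is now wrapped in a brace block guarded by SCOPED_RAW_TIMER, which adds the block's elapsed time to the named int64 field of _statistics. The following is a rough, self-contained illustration of that pattern; ScopedRawTimer here is a hypothetical stand-in, not Doris's macro.

#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

// Hypothetical RAII timer: accumulates the elapsed nanoseconds of its
// enclosing scope into an int64_t owned by the caller.
class ScopedRawTimer {
public:
    explicit ScopedRawTimer(int64_t* target)
            : _target(target), _start(std::chrono::steady_clock::now()) {}
    ~ScopedRawTimer() {
        auto elapsed = std::chrono::steady_clock::now() - _start;
        *_target += std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
    }

private:
    int64_t* _target;
    std::chrono::steady_clock::time_point _start;
};

int main() {
    int64_t read_page_index_time = 0; // mirrors Statistics::read_page_index_time
    {
        // The braces bound exactly the work being measured, as in _process_page_index().
        ScopedRawTimer timer(&read_page_index_time);
        std::this_thread::sleep_for(std::chrono::milliseconds(5)); // stand-in for read_at()
    }
    std::cout << "read_page_index_time(ns)=" << read_page_index_time << '\n';
}

The explicit braces keep the measurement tight around the I/O itself, while the parse timer started just before the column loop runs to the end of the function and covers the index parsing that follows. The accumulated values only reach the query profile when _collect_profile() issues the COUNTER_UPDATE calls shown in the next hunk.
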
@@ -935,6 +946,8 @@ void ParquetReader::_collect_profile() {
     COUNTER_UPDATE(_parquet_profile.open_file_time, _statistics.open_file_time);
     COUNTER_UPDATE(_parquet_profile.open_file_num, _statistics.open_file_num);
     COUNTER_UPDATE(_parquet_profile.page_index_filter_time, _statistics.page_index_filter_time);
+    COUNTER_UPDATE(_parquet_profile.read_page_index_time, _statistics.read_page_index_time);
+    COUNTER_UPDATE(_parquet_profile.parse_page_index_time, _statistics.parse_page_index_time);
     COUNTER_UPDATE(_parquet_profile.row_group_filter_time, _statistics.row_group_filter_time);

     COUNTER_UPDATE(_parquet_profile.skip_page_header_num, _column_statistics.skip_page_header_num);

@@ -89,6 +89,8 @@ public:
         int64_t open_file_num = 0;
         int64_t row_group_filter_time = 0;
         int64_t page_index_filter_time = 0;
+        int64_t read_page_index_time = 0;
+        int64_t parse_page_index_time = 0;
     };

     ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,

@@ -170,6 +172,8 @@ private:
         RuntimeProfile::Counter* open_file_num = nullptr;
         RuntimeProfile::Counter* row_group_filter_time = nullptr;
         RuntimeProfile::Counter* page_index_filter_time = nullptr;
+        RuntimeProfile::Counter* read_page_index_time = nullptr;
+        RuntimeProfile::Counter* parse_page_index_time = nullptr;

         RuntimeProfile::Counter* file_read_time = nullptr;
         RuntimeProfile::Counter* file_read_calls = nullptr;

@@ -1944,16 +1944,21 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false, masterOnly = false)
     public static long max_hive_partition_cache_num = 100000;

-    @ConfField(mutable = false, masterOnly = false, description = {"Hive表到分区名列表缓存的最大数量。",
-            "Max cache number of hive table to partition names list."})
+    @ConfField(mutable = false, masterOnly = false, description = {"Hive表名缓存的最大数量。",
+            "Max cache number of hive table name list."})
     public static long max_hive_table_cache_num = 1000;

+    @ConfField(mutable = false, masterOnly = false, description = {
+            "Hive分区表缓存的最大数量", "Max cache number of hive partition table"
+    })
+    public static long max_hive_partition_table_cache_num = 1000;
+
     @ConfField(mutable = false, masterOnly = false, description = {"获取Hive分区值时候的最大返回数量,-1代表没有限制。",
-            "Max number of hive partition values to return while list partitions, -1 means no limitation."})
+            "Max number of hive partition values to return while list partitions, -1 means no limitation."})
     public static short max_hive_list_partition_num = -1;

     @ConfField(mutable = false, masterOnly = false, description = {"远程文件系统缓存的最大数量",
-            "Max cache number of remote file system."})
+            "Max cache number of remote file system."})
     public static long max_remote_file_system_cache_num = 100;

     @ConfField(mutable = false, masterOnly = false, description = {"外表行数缓存最大数量",

@@ -136,16 +136,16 @@ public class HiveMetaStoreCache {
      **/
     private void init() {
         CacheFactory partitionValuesCacheFactory = new CacheFactory(
                 OptionalLong.of(86400L),
-                OptionalLong.of(28800L),
+                OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
-                Config.max_hive_table_cache_num,
+                Config.max_hive_partition_table_cache_num,
                 false,
                 null);
         partitionValuesCache = partitionValuesCacheFactory.buildCache(key -> loadPartitionValues(key), null,
                 refreshExecutor);

         CacheFactory partitionCacheFactory = new CacheFactory(
                 OptionalLong.of(86400L),
-                OptionalLong.of(28800L),
+                OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
                 Config.max_hive_partition_cache_num,
                 false,

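Reading the two FE hunks together: Config.java adds max_hive_partition_table_cache_num (default 1000) and corrects the description of max_hive_table_cache_num, and HiveMetaStoreCache.init() now sizes partitionValuesCache by the new config instead of max_hive_table_cache_num. In both CacheFactory constructions the second time argument also switches from the hard-coded 28800L to Config.external_cache_expire_time_minutes_after_access * 60L, while the partition cache keeps max_hive_partition_cache_num as its capacity.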