[opt](hive) support orc generated from hive 1.x for all file scan node (#28806)
@@ -146,7 +146,6 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
        _range_start_offset(range.start_offset),
        _range_size(range.size),
        _ctz(ctz),
        _is_hive(params.__isset.slot_name_to_schema_pos),
        _io_ctx(io_ctx),
        _enable_lazy_mat(enable_lazy_mat),
        _is_dict_cols_converted(false) {
@@ -165,7 +164,6 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r
        _scan_params(params),
        _scan_range(range),
        _ctz(ctz),
        _is_hive(params.__isset.slot_name_to_schema_pos),
        _file_system(nullptr),
        _io_ctx(io_ctx),
        _enable_lazy_mat(enable_lazy_mat),
@@ -307,11 +305,15 @@ Status OrcReader::_init_read_columns() {
    auto& root_type = _reader->getType();
    std::vector<std::string> orc_cols;
    std::vector<std::string> orc_cols_lower_case;
    _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map);
    bool is_hive1_orc = false;
    _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map, &is_hive1_orc);

    // In older versions, slot_name_to_schema_pos may not be set in _scan_params.
    // TODO: this check can be removed in 2.2 or later.
    _is_hive1_orc = is_hive1_orc && _scan_params.__isset.slot_name_to_schema_pos;
    for (size_t i = 0; i < _column_names->size(); ++i) {
        auto& col_name = (*_column_names)[i];
        if (_is_hive) {
        if (_is_hive1_orc) {
            auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
            if (iter != _scan_params.slot_name_to_schema_pos.end()) {
                int pos = iter->second;
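This hunk carries the core BE-side decision: the file is treated as Hive 1.x ORC only when its column names look auto-generated and the FE actually shipped `slot_name_to_schema_pos`. A minimal sketch of the position-based lookup the loop performs, with hypothetical literal data standing in for `_scan_params` and the ORC footer:

```cpp
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
    // Hypothetical stand-in for _scan_params.slot_name_to_schema_pos,
    // which the FE fills from the table's base schema.
    std::unordered_map<std::string, int> slot_name_to_schema_pos = {
            {"id", 0}, {"name", 1}, {"price", 2}};
    // Hypothetical stand-in for the ORC footer's top-level field names
    // as written by Hive 1.x.
    std::vector<std::string> orc_cols = {"_col0", "_col1", "_col2"};

    // Resolve a schema column by position instead of by name.
    const std::string col_name = "name";
    auto iter = slot_name_to_schema_pos.find(col_name);
    if (iter != slot_name_to_schema_pos.end()) {
        int pos = iter->second;
        std::cout << col_name << " -> " << orc_cols[pos] << std::endl; // name -> _col1
    }
    return 0;
}
```

Matching by position rather than by name is what makes the real schema recoverable: Hive 1.x wrote `_col0, _col1, ...` into the file, so only a column's ordinal links it back to the table schema.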
@@ -346,7 +348,7 @@ Status OrcReader::_init_read_columns() {
        _read_cols_lower_case.emplace_back(col_name);
        // For the Hive engine, store the mapping from ORC column name to schema column name.
        // This handles Hive 1.x ORC files with internal column names _col0, _col1, ...
        if (_is_hive) {
        if (_is_hive1_orc) {
            _removed_acid_file_col_name_to_schema_col[orc_cols[pos]] = col_name;
        }
        _col_name_to_file_col_name[col_name] = read_col;
@@ -357,20 +359,26 @@ Status OrcReader::_init_read_columns() {

void OrcReader::_init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
                               std::vector<std::string>& orc_cols_lower_case,
                               std::unordered_map<std::string, const orc::Type*>& type_map) {
                               std::unordered_map<std::string, const orc::Type*>& type_map,
                               bool* is_hive1_orc) {
    bool hive1_orc = true;
    for (int i = 0; i < type.getSubtypeCount(); ++i) {
        orc_cols.emplace_back(type.getFieldName(i));
        auto field_name_lower_case = _get_field_name_lower_case(&type, i);
        if (hive1_orc) {
            hive1_orc = _is_hive1_col_name(field_name_lower_case);
        }
        auto field_name_lower_case_copy = field_name_lower_case;
        orc_cols_lower_case.emplace_back(std::move(field_name_lower_case));
        type_map.emplace(std::move(field_name_lower_case_copy), type.getSubtype(i));
        if (_is_acid) {
            const orc::Type* sub_type = type.getSubtype(i);
            if (sub_type->getKind() == orc::TypeKind::STRUCT) {
                _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case, type_map);
                _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case, type_map, is_hive1_orc);
            }
        }
    }
    *is_hive1_orc = hive1_orc;
}
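The accumulation above is deliberately conservative: `hive1_orc` starts out `true` and every top-level field must pass `_is_hive1_col_name`, so a single ordinary column name marks the whole file as non-Hive-1.x. A self-contained sketch of that rule (the predicate here is a condensed restatement, not the patch's code):

```cpp
#include <algorithm>
#include <cctype>
#include <iostream>
#include <string>
#include <vector>

// The file qualifies as Hive 1.x ORC only if every top-level field name
// matches _col<digits>; one human-readable name disqualifies it.
static bool looks_like_hive1_orc(const std::vector<std::string>& field_names) {
    bool hive1_orc = true;
    for (const auto& name : field_names) {
        bool matches = name.size() > 4 && name.compare(0, 4, "_col") == 0 &&
                       std::all_of(name.begin() + 4, name.end(), [](unsigned char c) {
                           return std::isdigit(c) != 0;
                       });
        hive1_orc = hive1_orc && matches;
    }
    return hive1_orc;
}

int main() {
    std::cout << looks_like_hive1_orc({"_col0", "_col1", "_col2"}) << "\n"; // 1
    std::cout << looks_like_hive1_orc({"id", "_col1"}) << "\n";             // 0
    return 0;
}
```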

bool OrcReader::_check_acid_schema(const orc::Type& type) {
@@ -845,7 +853,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, int idx) {
        std::string name;
        // For the Hive engine, translate the column name in the ORC file to the schema column name.
        // This handles Hive 1.x, which uses internal column names _col0, _col1, ...
        if (_is_hive) {
        if (_is_hive1_orc) {
            name = _removed_acid_file_col_name_to_schema_col[type.getFieldName(i)];
        } else {
            name = _get_field_name_lower_case(&type, i);
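This is the reverse direction of the mapping built in `_init_read_columns`: `_removed_acid_file_col_name_to_schema_col` translates the file's internal name back to the schema name when the selected types are materialized. A minimal sketch of the round trip, with hypothetical sample names:

```cpp
#include <iostream>
#include <string>
#include <unordered_map>

int main() {
    // Hypothetical mapping recorded while initializing the read columns:
    // ORC file column name -> schema column name.
    std::unordered_map<std::string, std::string> file_col_to_schema_col = {
            {"_col0", "id"}, {"_col1", "name"}};

    // While selecting types, translate the file's internal name back to
    // the schema name the rest of the query plan expects.
    std::cout << file_col_to_schema_col["_col1"] << std::endl; // name
    return 0;
}
```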

@@ -247,7 +247,8 @@ private:
    Status _init_read_columns();
    void _init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
                        std::vector<std::string>& orc_cols_lower_case,
                        std::unordered_map<std::string, const orc::Type*>& type_map);
                        std::unordered_map<std::string, const orc::Type*>& type_map,
                        bool* is_hive1_orc);
    static bool _check_acid_schema(const orc::Type& type);
    static const orc::Type& _remove_acid(const orc::Type& type);
    TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
@@ -483,6 +484,22 @@ private:
    int64_t get_remaining_rows() { return _remaining_rows; }
    void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }

    // Check whether the given name looks like _col0, _col1, ...
    static bool inline _is_hive1_col_name(const std::string& name) {
        if (name.size() <= 4) {
            return false;
        }
        if (name.substr(0, 4) != "_col") {
            return false;
        }
        for (size_t i = 4; i < name.size(); ++i) {
            if (!isdigit(name[i])) {
                return false;
            }
        }
        return true;
    }
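Note the `<= 4` bound: `"_col"` by itself has length 4 and is rejected, so at least one digit must follow the prefix. A few spot checks of the predicate, restated in a standalone driver (hypothetical, not part of the patch):

```cpp
#include <cassert>
#include <cctype>
#include <string>

// Restatement of _is_hive1_col_name for a standalone check.
static bool is_hive1_col_name(const std::string& name) {
    if (name.size() <= 4 || name.substr(0, 4) != "_col") {
        return false;
    }
    for (size_t i = 4; i < name.size(); ++i) {
        if (!isdigit(static_cast<unsigned char>(name[i]))) {
            return false;
        }
    }
    return true;
}

int main() {
    assert(is_hive1_col_name("_col0"));   // prefix plus digits
    assert(is_hive1_col_name("_col12"));
    assert(!is_hive1_col_name("_col"));   // no digits after the prefix
    assert(!is_hive1_col_name("_colx"));  // non-digit suffix
    assert(!is_hive1_col_name("col00"));  // missing the underscore prefix
    return 0;
}
```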

private:
    // This is only for the count(*) short-circuit read:
    // it saves the total number of rows in the range.
@@ -509,8 +526,9 @@ private:
    // This is used for Hive 1.x, which uses internal column names
    // (_col0, _col1, ...) in the ORC file.
    std::unordered_map<std::string, std::string> _removed_acid_file_col_name_to_schema_col;
    // Flag for the Hive engine. True if the external table engine is Hive.
    bool _is_hive = false;
    // Flag for the Hive engine. True if the external table engine is Hive 1.x
    // with ORC column names _col0, _col1, ...
    bool _is_hive1_orc = false;
    std::unordered_map<std::string, std::string> _col_name_to_file_col_name;
    std::unordered_map<std::string, const orc::Type*> _type_map;
    std::vector<const orc::Type*> _col_orc_type;
@@ -275,6 +275,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
            return;
        }
        TFileFormatType fileFormatType = getFileFormatType();
        if (fileFormatType == TFileFormatType.FORMAT_ORC) {
            genSlotToSchemaIdMapForOrc();
        }
        params.setFormatType(fileFormatType);
        boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
        boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL;
@@ -463,6 +466,25 @@ public abstract class FileQueryScanNode extends FileScanNode {
        return rangeDesc;
    }

    // To support Hive 1.x ORC internal column names (_col0, _col1, _col2, ...),
    // we need to save the mapping from slot name to schema position.
    protected void genSlotToSchemaIdMapForOrc() {
        Preconditions.checkNotNull(params);
        List<Column> baseSchema = desc.getTable().getBaseSchema();
        Map<String, Integer> columnNameToPosition = Maps.newHashMap();
        for (SlotDescriptor slot : desc.getSlots()) {
            int idx = 0;
            for (Column col : baseSchema) {
                if (col.getName().equals(slot.getColumn().getName())) {
                    columnNameToPosition.put(col.getName(), idx);
                    break;
                }
                idx += 1;
            }
        }
        params.setSlotNameToSchemaPos(columnNameToPosition);
    }
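Moving this helper into `FileQueryScanNode` (and deleting the Hive-only `genSlotToSchemaIdMap` below) is what extends Hive 1.x ORC support to all file scan nodes: every ORC scan now ships the map, and the BE decides per file whether to use it. A C++ analogue of the mapping the method builds, with hypothetical stand-ins for the catalog types:

```cpp
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
    // Hypothetical stand-ins: the table's base schema (ordered) and the
    // slots actually selected by the query.
    std::vector<std::string> base_schema = {"id", "name", "price", "qty"};
    std::vector<std::string> selected_slots = {"price", "id"};

    // Same shape as genSlotToSchemaIdMapForOrc: for each slot, find its
    // position in the base schema and record slot name -> position.
    std::unordered_map<std::string, int> slot_name_to_schema_pos;
    for (const auto& slot : selected_slots) {
        int idx = 0;
        for (const auto& col : base_schema) {
            if (col == slot) {
                slot_name_to_schema_pos[slot] = idx;
                break;
            }
            idx += 1;
        }
    }

    // Prints "price -> 2" and "id -> 0" (iteration order is unspecified).
    for (const auto& [name, pos] : slot_name_to_schema_pos) {
        std::cout << name << " -> " << pos << std::endl;
    }
    return 0;
}
```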

    protected abstract TFileType getLocationType() throws UserException;

    protected abstract TFileType getLocationType(String location) throws UserException;

@@ -18,7 +18,6 @@
package org.apache.doris.planner.external;

import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
@@ -39,7 +38,6 @@ import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveTransaction;
import org.apache.doris.datasource.hive.HiveVersionUtil;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
@@ -55,7 +53,6 @@ import org.apache.doris.thrift.TFileType;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Setter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -117,9 +114,6 @@ public class HiveScanNode extends FileQueryScanNode {
    @Override
    protected void doInitialize() throws UserException {
        super.doInitialize();
        if (HiveVersionUtil.isHive1(hmsTable.getHiveVersion())) {
            genSlotToSchemaIdMap();
        }

        if (hmsTable.isHiveTransactionalTable()) {
            this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
@@ -396,23 +390,6 @@ public class HiveScanNode extends FileQueryScanNode {
        return fileAttributes;
    }

    // To support Hive 1.x ORC internal column names (_col0, _col1, _col2, ...)
    private void genSlotToSchemaIdMap() {
        List<Column> baseSchema = desc.getTable().getBaseSchema();
        Map<String, Integer> columnNameToPosition = Maps.newHashMap();
        for (SlotDescriptor slot : desc.getSlots()) {
            int idx = 0;
            for (Column col : baseSchema) {
                if (col.getName().equals(slot.getColumn().getName())) {
                    columnNameToPosition.put(col.getName(), idx);
                    break;
                }
                idx += 1;
            }
        }
        params.setSlotNameToSchemaPos(columnNameToPosition);
    }

    @Override
    public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {