[opt](hudi) use native reader to read base files with no log files (#20988)
Two optimizations: 1. Insert string bytes directly, avoiding the decode-and-re-encode round trip when reading string values through the JNI reader. 2. Use the native reader to read a Hudi base file when it has no log files. `explain` now shows how many splits are read natively.
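A rough, self-contained sketch of the first optimization, for illustration only. The interface, class, and method names below (StringValueSketch, Utf8BackedValue, StringBytesFastPathDemo, appendString) are invented for this example; the real contract is the ColumnValue.canGetStringAsBytes()/getBytes() pair added in the diff below. The idea: when the scanner already holds raw UTF-8 bytes (e.g. a Hadoop Text or BytesWritable), hand those bytes straight to the vector column instead of decoding them into a java.lang.String and re-encoding them on insert.

import java.nio.charset.StandardCharsets;

// Hypothetical, trimmed-down mirror of the ColumnValue contract touched in this commit:
// implementations that already hold raw UTF-8 bytes opt in via canGetStringAsBytes(),
// so the column writer can call getBytes() and skip the String decode/encode round trip.
interface StringValueSketch {
    boolean canGetStringAsBytes();

    String getString();

    byte[] getBytes();
}

// Example value backed by raw bytes (think Text/BytesWritable inside HudiColumnValue).
final class Utf8BackedValue implements StringValueSketch {
    private final byte[] utf8;

    Utf8BackedValue(byte[] utf8) {
        this.utf8 = utf8;
    }

    @Override
    public boolean canGetStringAsBytes() {
        return true; // bytes are already UTF-8, no conversion needed
    }

    @Override
    public String getString() {
        return new String(utf8, StandardCharsets.UTF_8); // decode only if a caller insists
    }

    @Override
    public byte[] getBytes() {
        return utf8; // fast path: hand the buffer over as-is
    }
}

public class StringBytesFastPathDemo {
    // Mirrors the branch added to VectorColumn: prefer raw bytes when the value supports it.
    static byte[] appendString(StringValueSketch v) {
        return v.canGetStringAsBytes()
                ? v.getBytes()
                : v.getString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "hudi".getBytes(StandardCharsets.UTF_8);
        System.out.println(appendString(new Utf8BackedValue(raw)).length); // 4
    }
}

For the second optimization, the planner counts splits whose file slice has no delta log (noLogsSplitNum) and the backend reads those splits with the native parquet reader instead of the JNI Hudi scanner; getNodeExplainString reports the ratio as hudiNativeReadSplits=<native>/<total> in the `explain` output.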
@@ -585,10 +585,18 @@ Status VFileScanner::_get_next_reader() {
    _current_range_path = range.path;

    // create reader for specific format
    // TODO: add json, avro
    Status init_status;
    // TODO: use data lake type
    switch (_params.format_type) {
    TFileFormatType::type format_type = _params.format_type;
    // JNI reader can only push down column value range
    bool push_down_predicates = !_is_load && _params.format_type != TFileFormatType::FORMAT_JNI;
    if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
        range.table_format_params.table_format_type == "hudi") {
        if (range.table_format_params.hudi_params.delta_logs.empty()) {
            // fall back to native reader if there is no log file
            format_type = TFileFormatType::FORMAT_PARQUET;
        }
    }
    switch (format_type) {
    case TFileFormatType::FORMAT_JNI: {
        if (_real_tuple_desc->table_desc()->table_type() ==
                ::doris::TTableType::type::MAX_COMPUTE_TABLE) {
@@ -625,7 +633,7 @@ Status VFileScanner::_get_next_reader() {
            SCOPED_TIMER(_open_reader_timer);
            RETURN_IF_ERROR(parquet_reader->open());
        }
        if (!_is_load && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
        if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
            _push_down_conjuncts.resize(_conjuncts.size());
            for (size_t i = 0; i != _conjuncts.size(); ++i) {
                RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
@@ -660,7 +668,7 @@ Status VFileScanner::_get_next_reader() {
        std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                _profile, _state, _params, range, _state->query_options().batch_size,
                _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
        if (!_is_load && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
        if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
            _push_down_conjuncts.resize(_conjuncts.size());
            for (size_t i = 0; i != _conjuncts.size(); ++i) {
                RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
@@ -37,6 +37,12 @@ under the License.
            <groupId>org.apache.doris</groupId>
            <artifactId>java-common</artifactId>
            <version>${project.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>fe-common</artifactId>
                    <groupId>org.apache.doris</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
@@ -64,6 +70,10 @@ under the License.
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>hudi-hadoop-mr</artifactId>
                    <groupId>org.apache.hudi</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
@@ -92,7 +102,6 @@ under the License.
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
        <!-- conflict with hudi-*-bundle -->
    </dependencies>
    <build>
        <finalName>hudi-scanner</finalName>
@@ -25,10 +25,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -62,6 +65,11 @@ public class HudiColumnValue implements ColumnValue {
        return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
    }

    @Override
    public boolean canGetStringAsBytes() {
        return true;
    }

    @Override
    public boolean isNull() {
        return false;
@@ -143,6 +151,16 @@ public class HudiColumnValue implements ColumnValue {

    @Override
    public byte[] getBytes() {
        // Get bytes directly if fieldData is BytesWritable or Text to avoid decoding&encoding
        if (fieldData instanceof BytesWritable) {
            return ((BytesWritable) fieldData).getBytes();
        }
        if (fieldData instanceof Text) {
            return ((Text) fieldData).getBytes();
        }
        if (fieldData instanceof String) {
            return ((String) fieldData).getBytes(StandardCharsets.UTF_8);
        }
        return (byte[]) inspectObject();
    }
@@ -50,6 +50,11 @@ public class MockJniScanner extends JniScanner {
            this.j = j;
        }

        @Override
        public boolean canGetStringAsBytes() {
            return false;
        }

        @Override
        public boolean isNull() {
            return false;
@@ -27,6 +27,9 @@ import java.util.List;
 * Column value in vector column
 */
public interface ColumnValue {
    // Get bytes directly when reading string value to avoid decoding&encoding
    boolean canGetStringAsBytes();

    boolean isNull();

    boolean getBoolean();
@@ -118,6 +118,11 @@ public class ScanPredicate {
            }
        }

        @Override
        public boolean canGetStringAsBytes() {
            return false;
        }

        @Override
        public String toString() {
            return inspectObject().toString();
@@ -597,7 +597,11 @@ public class VectorColumn {
            case CHAR:
            case VARCHAR:
            case STRING:
                appendStringAndOffset(o.getString());
                if (o.canGetStringAsBytes()) {
                    appendBytesAndOffset(o.getBytes());
                } else {
                    appendStringAndOffset(o.getString());
                }
                break;
            case BINARY:
                appendBytesAndOffset(o.getBytes());
@@ -57,6 +57,11 @@ public class MaxComputeColumnValue implements ColumnValue {
        this.idx = 0;
    }

    @Override
    public boolean canGetStringAsBytes() {
        return false;
    }

    @Override
    public boolean isNull() {
        return column.isNull(idx);
@@ -44,6 +44,11 @@ public class PaimonColumnValue implements ColumnValue {
        this.record = record;
    }

    @Override
    public boolean canGetStringAsBytes() {
        return false;
    }

    @Override
    public boolean getBoolean() {
        return record.getBoolean(idx);
@@ -244,9 +244,10 @@ public abstract class FileQueryScanNode extends FileScanNode {

        // set hdfs params for hdfs file type.
        Map<String, String> locationProperties = getLocationProperties();
        if (fileFormatType == TFileFormatType.FORMAT_JNI) {
        if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) {
            params.setProperties(locationProperties);
        } else if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
        }
        if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
            String fsName = getFsName(inputSplit);
            THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
            tHdfsParams.setFsName(fsName);
@@ -259,8 +260,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
                }
                params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
            }
        } else if (locationType == TFileType.FILE_S3) {
            params.setProperties(locationProperties);
        }

        List<String> pathPartitionKeys = getPathPartitionKeys();
@@ -30,6 +30,7 @@ import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.planner.external.TableFormatType;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.THudiFileDesc;
@@ -68,6 +69,8 @@ public class HudiScanNode extends HiveScanNode {

    private final boolean isCowTable;

    private long noLogsSplitNum = 0;

    /**
     * External file scan node for Query Hudi table
     * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -140,8 +143,11 @@ public class HudiScanNode extends HiveScanNode {
    public List<Split> getSplits() throws UserException {
        if (isCowTable) {
            // skip hidden files start with "."
            return super.getSplits().stream().filter(split -> !((FileSplit) split).getPath().getName().startsWith("."))
            List<Split> cowSplits = super.getSplits().stream()
                    .filter(split -> !((FileSplit) split).getPath().getName().startsWith("."))
                    .collect(Collectors.toList());
            noLogsSplitNum = cowSplits.size();
            return cowSplits;
        }

        HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
@@ -211,6 +217,9 @@ public class HudiScanNode extends HiveScanNode {

            List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath).map(Path::toString)
                    .collect(Collectors.toList());
            if (logs.isEmpty()) {
                noLogsSplitNum++;
            }

            HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
                    partition.getPartitionValues());
@@ -233,4 +242,10 @@ public class HudiScanNode extends HiveScanNode {
        }
        return splits;
    }

    @Override
    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
        return super.getNodeExplainString(prefix, detailLevel)
                + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum, inputSplitsNum);
    }
}