[optimize](multi-catalog) use dictionary encode&filter to process delete files (#15441)

**Optimize**
PR #14470 used `Expr` to filter delete rows down to those matching the current data file, but the rows in a position-delete file are [sorted by file_path and then position](https://iceberg.apache.org/spec/#position-delete-files) precisely to make this filtering cheap while scanning. This PR therefore removes the `Expr`-based filtering and uses binary search to locate the delete rows for each data file.
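As a rough illustration (standalone C++, not the Doris backend code; `DeleteRow` and `find_delete_range` are hypothetical names), binary search over the `(file_path, position)` sort order finds the contiguous range of delete rows for one data file with two logarithmic probes instead of a per-row predicate:

```cpp
// Illustrative sketch only: position-delete rows are sorted by (file_path, position),
// so the rows targeting a single data file form one contiguous range.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct DeleteRow {
    std::string file_path;  // data file the delete applies to
    int64_t position;       // row position to delete inside that file
};

// Find the [first, last) index range of delete rows that target `data_file`.
std::pair<size_t, size_t> find_delete_range(const std::vector<DeleteRow>& rows,
                                            const std::string& data_file) {
    auto first = std::lower_bound(rows.begin(), rows.end(), data_file,
            [](const DeleteRow& r, const std::string& p) { return r.file_path < p; });
    auto last = std::upper_bound(first, rows.end(), data_file,
            [](const std::string& p, const DeleteRow& r) { return p < r.file_path; });
    return {static_cast<size_t>(first - rows.begin()), static_cast<size_t>(last - rows.begin())};
}

int main() {
    std::vector<DeleteRow> rows = {{"a.parquet", 3}, {"a.parquet", 17}, {"b.parquet", 0}, {"c.parquet", 8}};
    auto [lo, hi] = find_delete_range(rows, "b.parquet");
    std::cout << "delete rows for b.parquet: [" << lo << ", " << hi << ")\n";  // [2, 3)
}
```

Because the spec guarantees the sort order, the per-data-file lookup never has to touch rows that belong to other files.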

In addition, delete files are usually dictionary-encoded, and decoding the `file_path` column into a `ColumnString` is time-consuming, so this PR uses `ColumnDictionary` to read the `file_path` column.
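A minimal sketch of the idea, assuming a toy dictionary representation (`DictColumn` and `filter_by_path` are made-up names, not Doris's `ColumnDictionary` API): resolve the target path to its dictionary code once, then compare integer codes per row instead of decoding strings:

```cpp
// Illustrative only: filtering a dictionary-encoded file_path column by
// comparing integer codes instead of decoded strings.
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct DictColumn {
    std::vector<std::string> dict;   // distinct file_path values
    std::vector<int32_t> codes;      // one dictionary code per row
};

// Collect row ids whose file_path equals `target` without decoding any row values.
std::vector<size_t> filter_by_path(const DictColumn& col, const std::string& target) {
    int32_t target_code = -1;
    for (size_t i = 0; i < col.dict.size(); ++i) {
        if (col.dict[i] == target) { target_code = static_cast<int32_t>(i); break; }
    }
    std::vector<size_t> selected;
    if (target_code < 0) return selected;  // target path absent: no deletes apply
    for (size_t row = 0; row < col.codes.size(); ++row) {
        if (col.codes[row] == target_code) selected.push_back(row);
    }
    return selected;
}

int main() {
    DictColumn col{{"a.parquet", "b.parquet"}, {0, 0, 1, 0, 1}};
    for (size_t row : filter_by_path(col, "b.parquet")) std::cout << row << ' ';  // 2 4
    std::cout << '\n';
}
```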

In testing, Iceberg v2 merge-on-read (MOR) performance improved by more than 30%.

**Fix Bug**
A lazy-read block may not contain the filter column when the whole group is filtered out by `Expr` and `batch_eof` is generated from the next batch.
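Purely as an illustration of the failure mode (hypothetical names, not the actual patch): the reader has to tolerate a lazy-read block in which the filter column was never materialized because the whole group was already filtered out:

```cpp
// Toy model of the guard: a column block that may or may not carry the filter column.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

using Block = std::unordered_map<std::string, std::vector<uint8_t>>;  // column name -> filter flags

// Select rows by the filter column, treating a missing column as "all rows filtered out".
std::vector<size_t> selected_rows(const Block& lazy_block, const std::string& filter_col) {
    auto it = lazy_block.find(filter_col);
    if (it == lazy_block.end()) {
        return {};  // the group was filtered out earlier; nothing to lazily read
    }
    std::vector<size_t> selected;
    for (size_t i = 0; i < it->second.size(); ++i) {
        if (it->second[i]) selected.push_back(i);
    }
    return selected;
}

int main() {
    Block block_without_filter_column;  // e.g. the whole group was removed by `Expr`
    std::cout << selected_rows(block_without_filter_column, "pred_col").size() << '\n';  // 0
}
```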
Author: Ashin Gau
Date: 2022-12-30 08:57:55 +08:00
Committed by: GitHub
Parent: 85c7c531f1
Commit: 2c8de30cce
14 changed files with 274 additions and 141 deletions

org/apache/doris/planner/external/IcebergScanProvider.java

@@ -18,20 +18,10 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BaseTableRef;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
@@ -43,12 +33,8 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TIcebergDeleteFileDesc;
import org.apache.doris.thrift.TIcebergFileDesc;
import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableFormatFileDesc;
import org.apache.doris.thrift.TTableType;
import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplit;
@@ -81,10 +67,6 @@ import java.util.OptionalLong;
public class IcebergScanProvider extends HiveScanProvider {
private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
public static final String V2_DELETE_TBL = "iceberg#delete#tbl";
public static final String V2_DELETE_DB = "iceberg#delete#db";
private static final DeleteFileTempTable scanDeleteTable =
new DeleteFileTempTable(TableIf.TableType.HMS_EXTERNAL_TABLE);
private final Analyzer analyzer;
public IcebergScanProvider(HMSExternalTable hmsTable, Analyzer analyzer, TupleDescriptor desc,
@@ -103,7 +85,6 @@ public class IcebergScanProvider extends HiveScanProvider {
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
fileDesc.setContent(FileContent.DATA.id());
} else {
setPathSelectConjunct(fileDesc, icebergSplit);
for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
deleteFileDesc.setPath(filter.getDeleteFilePath());
@@ -132,19 +113,6 @@ public class IcebergScanProvider extends HiveScanProvider {
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
private static void setPathSelectConjunct(TIcebergFileDesc fileDesc, IcebergSplit icebergSplit)
throws UserException {
BaseTableRef tableRef = icebergSplit.getDeleteTableRef();
fileDesc.setDeleteTableTupleId(tableRef.getDesc().getId().asInt());
SlotRef lhs = new SlotRef(tableRef.getName(), DeleteFileTempTable.DATA_FILE_PATH);
lhs.analyze(icebergSplit.getAnalyzer());
lhs.getDesc().setIsMaterialized(true);
StringLiteral rhs = new StringLiteral(icebergSplit.getPath().toUri().toString());
BinaryPredicate pathSelectConjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs);
pathSelectConjunct.analyze(icebergSplit.getAnalyzer());
fileDesc.setFileSelectConjunct(pathSelectConjunct.treeToThrift());
}
@Override
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
TFileFormatType type;
@@ -192,14 +160,6 @@ public class IcebergScanProvider extends HiveScanProvider {
}
List<InputSplit> splits = new ArrayList<>();
int formatVersion = ((BaseTable) table).operations().current().formatVersion();
BaseTableRef tableRef = null;
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
TableName fullName = analyzer.getFqTableName(scanDeleteTable.getTableName());
fullName.analyze(analyzer);
TableRef ref = new TableRef(fullName, fullName.toString(), null);
tableRef = new BaseTableRef(ref, scanDeleteTable, scanDeleteTable.getTableName());
tableRef.analyze(analyzer);
}
for (FileScanTask task : scan.planFiles()) {
for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) {
String dataFilePath = spitTask.file().path().toString();
@@ -208,7 +168,6 @@
split.setFormatVersion(formatVersion);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(spitTask));
split.setDeleteTableRef(tableRef);
}
split.setTableFormatType(TableFormatType.ICEBERG);
split.setAnalyzer(analyzer);
@@ -279,32 +238,4 @@
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
return Collections.emptyList();
}
@Data
static class DeleteFileTempTable extends ExternalTable {
public static final String DATA_FILE_PATH = "file_path";
private final TableName tableName;
private final List<Column> fullSchema = new ArrayList<>();
public DeleteFileTempTable(TableType type) {
super(0, V2_DELETE_TBL, null, V2_DELETE_DB, type);
this.tableName = new TableName(null, V2_DELETE_DB, V2_DELETE_TBL);
Column dataFilePathCol = new Column(DATA_FILE_PATH, PrimitiveType.STRING, true);
this.fullSchema.add(dataFilePathCol);
}
@Override
public List<Column> getFullSchema() {
return fullSchema;
}
@Override
public TTableDescriptor toThrift() {
TIcebergTable tIcebergTable = new TIcebergTable(V2_DELETE_DB, V2_DELETE_TBL, new HashMap<>());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
fullSchema.size(), 0, getName(), "");
tTableDescriptor.setIcebergTable(tIcebergTable);
return tTableDescriptor;
}
}
}

org/apache/doris/planner/external/IcebergSplit.java

@@ -18,7 +18,6 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BaseTableRef;
import lombok.Data;
import org.apache.hadoop.fs.Path;
@@ -34,7 +33,6 @@ public class IcebergSplit extends HiveSplit {
private Analyzer analyzer;
private String dataFilePath;
private Integer formatVersion;
private BaseTableRef deleteTableRef;
private List<IcebergDeleteFileFilter> deleteFileFilters;
}