[improvement](iceberg/paimon)support estimate row count (#31204)

Get the number of rows evaluated for iceberg and paimon.
This commit is contained in:
wuwenchi
2024-02-26 11:05:09 +08:00
committed by yiguolei
parent f951ca2efb
commit b1fc0ebbe7
6 changed files with 61 additions and 32 deletions

View File

@ -321,7 +321,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
rowCount = StatisticsUtil.getHiveRowCount(this);
break;
case ICEBERG:
rowCount = StatisticsUtil.getIcebergRowCount(this);
rowCount = IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName());
break;
default:
if (LOG.isDebugEnabled()) {

View File

@ -87,4 +87,10 @@ public class IcebergExternalTable extends ExternalTable {
makeSureInitialized();
return new ExternalAnalysisTask(info);
}
@Override
public long fetchRowCount() {
makeSureInitialized();
return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName());
}
}

View File

@ -45,6 +45,8 @@ import org.apache.doris.thrift.TExprOpcode;
import com.google.common.collect.Lists;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;
@ -54,6 +56,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
* Iceberg utils
@ -65,6 +68,10 @@ public class IcebergUtils {
// All time and timestamp values are stored with microsecond precision
private static final int ICEBERG_DATETIME_SCALE_MS = 6;
public static final String TOTAL_RECORDS = "total-records";
public static final String TOTAL_POSITION_DELETES = "total-position-deletes";
public static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
if (expr == null) {
return null;
@ -314,4 +321,31 @@ public class IcebergUtils {
return tmpSchema;
});
}
/**
* Estimate iceberg table row count.
* Get the row count by adding all task file recordCount.
*
* @return estimated row count
*/
public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, String tbName) {
try {
Table icebergTable = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(catalog, dbName, tbName);
Snapshot snapshot = icebergTable.currentSnapshot();
if (snapshot == null) {
// empty table
return 0;
}
Map<String, String> summary = snapshot.summary();
return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
} catch (Exception e) {
LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e);
}
return -1;
}
}

View File

@ -87,9 +87,6 @@ import java.util.stream.Collectors;
public class IcebergScanNode extends FileQueryScanNode {
public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
private static final String TOTAL_RECORDS = "total-records";
private static final String TOTAL_POSITION_DELETES = "total-position-deletes";
private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
private IcebergSource source;
private Table icebergTable;
@ -424,8 +421,9 @@ public class IcebergScanNode extends FileQueryScanNode {
}
Map<String, String> summary = snapshot.summary();
if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) {
return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
if (summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) {
return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS))
- Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES));
} else {
return -1;
}

View File

@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
@ -163,4 +164,20 @@ public class PaimonExternalTable extends ExternalTable {
makeSureInitialized();
return new ExternalAnalysisTask(info);
}
@Override
public long fetchRowCount() {
makeSureInitialized();
try {
long rowCount = 0;
List<Split> splits = originTable.newReadBuilder().newScan().plan().splits();
for (Split split : splits) {
rowCount += split.rowCount();
}
return rowCount;
} catch (Exception e) {
LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e);
}
return -1;
}
}

View File

@ -84,7 +84,6 @@ import org.apache.commons.text.StringSubstitutor;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
@ -596,31 +595,6 @@ public class StatisticsUtil {
return parameters.containsKey(TOTAL_SIZE) ? Long.parseLong(parameters.get(TOTAL_SIZE)) : 0;
}
/**
* Estimate iceberg table row count.
* Get the row count by adding all task file recordCount.
*
* @param table Iceberg HMSExternalTable to estimate row count.
* @return estimated row count
*/
public static long getIcebergRowCount(HMSExternalTable table) {
long rowCount = 0;
try {
Table icebergTable = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(table.getCatalog(), table.getDbName(), table.getName());
TableScan tableScan = icebergTable.newScan().includeColumnStats();
for (FileScanTask task : tableScan.planFiles()) {
rowCount += task.file().recordCount();
}
return rowCount;
} catch (Exception e) {
LOG.warn("Fail to collect row count for db {} table {}", table.getDbName(), table.getName(), e);
}
return -1;
}
/**
* Estimate hive table row count : totalFileSize/estimatedRowSize
*