diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 0e11267829..d095a959e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -321,7 +321,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
                     rowCount = StatisticsUtil.getHiveRowCount(this);
                     break;
                 case ICEBERG:
-                    rowCount = StatisticsUtil.getIcebergRowCount(this);
+                    rowCount = IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName());
                     break;
                 default:
                     if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index 21f7c1d3d2..dfc78f4494 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -87,4 +87,10 @@ public class IcebergExternalTable extends ExternalTable {
         makeSureInitialized();
         return new ExternalAnalysisTask(info);
     }
+
+    @Override
+    public long fetchRowCount() { // delegates to IcebergUtils.getIcebergRowCount for this table
+        makeSureInitialized(); // runs before the catalog/db/table getters below are read
+        return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName());
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index f66babfe03..1102527fa3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -45,6 +45,8 @@ import org.apache.doris.thrift.TExprOpcode;
 
 import com.google.common.collect.Lists;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.types.Types;
@@ -54,6 +56,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 
 /**
  * Iceberg utils
@@ -65,6 +68,10 @@ public class IcebergUtils {
     // All time and timestamp values are stored with microsecond precision
     private static final int ICEBERG_DATETIME_SCALE_MS = 6;
 
+    public static final String TOTAL_RECORDS = "total-records";
+    public static final String TOTAL_POSITION_DELETES = "total-position-deletes";
+    public static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
+
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
             return null;
@@ -314,4 +321,45 @@ public class IcebergUtils {
             return tmpSchema;
         });
     }
+
+
+    /**
+     * Estimate iceberg table row count.
+     * The count is read from the current snapshot summary as
+     * {@code total-records - total-position-deletes}; no data files are scanned.
+     * Equality deletes are not subtracted, so the value may overestimate.
+     *
+     * @param catalog catalog used to look the table up in the Iceberg metadata cache
+     * @param dbName database name of the table
+     * @param tbName table name
+     * @return estimated row count; 0 when the table has no current snapshot;
+     *         -1 when the count cannot be determined
+     */
+    public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, String tbName) {
+        try {
+            Table icebergTable = Env.getCurrentEnv()
+                    .getExtMetaCacheMgr()
+                    .getIcebergMetadataCache()
+                    .getIcebergTable(catalog, dbName, tbName);
+            Snapshot snapshot = icebergTable.currentSnapshot();
+            if (snapshot == null) {
+                // empty table
+                return 0;
+            }
+            Map<String, String> summary = snapshot.summary();
+            String totalRecords = summary == null ? null : summary.get(TOTAL_RECORDS);
+            if (totalRecords == null) {
+                // Summary counters are optional; without total-records there is nothing to estimate from.
+                return -1;
+            }
+            // A missing total-position-deletes counter is treated as zero.
+            long positionDeletes = Long.parseLong(summary.getOrDefault(TOTAL_POSITION_DELETES, "0"));
+            // Never report a negative estimate; -1 is reserved for "unknown".
+            return Math.max(0L, Long.parseLong(totalRecords) - positionDeletes);
+        } catch (Exception e) {
+            LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e);
+        }
+        return -1;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f8b72208ea..e2564eae52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -87,9 +87,6 @@ import java.util.stream.Collectors;
 public class IcebergScanNode extends FileQueryScanNode {
 
     public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
-    private static final String TOTAL_RECORDS = "total-records";
-    private static final String TOTAL_POSITION_DELETES = "total-position-deletes";
-    private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
 
     private IcebergSource source;
     private Table icebergTable;
@@ -424,8 +421,9 @@ public class IcebergScanNode extends FileQueryScanNode {
         }
 
         Map summary = snapshot.summary();
-        if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) {
-            return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
+        if (summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) {
+            return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS))
+                    - 
Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES));
         } else {
             return -1;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index f921fcd681..41440c3f4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DecimalType;
@@ -163,4 +164,26 @@ public class PaimonExternalTable extends ExternalTable {
         makeSureInitialized();
         return new ExternalAnalysisTask(info);
     }
+
+    /**
+     * Estimates the row count of this Paimon table by planning a scan through the
+     * table's read builder and summing the row count reported by every planned split.
+     *
+     * @return the summed split row count, or -1 if an exception is thrown while planning or summing
+     */
+    @Override
+    public long fetchRowCount() {
+        makeSureInitialized();
+        try {
+            long rowCount = 0;
+            // Iterate the planned splits directly: the element type stays Split and no raw List is needed.
+            for (Split split : originTable.newReadBuilder().newScan().plan().splits()) {
+                rowCount += split.rowCount();
+            }
+            return rowCount;
+        } catch (Exception e) {
+            LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e);
+        }
+        return -1;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 8688447dcb..8ee08d57e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -84,7 +84,6 @@ import org.apache.commons.text.StringSubstitutor;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Table;
 import 
org.apache.iceberg.TableScan; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Types; @@ -596,31 +595,6 @@ public class StatisticsUtil { return parameters.containsKey(TOTAL_SIZE) ? Long.parseLong(parameters.get(TOTAL_SIZE)) : 0; } - /** - * Estimate iceberg table row count. - * Get the row count by adding all task file recordCount. - * - * @param table Iceberg HMSExternalTable to estimate row count. - * @return estimated row count - */ - public static long getIcebergRowCount(HMSExternalTable table) { - long rowCount = 0; - try { - Table icebergTable = Env.getCurrentEnv() - .getExtMetaCacheMgr() - .getIcebergMetadataCache() - .getIcebergTable(table.getCatalog(), table.getDbName(), table.getName()); - TableScan tableScan = icebergTable.newScan().includeColumnStats(); - for (FileScanTask task : tableScan.planFiles()) { - rowCount += task.file().recordCount(); - } - return rowCount; - } catch (Exception e) { - LOG.warn("Fail to collect row count for db {} table {}", table.getDbName(), table.getName(), e); - } - return -1; - } - /** * Estimate hive table row count : totalFileSize/estimatedRowSize *