[improvement](statistics) External table getRowCount returns -1 when the row count is not available or is 0. (#43009) (#43771)
backport: https://github.com/apache/doris/pull/43009
This commit is contained in:
@ -131,8 +131,6 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
|
||||
WAITING_STABLE
|
||||
}
|
||||
|
||||
public static long ROW_COUNT_BEFORE_REPORT = -1;
|
||||
|
||||
@SerializedName(value = "state")
|
||||
private volatile OlapTableState state;
|
||||
|
||||
@ -1519,12 +1517,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
|
||||
if (index == null) {
|
||||
LOG.warn("Index {} not exist in partition {}, table {}, {}",
|
||||
indexId, entry.getValue().getName(), id, name);
|
||||
return ROW_COUNT_BEFORE_REPORT;
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
if (strict && !index.getRowCountReported()) {
|
||||
return ROW_COUNT_BEFORE_REPORT;
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
rowCount += index.getRowCount() == -1 ? 0 : index.getRowCount();
|
||||
rowCount += index.getRowCount() == UNKNOWN_ROW_COUNT ? 0 : index.getRowCount();
|
||||
}
|
||||
return rowCount;
|
||||
}
|
||||
|
||||
@ -586,7 +586,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
|
||||
|
||||
@Override
|
||||
public long fetchRowCount() {
|
||||
return 0;
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -56,6 +56,8 @@ import java.util.stream.Collectors;
|
||||
public interface TableIf {
|
||||
Logger LOG = LogManager.getLogger(TableIf.class);
|
||||
|
||||
long UNKNOWN_ROW_COUNT = -1;
|
||||
|
||||
default void readLock() {
|
||||
}
|
||||
|
||||
|
||||
@ -94,7 +94,7 @@ public class ExternalRowCountCache {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get cached row count for the given table. Return 0 if cached not loaded or table not exists.
|
||||
* Get cached row count for the given table. Return -1 if the cache is not loaded or the table does not exist.
|
||||
* Cached will be loaded async.
|
||||
* @param catalogId
|
||||
* @param dbId
|
||||
@ -106,13 +106,13 @@ public class ExternalRowCountCache {
|
||||
try {
|
||||
CompletableFuture<Optional<Long>> f = rowCountCache.get(key);
|
||||
if (f.isDone()) {
|
||||
return f.get().orElse(0L);
|
||||
return f.get().orElse(TableIf.UNKNOWN_ROW_COUNT);
|
||||
}
|
||||
LOG.info("Row count for table {}.{}.{} is still processing.", catalogId, dbId, tableId);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Unexpected exception while returning row count", e);
|
||||
}
|
||||
return 0;
|
||||
return TableIf.UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -200,7 +200,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
makeSureInitialized();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e);
|
||||
return 0;
|
||||
return TableIf.UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
// All external table should get external row count from cache.
|
||||
return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id);
|
||||
@ -226,7 +226,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
* This is called by ExternalRowCountCache to load row count cache.
|
||||
*/
|
||||
public long fetchRowCount() {
|
||||
return 0;
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -344,7 +344,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
}
|
||||
|
||||
private long getRowCountFromExternalSource() {
|
||||
long rowCount;
|
||||
long rowCount = UNKNOWN_ROW_COUNT;
|
||||
switch (dlaType) {
|
||||
case HIVE:
|
||||
rowCount = StatisticsUtil.getHiveRowCount(this);
|
||||
@ -358,7 +358,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
}
|
||||
rowCount = -1;
|
||||
}
|
||||
return rowCount;
|
||||
return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -532,7 +532,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
// Get row count from hive metastore property.
|
||||
long rowCount = getRowCountFromExternalSource();
|
||||
// Only hive table supports estimate row count by listing file.
|
||||
if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) {
|
||||
if (rowCount == UNKNOWN_ROW_COUNT && dlaType.equals(DLAType.HIVE)) {
|
||||
LOG.info("Will estimate row count for table {} from file list.", name);
|
||||
rowCount = getRowCountFromFileList();
|
||||
}
|
||||
@ -838,11 +838,11 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
*/
|
||||
private long getRowCountFromFileList() {
|
||||
if (!GlobalVariable.enable_get_row_count_from_file_list) {
|
||||
return -1;
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
if (isView()) {
|
||||
LOG.info("Table {} is view, return 0.", name);
|
||||
return 0;
|
||||
LOG.info("Table {} is view, return -1.", name);
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
HiveMetaStoreCache.HivePartitionValues partitionValues = getAllPartitionValues();
|
||||
|
||||
@ -869,8 +869,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
estimatedRowSize += column.getDataType().getSlotSize();
|
||||
}
|
||||
if (estimatedRowSize == 0) {
|
||||
LOG.warn("Table {} estimated size is 0, return 0.", name);
|
||||
return 0;
|
||||
LOG.warn("Table {} estimated size is 0, return -1.", name);
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
|
||||
int totalPartitionSize = partitionValues == null ? 1 : partitionValues.getIdToPartitionItem().size();
|
||||
@ -882,7 +882,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
long rows = totalSize / estimatedRowSize;
|
||||
LOG.info("Table {} rows {}, total size is {}, estimatedRowSize is {}",
|
||||
name, rows, totalSize, estimatedRowSize);
|
||||
return rows;
|
||||
return rows > 0 ? rows : UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
|
||||
// Get all partition values from cache.
|
||||
|
||||
@ -83,7 +83,8 @@ public class IcebergExternalTable extends ExternalTable {
|
||||
@Override
|
||||
public long fetchRowCount() {
|
||||
makeSureInitialized();
|
||||
return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName());
|
||||
long rowCount = IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName());
|
||||
return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
|
||||
public Table getIcebergTable() {
|
||||
|
||||
@ -41,6 +41,7 @@ import org.apache.doris.catalog.MapType;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.StructField;
|
||||
import org.apache.doris.catalog.StructType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.info.SimpleTableInfo;
|
||||
@ -604,7 +605,7 @@ public class IcebergUtils {
|
||||
if (snapshot == null) {
|
||||
LOG.info("Iceberg table {}.{}.{} is empty, return row count 0.", catalog.getName(), dbName, tbName);
|
||||
// empty table
|
||||
return 0;
|
||||
return TableIf.UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
Map<String, String> summary = snapshot.summary();
|
||||
long rows = Long.parseLong(summary.get(TOTAL_RECORDS))
|
||||
@ -614,7 +615,7 @@ public class IcebergUtils {
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e);
|
||||
}
|
||||
return -1;
|
||||
return TableIf.UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -194,16 +194,16 @@ public class PaimonExternalTable extends ExternalTable {
|
||||
Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable())
|
||||
.orElse(null);
|
||||
if (paimonTable == null) {
|
||||
return -1;
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits();
|
||||
for (Split split : splits) {
|
||||
rowCount += split.rowCount();
|
||||
}
|
||||
return rowCount;
|
||||
return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e);
|
||||
}
|
||||
return -1;
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
}
|
||||
|
||||
@ -181,7 +181,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
|
||||
if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) {
|
||||
OlapTable ot = (OlapTable) table;
|
||||
if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == OlapTable.ROW_COUNT_BEFORE_REPORT) {
|
||||
if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) {
|
||||
LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName());
|
||||
return;
|
||||
}
|
||||
|
||||
@ -560,19 +560,19 @@ public class StatisticsUtil {
|
||||
public static long getHiveRowCount(HMSExternalTable table) {
|
||||
Map<String, String> parameters = table.getRemoteTable().getParameters();
|
||||
if (parameters == null) {
|
||||
return -1;
|
||||
return TableIf.UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
// Table parameters contains row count, simply get and return it.
|
||||
if (parameters.containsKey(NUM_ROWS)) {
|
||||
long rows = Long.parseLong(parameters.get(NUM_ROWS));
|
||||
// Sometimes, the NUM_ROWS in hms is 0 but actually is not. Need to check TOTAL_SIZE if NUM_ROWS is 0.
|
||||
if (rows != 0) {
|
||||
if (rows > 0) {
|
||||
LOG.info("Get row count {} for hive table {} in table parameters.", rows, table.getName());
|
||||
return rows;
|
||||
}
|
||||
}
|
||||
if (!parameters.containsKey(TOTAL_SIZE)) {
|
||||
return -1;
|
||||
return TableIf.UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
// Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize
|
||||
long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE));
|
||||
@ -582,7 +582,7 @@ public class StatisticsUtil {
|
||||
}
|
||||
if (estimatedRowSize == 0) {
|
||||
LOG.warn("Hive table {} estimated row size is invalid {}", table.getName(), estimatedRowSize);
|
||||
return -1;
|
||||
return TableIf.UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
long rows = totalSize / estimatedRowSize;
|
||||
LOG.info("Get row count {} for hive table {} by total size {} and row size {}",
|
||||
|
||||
@ -0,0 +1,102 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.datasource;
|
||||
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
 * Test for {@code ExternalRowCountCache#getCachedRowCount}: checks that
 * {@code TableIf.UNKNOWN_ROW_COUNT} is returned while no row count is available
 * and that a later successful load makes the real value visible.
 */
public class ExternalRowCountCacheTest {
|
||||
/**
 * Drives the cache through three faked {@code RowCountCacheLoader.doLoad} behaviours in turn:
 * returning {@code null}, returning 100, and blocking for a long time. The shared counter
 * records how many times the faked loader has been invoked overall.
 */
@Test
|
||||
public void testLoadWithException() throws Exception {
|
||||
// NOTE(review): the executor is never shut down in this test; presumably harmless because
// the pool is created as a daemon pool — confirm.
ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool(
|
||||
1, Integer.MAX_VALUE, "TEST", true);
|
||||
AtomicInteger counter = new AtomicInteger(0);
|
||||
|
||||
// Phase 1: the loader returns null (presumably standing in for a failed load, going by
// the test name — confirm).
new MockUp<ExternalRowCountCache.RowCountCacheLoader>() {
|
||||
@Mock
|
||||
protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) {
|
||||
counter.incrementAndGet();
|
||||
return null;
|
||||
}
|
||||
};
|
||||
ExternalRowCountCache cache = new ExternalRowCountCache(executor);
|
||||
long cachedRowCount = cache.getCachedRowCount(1, 1, 1);
|
||||
// The first lookup for key (1, 1, 1) must report the "unknown" sentinel.
Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount);
|
||||
// Poll for up to 60 seconds until the faked loader has run once.
for (int i = 0; i < 60; i++) {
|
||||
if (counter.get() == 1) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assertions.assertEquals(1, counter.get());
|
||||
|
||||
// Phase 2: the loader now succeeds with 100 rows for the same key.
new MockUp<ExternalRowCountCache.RowCountCacheLoader>() {
|
||||
@Mock
|
||||
protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) {
|
||||
counter.incrementAndGet();
|
||||
return Optional.of(100L);
|
||||
}
|
||||
};
|
||||
// Look up key (1, 1, 1) again. The counter assertion further down expects exactly one more
// load, presumably because the null result from phase 1 is not retained — confirm against
// the cache implementation.
cache.getCachedRowCount(1, 1, 1);
|
||||
// Poll for up to 60 seconds until a value other than the sentinel becomes visible.
for (int i = 0; i < 60; i++) {
|
||||
cachedRowCount = cache.getCachedRowCount(1, 1, 1);
|
||||
if (cachedRowCount != TableIf.UNKNOWN_ROW_COUNT) {
|
||||
Assertions.assertEquals(100, cachedRowCount);
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
// The loaded value is returned, and the loader has been invoked exactly twice in total.
cachedRowCount = cache.getCachedRowCount(1, 1, 1);
|
||||
Assertions.assertEquals(100, cachedRowCount);
|
||||
Assertions.assertEquals(2, counter.get());
|
||||
|
||||
// Phase 3: the loader blocks for a long time before returning.
new MockUp<ExternalRowCountCache.RowCountCacheLoader>() {
|
||||
@Mock
|
||||
protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) {
|
||||
counter.incrementAndGet();
|
||||
try {
|
||||
Thread.sleep(1000000);
|
||||
} catch (InterruptedException e) {
|
||||
// NOTE(review): the interrupt is only printed; the thread's interrupt flag is not restored.
e.printStackTrace();
|
||||
}
|
||||
return Optional.of(100L);
|
||||
}
|
||||
};
|
||||
// A different key (2, 2, 2) is used here. Both immediately and one second later the lookup
// must still return the sentinel, since the faked loader has not returned yet.
cachedRowCount = cache.getCachedRowCount(2, 2, 2);
|
||||
Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount);
|
||||
Thread.sleep(1000);
|
||||
cachedRowCount = cache.getCachedRowCount(2, 2, 2);
|
||||
Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount);
|
||||
// Poll for up to 60 seconds until the blocking loader has been entered (third invocation overall).
for (int i = 0; i < 60; i++) {
|
||||
if (counter.get() == 3) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assertions.assertEquals(3, counter.get());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user