[improvement](statistics)Improve analyze partition column and key column corner case. (#48757) (#49101)
backport: https://github.com/apache/doris/pull/48757
This commit is contained in:
@ -25,6 +25,10 @@ import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.MaterializedIndexMeta;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionKey;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.RangePartitionItem;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
@ -33,6 +37,8 @@ import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Range;
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
@ -59,6 +65,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
private boolean partitionColumnSampleTooManyRows = false;
|
||||
private boolean scanFullTable = false;
|
||||
private static final long MAXIMUM_SAMPLE_ROWS = 1_000_000_000;
|
||||
public static final long NO_SKIP_TABLET_ID = -1;
|
||||
|
||||
@VisibleForTesting
|
||||
public OlapAnalysisTask() {
|
||||
@ -173,6 +180,8 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
// Sort the partitions to get stable result.
|
||||
List<Partition> sortedPartitions = olapTable.getPartitions().stream().sorted(
|
||||
Comparator.comparing(Partition::getName)).collect(Collectors.toList());
|
||||
long largeTabletId = 0;
|
||||
long largeTabletRows = Long.MAX_VALUE;
|
||||
for (Partition p : sortedPartitions) {
|
||||
MaterializedIndex materializedIndex = info.indexId == -1 ? p.getBaseIndex() : p.getIndex(info.indexId);
|
||||
if (materializedIndex == null) {
|
||||
@ -191,8 +200,20 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
for (int i = 0; i < tabletCounts; i++) {
|
||||
int seekTid = (int) ((i + seek) % ids.size());
|
||||
long tabletId = ids.get(seekTid);
|
||||
sampleTabletIds.add(tabletId);
|
||||
long tabletRows = materializedIndex.getTablet(tabletId).getMinReplicaRowCount(p.getVisibleVersion());
|
||||
if (tabletRows > MAXIMUM_SAMPLE_ROWS) {
|
||||
LOG.debug("Found one large tablet id {} in table {}, rows {}",
|
||||
largeTabletId, tbl.getName(), largeTabletRows);
|
||||
// Skip very large tablet and record the smallest large tablet id and row count.
|
||||
if (tabletRows < largeTabletRows) {
|
||||
LOG.debug("Current smallest large tablet id {} in table {}, rows {}",
|
||||
largeTabletId, tbl.getName(), largeTabletRows);
|
||||
largeTabletId = tabletId;
|
||||
largeTabletRows = tabletRows;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
sampleTabletIds.add(tabletId);
|
||||
if (tabletRows > 0) {
|
||||
selectedRows += tabletRows;
|
||||
// For regular column, will stop adding more tablets when selected tablets'
|
||||
@ -209,6 +230,13 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// If we skipped some large tablets and this cause the sampled rows is not enough, we add the large tablet back.
|
||||
if (!enough && largeTabletId != 0) {
|
||||
sampleTabletIds.add(largeTabletId);
|
||||
selectedRows += largeTabletRows;
|
||||
LOG.info("Add large tablet {} in table {} back, with rows {}",
|
||||
largeTabletId, tbl.getName(), largeTabletRows);
|
||||
}
|
||||
if (selectedRows < targetSampleRows) {
|
||||
scanFullTable = true;
|
||||
} else if (forPartitionColumn && selectedRows > MAXIMUM_SAMPLE_ROWS) {
|
||||
@ -216,7 +244,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
partitionColumnSampleTooManyRows = true;
|
||||
sampleTabletIds.clear();
|
||||
Collections.shuffle(sortedPartitions);
|
||||
selectedRows = pickSamplePartition(sortedPartitions, sampleTabletIds);
|
||||
selectedRows = pickSamplePartition(sortedPartitions, sampleTabletIds, getSkipPartitionId(sortedPartitions));
|
||||
} else if (col.isKey() && selectedRows > MAXIMUM_SAMPLE_ROWS) {
|
||||
// For key column, if a single tablet contains too many rows, need to use limit to control rows to read.
|
||||
// In most cases, a single tablet shouldn't contain more than MAXIMUM_SAMPLE_ROWS, in this case, we
|
||||
@ -322,12 +350,65 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
}
|
||||
}
|
||||
|
||||
protected long pickSamplePartition(List<Partition> partitions, List<Long> pickedTabletIds) {
|
||||
long averageRowsPerPartition = tbl.getRowCount() / partitions.size();
|
||||
// For partition tables with single time type partition column, we'd better to skip sampling the partition
|
||||
// that contains all the history data. Because this partition may contain many old data which is not
|
||||
// visited by most queries. To sample this partition may cause the statistics not accurate.
|
||||
// For example, one table has 366 partitions, partition 1 ~ 365 store date for each day of the year from now.
|
||||
// Partition 0 stores all the history data earlier than 1 year. We want to skip sampling partition 0.
|
||||
protected long getSkipPartitionId(List<Partition> partitions) {
|
||||
if (partitions == null || partitions.size() < StatisticsUtil.getPartitionSampleCount()) {
|
||||
return NO_SKIP_TABLET_ID;
|
||||
}
|
||||
PartitionInfo partitionInfo = ((OlapTable) tbl).getPartitionInfo();
|
||||
if (!PartitionType.RANGE.equals(partitionInfo.getType())) {
|
||||
return NO_SKIP_TABLET_ID;
|
||||
}
|
||||
if (partitionInfo.getPartitionColumns().size() != 1) {
|
||||
return NO_SKIP_TABLET_ID;
|
||||
}
|
||||
Column column = partitionInfo.getPartitionColumns().get(0);
|
||||
if (!column.getType().isDateType()) {
|
||||
return NO_SKIP_TABLET_ID;
|
||||
}
|
||||
PartitionKey lowestKey = PartitionKey.createMaxPartitionKey();
|
||||
long lowestPartitionId = -1;
|
||||
for (Partition p : partitions) {
|
||||
RangePartitionItem item = (RangePartitionItem) partitionInfo.getItem(p.getId());
|
||||
Range<PartitionKey> items = item.getItems();
|
||||
if (!items.hasLowerBound()) {
|
||||
lowestPartitionId = p.getId();
|
||||
break;
|
||||
}
|
||||
if (items.lowerEndpoint().compareTo(lowestKey) < 0) {
|
||||
lowestKey = items.lowerEndpoint();
|
||||
lowestPartitionId = p.getId();
|
||||
}
|
||||
}
|
||||
return lowestPartitionId;
|
||||
}
|
||||
|
||||
protected long pickSamplePartition(List<Partition> partitions, List<Long> pickedTabletIds, long skipPartitionId) {
|
||||
Partition partition = ((OlapTable) tbl).getPartition(skipPartitionId);
|
||||
long averageRowsPerPartition;
|
||||
if (partition != null) {
|
||||
LOG.debug("Going to skip partition {} in table {}", skipPartitionId, tbl.getName());
|
||||
// If we want to skip the oldest partition, calculate the average rows per partition value without
|
||||
// the oldest partition, otherwise if the oldest partition is very large, we may skip all partitions.
|
||||
// Because we only pick partitions which meet partitionRowCount >= averageRowsPerPartition.
|
||||
Preconditions.checkNotNull(partitions, "Partition list of table " + tbl.getName() + " is null");
|
||||
Preconditions.checkState(partitions.size() > 1, "Too few partitions in " + tbl.getName());
|
||||
averageRowsPerPartition = (tbl.getRowCount() - partition.getRowCount()) / (partitions.size() - 1);
|
||||
} else {
|
||||
averageRowsPerPartition = tbl.getRowCount() / partitions.size();
|
||||
}
|
||||
long indexId = info.indexId == -1 ? ((OlapTable) tbl).getBaseIndexId() : info.indexId;
|
||||
long pickedRows = 0;
|
||||
int pickedPartitionCount = 0;
|
||||
for (Partition p : partitions) {
|
||||
if (skipPartitionId == p.getId()) {
|
||||
LOG.info("Partition {} in table {} skipped", skipPartitionId, tbl.getName());
|
||||
continue;
|
||||
}
|
||||
long partitionRowCount = p.getRowCount();
|
||||
if (partitionRowCount >= averageRowsPerPartition) {
|
||||
pickedRows += partitionRowCount;
|
||||
|
||||
@ -17,26 +17,34 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.analysis.PartitionValue;
|
||||
import org.apache.doris.analysis.TableSample;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DataProperty;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionKey;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.RandomDistributionInfo;
|
||||
import org.apache.doris.catalog.RangePartitionItem;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
|
||||
import org.apache.doris.statistics.AnalysisInfo.JobType;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Range;
|
||||
import mockit.Expectations;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
@ -45,6 +53,7 @@ import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -302,7 +311,7 @@ public class OlapAnalysisTaskTest {
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
long rows = task.pickSamplePartition(partitions, ids);
|
||||
long rows = task.pickSamplePartition(partitions, ids, 0);
|
||||
Assertions.assertEquals(900000000, rows);
|
||||
Assertions.assertEquals(4, ids.size());
|
||||
Assertions.assertEquals(0, ids.get(0));
|
||||
@ -442,4 +451,228 @@ public class OlapAnalysisTaskTest {
|
||||
Assertions.assertEquals("2000000000", params.get("ndvFunction"));
|
||||
Assertions.assertEquals("limit 1000000000", params.get("limit"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSkipPartitionId(@Mocked OlapTable tableIf) throws AnalysisException {
|
||||
// test null partition list
|
||||
OlapAnalysisTask task = new OlapAnalysisTask();
|
||||
long skipPartitionId = task.getSkipPartitionId(null);
|
||||
Assertions.assertEquals(OlapAnalysisTask.NO_SKIP_TABLET_ID, skipPartitionId);
|
||||
|
||||
// test empty partition list
|
||||
List<Partition> partitions = Lists.newArrayList();
|
||||
skipPartitionId = task.getSkipPartitionId(partitions);
|
||||
Assertions.assertEquals(OlapAnalysisTask.NO_SKIP_TABLET_ID, skipPartitionId);
|
||||
|
||||
// test partition list item less than session variable partition_sample_count
|
||||
Partition p1 = new Partition(1, "p1", new MaterializedIndex(), new RandomDistributionInfo());
|
||||
partitions.add(p1);
|
||||
skipPartitionId = task.getSkipPartitionId(partitions);
|
||||
Assertions.assertEquals(OlapAnalysisTask.NO_SKIP_TABLET_ID, skipPartitionId);
|
||||
|
||||
partitions.clear();
|
||||
int partitionSampleCount = StatisticsUtil.getPartitionSampleCount();
|
||||
for (int i = 1; i <= partitionSampleCount; i++) {
|
||||
Partition p = new Partition(i, "p" + i, new MaterializedIndex(), new RandomDistributionInfo());
|
||||
partitions.add(p);
|
||||
}
|
||||
|
||||
// Test List partition return NO_SKIP_TABLET_ID
|
||||
new MockUp<OlapTable>() {
|
||||
@Mock
|
||||
public PartitionInfo getPartitionInfo() {
|
||||
return new PartitionInfo(PartitionType.LIST);
|
||||
}
|
||||
};
|
||||
task.tbl = tableIf;
|
||||
skipPartitionId = task.getSkipPartitionId(partitions);
|
||||
Assertions.assertEquals(OlapAnalysisTask.NO_SKIP_TABLET_ID, skipPartitionId);
|
||||
|
||||
// Test Unpartition return NO_SKIP_TABLET_ID
|
||||
new MockUp<OlapTable>() {
|
||||
@Mock
|
||||
public PartitionInfo getPartitionInfo() {
|
||||
return new PartitionInfo(PartitionType.UNPARTITIONED);
|
||||
}
|
||||
};
|
||||
skipPartitionId = task.getSkipPartitionId(partitions);
|
||||
Assertions.assertEquals(OlapAnalysisTask.NO_SKIP_TABLET_ID, skipPartitionId);
|
||||
|
||||
// Test more than 1 partition column return NO_SKIP_TABLET_ID
|
||||
new MockUp<OlapTable>() {
|
||||
@Mock
|
||||
public PartitionInfo getPartitionInfo() {
|
||||
ArrayList<Column> columns = Lists.newArrayList();
|
||||
columns.add(new Column("col1", PrimitiveType.DATEV2));
|
||||
columns.add(new Column("col2", PrimitiveType.DATEV2));
|
||||
return new PartitionInfo(PartitionType.RANGE, columns);
|
||||
}
|
||||
};
|
||||
skipPartitionId = task.getSkipPartitionId(partitions);
|
||||
Assertions.assertEquals(OlapAnalysisTask.NO_SKIP_TABLET_ID, skipPartitionId);
|
||||
|
||||
// Test not Date type return NO_SKIP_TABLET_ID
|
||||
new MockUp<OlapTable>() {
|
||||
@Mock
|
||||
public PartitionInfo getPartitionInfo() {
|
||||
ArrayList<Column> columns = Lists.newArrayList();
|
||||
columns.add(new Column("col1", PrimitiveType.STRING));
|
||||
return new PartitionInfo(PartitionType.RANGE, columns);
|
||||
}
|
||||
};
|
||||
skipPartitionId = task.getSkipPartitionId(partitions);
|
||||
Assertions.assertEquals(OlapAnalysisTask.NO_SKIP_TABLET_ID, skipPartitionId);
|
||||
|
||||
// Test return the partition id with the oldest date range.
|
||||
ArrayList<Column> columns = Lists.newArrayList();
|
||||
Column col1 = new Column("col1", PrimitiveType.DATEV2);
|
||||
columns.add(col1);
|
||||
PartitionInfo partitionInfo = new PartitionInfo(PartitionType.RANGE, columns);
|
||||
|
||||
List<PartitionValue> lowKey = Lists.newArrayList();
|
||||
lowKey.add(new PartitionValue("2025-01-01"));
|
||||
List<PartitionValue> highKey = Lists.newArrayList();
|
||||
highKey.add(new PartitionValue("2025-01-02"));
|
||||
Range<PartitionKey> range1 = Range.closedOpen(PartitionKey.createPartitionKey(lowKey, columns),
|
||||
PartitionKey.createPartitionKey(highKey, columns));
|
||||
RangePartitionItem item1 = new RangePartitionItem(range1);
|
||||
|
||||
lowKey.clear();
|
||||
lowKey.add(new PartitionValue("2024-11-01"));
|
||||
highKey.clear();
|
||||
highKey.add(new PartitionValue("2024-11-02"));
|
||||
Range<PartitionKey> range2 = Range.closedOpen(PartitionKey.createPartitionKey(lowKey, columns),
|
||||
PartitionKey.createPartitionKey(highKey, columns));
|
||||
RangePartitionItem item2 = new RangePartitionItem(range2);
|
||||
|
||||
lowKey.clear();
|
||||
lowKey.add(new PartitionValue("2025-02-13"));
|
||||
highKey.clear();
|
||||
highKey.add(new PartitionValue("2025-02-14"));
|
||||
Range<PartitionKey> range3 = Range.closedOpen(PartitionKey.createPartitionKey(lowKey, columns),
|
||||
PartitionKey.createPartitionKey(highKey, columns));
|
||||
RangePartitionItem item3 = new RangePartitionItem(range3);
|
||||
|
||||
partitionInfo.addPartition(1, false, item1, new DataProperty(TStorageMedium.HDD), null, false, false);
|
||||
partitionInfo.addPartition(2, false, item2, new DataProperty(TStorageMedium.HDD), null, false, false);
|
||||
partitionInfo.addPartition(3, false, item3, new DataProperty(TStorageMedium.HDD), null, false, false);
|
||||
|
||||
new MockUp<OlapTable>() {
|
||||
@Mock
|
||||
public PartitionInfo getPartitionInfo() {
|
||||
return partitionInfo;
|
||||
}
|
||||
};
|
||||
new MockUp<StatisticsUtil>() {
|
||||
@Mock
|
||||
public int getPartitionSampleCount() {
|
||||
return 3;
|
||||
}
|
||||
};
|
||||
partitions.clear();
|
||||
for (int i = 1; i <= 3; i++) {
|
||||
Partition p = new Partition(i, "p" + i, new MaterializedIndex(), new RandomDistributionInfo());
|
||||
partitions.add(p);
|
||||
}
|
||||
skipPartitionId = task.getSkipPartitionId(partitions);
|
||||
Assertions.assertEquals(2, skipPartitionId);
|
||||
|
||||
// Test less than partition
|
||||
partitions.add(new Partition(4, "p4", new MaterializedIndex(), new RandomDistributionInfo()));
|
||||
partitions.add(new Partition(5, "p5", new MaterializedIndex(), new RandomDistributionInfo()));
|
||||
new MockUp<StatisticsUtil>() {
|
||||
@Mock
|
||||
public int getPartitionSampleCount() {
|
||||
return 5;
|
||||
}
|
||||
};
|
||||
highKey.clear();
|
||||
highKey.add(new PartitionValue("2024-01-01"));
|
||||
Range<PartitionKey> range4 = Range.lessThan(PartitionKey.createPartitionKey(highKey, columns));
|
||||
RangePartitionItem item4 = new RangePartitionItem(range4);
|
||||
partitionInfo.addPartition(4, false, item4, new DataProperty(TStorageMedium.HDD), null, false, false);
|
||||
lowKey.clear();
|
||||
lowKey.add(new PartitionValue("2024-03-13"));
|
||||
highKey.clear();
|
||||
highKey.add(new PartitionValue("2024-03-14"));
|
||||
Range<PartitionKey> range5 = Range.closedOpen(PartitionKey.createPartitionKey(lowKey, columns),
|
||||
PartitionKey.createPartitionKey(highKey, columns));
|
||||
RangePartitionItem item5 = new RangePartitionItem(range5);
|
||||
partitionInfo.addPartition(5, false, item5, new DataProperty(TStorageMedium.HDD), null, false, false);
|
||||
skipPartitionId = task.getSkipPartitionId(partitions);
|
||||
Assertions.assertEquals(4, skipPartitionId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSampleTablets(@Mocked MaterializedIndex index, @Mocked Tablet t) {
|
||||
OlapAnalysisTask task = new OlapAnalysisTask();
|
||||
task.tbl = new OlapTable();
|
||||
task.col = new Column("col1", PrimitiveType.STRING);
|
||||
task.info = new AnalysisInfoBuilder().setIndexId(-1L).build();
|
||||
task.tableSample = new TableSample(false, 4000000L, 0L);
|
||||
List<Partition> partitions = Lists.newArrayList();
|
||||
partitions.add(new Partition(1, "p1", new MaterializedIndex(), new RandomDistributionInfo()));
|
||||
final int[] i = {0};
|
||||
long[] tabletsRowCount = {1100000000, 100000000};
|
||||
List<Long> ret = Lists.newArrayList();
|
||||
ret.add(10001L);
|
||||
ret.add(10002L);
|
||||
new MockUp<OlapAnalysisTask>() {
|
||||
@Mock
|
||||
protected long getSampleRows() {
|
||||
return 4000000;
|
||||
}
|
||||
};
|
||||
new MockUp<OlapTable>() {
|
||||
@Mock
|
||||
boolean isPartitionColumn(String columnName) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public Collection<Partition> getPartitions() {
|
||||
return partitions;
|
||||
}
|
||||
};
|
||||
new MockUp<Partition>() {
|
||||
@Mock
|
||||
public MaterializedIndex getBaseIndex() {
|
||||
return index;
|
||||
}
|
||||
};
|
||||
new MockUp<MaterializedIndex>() {
|
||||
@Mock
|
||||
public List<Long> getTabletIdsInOrder() {
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public long getRowCount() {
|
||||
return 1_200_000_000L;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public Tablet getTablet(long tabletId) {
|
||||
return t;
|
||||
}
|
||||
};
|
||||
new MockUp<Tablet>() {
|
||||
@Mock
|
||||
public long getMinReplicaRowCount(long version) {
|
||||
return tabletsRowCount[i[0]++];
|
||||
}
|
||||
};
|
||||
// Test set large tablet id back if it doesn't pick enough sample rows.
|
||||
Pair<List<Long>, Long> sampleTablets = task.getSampleTablets();
|
||||
Assertions.assertEquals(1, sampleTablets.first.size());
|
||||
Assertions.assertEquals(10001, sampleTablets.first.get(0));
|
||||
Assertions.assertEquals(1100000000L, sampleTablets.second);
|
||||
|
||||
// Test normal pick
|
||||
task.tableSample = new TableSample(false, 4000000L, 1L);
|
||||
sampleTablets = task.getSampleTablets();
|
||||
Assertions.assertEquals(1, sampleTablets.first.size());
|
||||
Assertions.assertEquals(10002, sampleTablets.first.get(0));
|
||||
Assertions.assertEquals(100000000L, sampleTablets.second);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user