[Feature] Support SHOW DATA SKEW stmt (#6219)

ADMIN SHOW DATA SKEW FROM tbl PARTITION(p1)

to view the data distribution of a specified partition

```
mysql> admin show data skew from tbl1 partition(tbl1);
+-----------+-------------+-------+---------+
| BucketIdx | AvgDataSize | Graph | Percent |
+-----------+-------------+-------+---------+
| 0         | 0           |       | 100.00% |
+-----------+-------------+-------+---------+
1 row in set (0.01 sec)
```

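Also extend the result of `admin show replica distribution` with the replica size distribution: the `ReplicaSize`, `SizeGraph` and `SizePercent` columns are added, and the former `Graph`/`Percent` columns are renamed to `NumGraph`/`NumPercent`.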

```
mysql> admin show replica distribution from tbl1 partition(tbl1);
+-----------+------------+-------------+----------+------------+-----------+-------------+
| BackendId | ReplicaNum | ReplicaSize | NumGraph | NumPercent | SizeGraph | SizePercent |
+-----------+------------+-------------+----------+------------+-----------+-------------+
| 10002     | 1          | 0           | >        | 100.00%    |           | 100.00%     |
+-----------+------------+-------------+----------+------------+-----------+-------------+
```
This commit is contained in:
Mingyu Chen
2021-08-05 14:05:41 +08:00
committed by GitHub
parent a16ad3fccd
commit 2823e4daba
13 changed files with 423 additions and 33 deletions

View File

@ -262,6 +262,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A
KW_REPAIR, KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, KW_REPLACE_IF_NOT_NULL, KW_REPLICA, KW_RESOURCE, KW_RESOURCES, KW_RESTORE, KW_RETURNS, KW_RESUME, KW_REVOKE,
KW_RIGHT, KW_ROLE, KW_ROLES, KW_ROLLBACK, KW_ROLLUP, KW_ROUTINE, KW_ROW, KW_ROWS,
KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SET_VAR, KW_SHOW, KW_SIGNED,
KW_SKEW,
KW_SMALLINT, KW_SNAPSHOT, KW_SONAME, KW_SPLIT, KW_START, KW_STATUS, KW_STOP, KW_STORAGE, KW_STREAM, KW_STRING, KW_STRUCT,
KW_SUM, KW_SUPERUSER, KW_SYNC, KW_SYSTEM,
KW_TABLE, KW_TABLES, KW_TABLET, KW_TASK, KW_TEMPORARY, KW_TERMINATED, KW_THAN, KW_TIME, KW_THEN, KW_TIMESTAMP, KW_TINYINT,
@ -4823,6 +4824,10 @@ admin_stmt ::=
{:
RESULT = new AdminCheckTabletsStmt(tabletIds, properties);
:}
| KW_ADMIN KW_SHOW KW_DATA KW_SKEW KW_FROM base_table_ref:table_ref
{:
RESULT = new AdminShowDataSkewStmt(table_ref);
:}
;
truncate_stmt ::=
@ -5132,6 +5137,8 @@ keyword ::=
{: RESULT = id; :}
| KW_SESSION:id
{: RESULT = id; :}
| KW_SKEW:id
{: RESULT = id; :}
| KW_SNAPSHOT:id
{: RESULT = id; :}
| KW_SONAME:id

View File

@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.collect.ImmutableList;
/**
 * Statement {@code ADMIN SHOW DATA SKEW FROM tbl PARTITION(p1)}.
 *
 * <p>Analysis requires the global ADMIN privilege and a table reference that names
 * exactly one partition. The result set exposes one VARCHAR(30) column for every
 * entry of {@link #TITLE_NAMES}.
 */
public class AdminShowDataSkewStmt extends ShowStmt {
    // Titles of the result columns, in display order.
    public static final ImmutableList<String> TITLE_NAMES =
            ImmutableList.of("BucketIdx", "AvgDataSize", "Graph", "Percent");

    // Table reference (including the partition clause) given in the statement.
    private TableRef tblRef;

    public AdminShowDataSkewStmt(TableRef tblRef) {
        this.tblRef = tblRef;
    }

    /**
     * Checks the ADMIN privilege, analyzes the table name and validates the partition clause.
     *
     * @throws UserException if the privilege is missing, the table name fails analysis, or
     *                       the statement does not name exactly one partition
     */
    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);

        // check auth
        boolean hasAdminPriv = Catalog.getCurrentCatalog().getAuth()
                .checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN);
        if (!hasAdminPriv) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
        }

        tblRef.getName().analyze(analyzer);

        PartitionNames partitions = tblRef.getPartitionNames();
        boolean exactlyOnePartition = partitions != null && partitions.getPartitionNames().size() == 1;
        if (!exactlyOnePartition) {
            throw new AnalysisException("Should specify one and only one partition");
        }
    }

    /** Returns the database part of the referenced table name. */
    public String getDbName() {
        return tblRef.getName().getDb();
    }

    /** Returns the table part of the referenced table name. */
    public String getTblName() {
        return tblRef.getName().getTbl();
    }

    /** Returns the partition clause of the table reference. */
    public PartitionNames getPartitionNames() {
        return tblRef.getPartitionNames();
    }

    /** Builds the result metadata: one VARCHAR(30) column per title in {@link #TITLE_NAMES}. */
    @Override
    public ShowResultSetMetaData getMetaData() {
        ShowResultSetMetaData.Builder metaBuilder = ShowResultSetMetaData.builder();
        TITLE_NAMES.forEach(title -> metaBuilder.addColumn(new Column(title, ScalarType.createVarchar(30))));
        return metaBuilder.build();
    }

    @Override
    public RedirectStatus getRedirectStatus() {
        return RedirectStatus.FORWARD_NO_SYNC;
    }
}

View File

@ -35,7 +35,10 @@ import com.google.common.collect.ImmutableList;
// admin show replica distribution from tbl [partition(p1, p2, ...)]
public class AdminShowReplicaDistributionStmt extends ShowStmt {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("BackendId").add("ReplicaNum").add("Graph").add("Percent").build();
.add("BackendId").add("ReplicaNum").add("ReplicaSize")
.add("NumGraph").add("NumPercent")
.add("SizeGraph").add("SizePercent")
.build();
private TableRef tblRef;
@ -88,10 +91,6 @@ public class AdminShowReplicaDistributionStmt extends ShowStmt {
@Override
public RedirectStatus getRedirectStatus() {
if (ConnectContext.get().getSessionVariable().getForwardToMaster()) {
return RedirectStatus.FORWARD_NO_SYNC;
} else {
return RedirectStatus.NO_FORWARD;
}
return RedirectStatus.FORWARD_NO_SYNC;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.AdminShowDataSkewStmt;
import org.apache.doris.analysis.AdminShowReplicaDistributionStmt;
import org.apache.doris.analysis.AdminShowReplicaStatusStmt;
import org.apache.doris.analysis.BinaryPredicate.Operator;
@ -169,7 +170,7 @@ public class MetadataViewer {
private static List<List<String>> getTabletDistribution(String dbName, String tblName, PartitionNames partitionNames)
throws DdlException {
DecimalFormat df = new DecimalFormat("##.00 %");
DecimalFormat df = new DecimalFormat("00.00 %");
List<List<String>> result = Lists.newArrayList();
@ -188,9 +189,7 @@ public class MetadataViewer {
tbl.readLock();
try {
OlapTable olapTable = (OlapTable) tbl;
List<Long> partitionIds = Lists.newArrayList();
if (partitionNames == null) {
for (Partition partition : olapTable.getPartitions()) {
@ -206,16 +205,20 @@ public class MetadataViewer {
partitionIds.add(partition.getId());
}
}
// backend id -> replica count
Map<Long, Integer> countMap = Maps.newHashMap();
// backend id -> replica size
Map<Long, Long> sizeMap = Maps.newHashMap();
// init map
List<Long> beIds = infoService.getBackendIds(false);
for (long beId : beIds) {
countMap.put(beId, 0);
sizeMap.put(beId, 0L);
}
int totalReplicaNum = 0;
long totalReplicaSize = 0;
for (long partId : partitionIds) {
Partition partition = olapTable.getPartition(partId);
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
@ -225,7 +228,9 @@ public class MetadataViewer {
continue;
}
countMap.put(replica.getBackendId(), countMap.get(replica.getBackendId()) + 1);
sizeMap.put(replica.getBackendId(), sizeMap.get(replica.getBackendId()) + replica.getDataSize());
totalReplicaNum++;
totalReplicaSize += replica.getDataSize();
}
}
}
@ -237,8 +242,11 @@ public class MetadataViewer {
List<String> row = Lists.newArrayList();
row.add(String.valueOf(beId));
row.add(String.valueOf(countMap.get(beId)));
row.add(String.valueOf(sizeMap.get(beId)));
row.add(graph(countMap.get(beId), totalReplicaNum, beIds.size()));
row.add(df.format((double) countMap.get(beId) / totalReplicaNum));
row.add(totalReplicaNum == countMap.get(beId) ? "100.00%" : df.format((double) countMap.get(beId) / totalReplicaNum));
row.add(graph(sizeMap.get(beId), totalReplicaSize, beIds.size()));
row.add(totalReplicaSize == sizeMap.get(beId) ? "100.00%" : df.format((double) sizeMap.get(beId) / totalReplicaSize));
result.add(row);
}
@ -249,12 +257,96 @@ public class MetadataViewer {
return result;
}
private static String graph(int num, int totalNum, int mod) {
private static String graph(long num, long totalNum, int mod) {
StringBuilder sb = new StringBuilder();
int normalized = (int) Math.ceil(num * mod / totalNum);
long normalized = num == totalNum ? totalNum : (int) Math.ceil(num * mod / totalNum);
for (int i = 0; i < normalized; ++i) {
sb.append(">");
}
return sb.toString();
}
/**
 * Entry point for {@code ADMIN SHOW DATA SKEW}: unpacks the database name, table name and
 * partition clause from the statement and delegates to the private overload.
 *
 * @param stmt the analyzed statement
 * @return one row per bucket, as produced by the private overload
 * @throws DdlException propagated from the private overload
 */
public static List<List<String>> getDataSkew(AdminShowDataSkewStmt stmt) throws DdlException {
    String dbName = stmt.getDbName();
    String tblName = stmt.getTblName();
    PartitionNames partitionNames = stmt.getPartitionNames();
    return getDataSkew(dbName, tblName, partitionNames);
}
/**
 * Builds the per-bucket data size report for a single partition of an OLAP table.
 *
 * <p>For every bucket index the sizes returned by {@code tablet.getDataSize(false)} are
 * summed over all VISIBLE materialized indices of the partition. Each result row holds:
 * bucket index, accumulated size, a bar rendered by {@code graph(...)}, and the bucket's
 * share of the partition total.
 *
 * @param dbName         name of the database
 * @param tblName        name of the table
 * @param partitionNames partition clause; must name exactly one partition
 * @return one row per bucket, ordered by bucket index
 * @throws DdlException if the partition clause does not name exactly one partition, or the
 *                      database, OLAP table or partition cannot be found
 */
private static List<List<String>> getDataSkew(String dbName, String tblName, PartitionNames partitionNames)
        throws DdlException {
    if (partitionNames == null || partitionNames.getPartitionNames().size() != 1) {
        throw new DdlException("Should specify one and only one partitions");
    }

    Database db = Catalog.getCurrentCatalog().getDb(dbName);
    if (db == null) {
        throw new DdlException("Database " + dbName + " does not exist");
    }

    Table tbl = db.getTable(tblName);
    if (tbl == null || tbl.getType() != TableType.OLAP) {
        throw new DdlException("Table does not exist or is not OLAP table: " + tblName);
    }

    DecimalFormat df = new DecimalFormat("00.00 %");
    List<List<String>> result = Lists.newArrayList();

    tbl.readLock();
    try {
        OlapTable olapTable = (OlapTable) tbl;

        // Exactly one name is present (validated above), so resolve it directly.
        String partName = partitionNames.getPartitionNames().get(0);
        Partition partition = olapTable.getPartition(partName, partitionNames.isTemp());
        if (partition == null) {
            throw new DdlException("Partition does not exist: " + partName);
        }

        DistributionInfo distributionInfo = partition.getDistributionInfo();
        int bucketNum = distributionInfo.getBucketNum();

        // bucketSizes[i] accumulates the size of the i-th tablet of every visible index.
        long[] bucketSizes = new long[bucketNum];
        long totalSize = 0;
        for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
            List<Long> tabletIds = mIndex.getTabletIdsInOrder();
            for (int i = 0; i < tabletIds.size(); i++) {
                long dataSize = mIndex.getTablet(tabletIds.get(i)).getDataSize(false);
                bucketSizes[i] += dataSize;
                totalSize += dataSize;
            }
        }

        // graph
        for (int i = 0; i < bucketNum; i++) {
            long bucketSize = bucketSizes[i];
            List<String> row = Lists.newArrayList();
            row.add(String.valueOf(i));
            row.add(String.valueOf(bucketSize));
            row.add(graph(bucketSize, totalSize, bucketNum));
            // The equality shortcut also covers totalSize == 0 (all buckets empty), which
            // would otherwise divide by zero.
            row.add(totalSize == bucketSize ? "100.00%" : df.format((double) bucketSize / totalSize));
            result.add(row);
        }
    } finally {
        tbl.readUnlock();
    }

    return result;
}
}

View File

@ -43,13 +43,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
/**
* This class represents the olap tablet related metadata.
*/
public class Tablet extends MetaObject implements Writable {
private static final Logger LOG = LogManager.getLogger(Tablet.class);
public enum TabletStatus {
HEALTHY,
REPLICA_MISSING, // not enough alive replica num.
@ -376,25 +377,15 @@ public class Tablet extends MetaObject implements Writable {
}
public long getDataSize(boolean singleReplica) {
long dataSize = 0;
int count = 0;
for (Replica replica : getReplicas()) {
if (replica.getState() == ReplicaState.NORMAL
|| replica.getState() == ReplicaState.SCHEMA_CHANGE) {
dataSize += replica.getDataSize();
count++;
}
}
if (count == 0) {
return 0;
}
LongStream s = replicas.stream().filter(r -> r.getState() == ReplicaState.NORMAL)
.mapToLong(Replica::getDataSize);
return singleReplica ? Double.valueOf(s.average().getAsDouble()).longValue() : s.sum();
}
if (singleReplica) {
// get the avg replica size
dataSize /= count;
}
return dataSize;
/**
 * Returns the row count of this tablet, taking only replicas in NORMAL state into account.
 *
 * @param singleReplica if true, return the average row count per replica (truncated to a
 *                      long); otherwise return the sum over the replicas
 * @return the average or the sum as described above; 0 when no replica is in NORMAL state
 */
public long getRowNum(boolean singleReplica) {
    LongStream s = replicas.stream().filter(r -> r.getState() == ReplicaState.NORMAL)
            .mapToLong(Replica::getRowCount);
    // average() is empty when no replica passed the filter; fall back to 0 instead of
    // letting getAsDouble() throw NoSuchElementException.
    return singleReplica ? (long) s.average().orElse(0) : s.sum();
}
/**

View File

@ -18,6 +18,7 @@
package org.apache.doris.qe;
import org.apache.doris.analysis.AdminShowConfigStmt;
import org.apache.doris.analysis.AdminShowDataSkewStmt;
import org.apache.doris.analysis.AdminShowReplicaDistributionStmt;
import org.apache.doris.analysis.AdminShowReplicaStatusStmt;
import org.apache.doris.analysis.DescribeStmt;
@ -299,6 +300,8 @@ public class ShowExecutor {
handleShowQueryProfile();
} else if (stmt instanceof ShowLoadProfileStmt) {
handleShowLoadProfile();
} else if (stmt instanceof AdminShowDataSkewStmt) {
handleAdminShowDataSkew();
} else {
handleEmtpy();
}
@ -2026,6 +2029,16 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showCreateRoutineLoadStmt.getMetaData(), rows);
}
/**
 * Handles {@code ADMIN SHOW DATA SKEW}: asks {@code MetadataViewer} for the rows and stores
 * them, together with the statement's metadata, in {@code resultSet}.
 *
 * @throws AnalysisException if {@code MetadataViewer.getDataSkew} fails with a
 *                           {@code DdlException}; the message is reused and the original
 *                           exception is attached as the cause
 */
private void handleAdminShowDataSkew() throws AnalysisException {
    AdminShowDataSkewStmt showStmt = (AdminShowDataSkewStmt) stmt;
    List<List<String>> results;
    try {
        results = MetadataViewer.getDataSkew(showStmt);
    } catch (DdlException e) {
        // Keep the original exception as the cause so its stack trace is not lost.
        throw new AnalysisException(e.getMessage(), e);
    }
    resultSet = new ShowResultSet(showStmt.getMetaData(), results);
}
}

View File

@ -341,6 +341,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("sets", new Integer(SqlParserSymbols.KW_SETS));
keywordMap.put("show", new Integer(SqlParserSymbols.KW_SHOW));
keywordMap.put("signed", new Integer(SqlParserSymbols.KW_SIGNED));
keywordMap.put("skew", new Integer(SqlParserSymbols.KW_SKEW));
keywordMap.put("smallint", new Integer(SqlParserSymbols.KW_SMALLINT));
keywordMap.put("snapshot", new Integer(SqlParserSymbols.KW_SNAPSHOT));
keywordMap.put("soname", new Integer(SqlParserSymbols.KW_SONAME));

View File

@ -17,16 +17,114 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowExecutor;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.utframe.UtFrameUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.StringReader;
import java.lang.reflect.Method;
import java.util.UUID;
public class AdminShowReplicaTest {
// use a unique dir so that it won't be conflict with other unit test which
// may also start a Mocked Frontend
private static String runningDir = "fe/mocked/AdminShowReplicaTest/" + UUID.randomUUID().toString() + "/";
private static ConnectContext connectContext;
/**
 * Starts a minimal mocked Doris cluster in {@code runningDir}, creates the default connect
 * context, then creates database {@code test} and the two-partition table {@code test.tbl1}
 * (10 hash buckets, replication_num = 1) used by the tests.
 */
@BeforeClass
public static void beforeClass() throws Exception {
    UtFrameUtils.createMinDorisCluster(runningDir);

    // create connect context
    connectContext = UtFrameUtils.createDefaultCtx();

    // create database
    final String createDbSql = "create database test;";
    final CreateDbStmt createDb = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbSql, connectContext);
    Catalog.getCurrentCatalog().createDb(createDb);

    // create table: the DDL is assembled line by line and joined with '\n'
    final String createTblSql = String.join("\n",
            "create table test.tbl1",
            "(k1 date, k2 int)",
            "partition by range(k1)",
            "(",
            " partition p1 values less than(\"2021-07-01\"),",
            " partition p2 values less than(\"2021-08-01\")",
            ")",
            "distributed by hash(k2) buckets 10",
            "properties(\"replication_num\" = \"1\");");
    createTable(createTblSql);
}
/**
 * Removes the per-run directory of the mocked frontend, including everything below it.
 * A plain {@code File.delete()} fails silently on a non-empty directory, so the tree is
 * deleted bottom-up.
 */
@AfterClass
public static void tearDown() {
    deleteRecursively(new File(runningDir));
}

// Best-effort recursive delete: children first, then the entry itself. Failures are ignored.
private static void deleteRecursively(File target) {
    // listFiles() returns null when target is not a directory or cannot be listed.
    File[] children = target.listFiles();
    if (children != null) {
        for (File child : children) {
            deleteRecursively(child);
        }
    }
    target.delete();
}
// Parses and analyzes the given CREATE TABLE statement with the shared connect context,
// then creates the table in the current catalog.
private static void createTable(String sql) throws Exception {
    final Object analyzed = UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
    final CreateTableStmt createStmt = (CreateTableStmt) analyzed;
    Catalog.getCurrentCatalog().createTable(createStmt);
}
/**
 * Runs {@code admin show replica distribution} and {@code admin show data skew} against
 * partition p1 of test.tbl1, first on the freshly created table and again after every
 * replica's statistics have been updated, and checks the shape of both result sets.
 */
@Test
public void testShowReplicaDistribution() throws Exception {
    final String distributionSql = "admin show replica distribution from test.tbl1 partition(p1)";
    final AdminShowReplicaDistributionStmt distributionStmt =
            (AdminShowReplicaDistributionStmt) UtFrameUtils.parseAndAnalyzeStmt(distributionSql, connectContext);
    ShowResultSet distributionResult = new ShowExecutor(connectContext, distributionStmt).execute();
    Assert.assertEquals(1, distributionResult.getResultRows().size());
    Assert.assertEquals(7, distributionResult.getResultRows().get(0).size());

    final String skewSql = "admin show data skew from test.tbl1 partition(p1)";
    final AdminShowDataSkewStmt skewStmt =
            (AdminShowDataSkewStmt) UtFrameUtils.parseAndAnalyzeStmt(skewSql, connectContext);
    ShowResultSet skewResult = new ShowExecutor(connectContext, skewStmt).execute();
    Assert.assertEquals(10, skewResult.getResultRows().size());
    Assert.assertEquals(4, skewResult.getResultRows().get(0).size());

    // update tablets' data size and row count
    final Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
    final OlapTable olapTable = (OlapTable) db.getTable("tbl1");
    for (Partition partition : olapTable.getPartitions()) {
        partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE).stream()
                .flatMap(index -> index.getTablets().stream())
                .flatMap(tablet -> tablet.getReplicas().stream())
                .forEach(replica -> replica.updateStat(1024, 2));
    }

    // Re-run both statements against the updated statistics.
    distributionResult = new ShowExecutor(connectContext, distributionStmt).execute();
    Assert.assertEquals(1, distributionResult.getResultRows().size());
    Assert.assertEquals(7, distributionResult.getResultRows().get(0).size());

    skewResult = new ShowExecutor(connectContext, skewStmt).execute();
    Assert.assertEquals(10, skewResult.getResultRows().size());
    Assert.assertEquals("4", skewResult.getResultRows().get(4).get(0));
    Assert.assertEquals(4, skewResult.getResultRows().get(0).size());
}
@Test
public void testShowReplicaStatus() {
String stmt = new String("ADMIN SHOW REPLICA STATUS FROM db.tbl1 WHERE status = 'ok'");

View File

@ -1579,7 +1579,6 @@ public class QueryPlanTest {
sql = "select * from test1 where from_unixtime(query_time, 'yyyy-MM-dd') < '2021-03-02 10:01:28'";
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
Assert.assertTrue(explainString.contains("PREDICATES: `query_time` < 1614614400, `query_time` >= 0"));
}
@Test