[feature-wip](unique-key-merge-on-write) remove AggType on unique table with MoW, enable preAggregation, DSIP-018[5/2] (#11205)

remove AggType on unique key table with MoW (merge-on-write), enable preAggregation
Author: zhannngchen
Date: 2022-07-28 17:03:05 +08:00
Committed by: GitHub
parent 97874dd125
commit 70c7e3d7aa
9 changed files with 163 additions and 51 deletions
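In short: with merge-on-write (MoW) enabled, a unique key table no longer marks its value columns with the REPLACE aggregate type; they carry no aggregation at all, the same as a duplicate key table, which is what lets the planner keep pre-aggregation switched on. Below is a minimal sketch of the rule this commit encodes in CreateTableStmt.analyze, assuming the Doris FE catalog classes are on the classpath; the wrapper class and method name are ours, not part of the patch.

import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.KeysType;

// Sketch only: the default aggregate type this commit assigns to non-key columns.
public final class DefaultValueAggTypeSketch {
    public static AggregateType defaultValueAggregateType(KeysType keysType, boolean enableUniqueKeyMergeOnWrite) {
        if (keysType == KeysType.DUP_KEYS) {
            return AggregateType.NONE;
        }
        if (keysType == KeysType.UNIQUE_KEYS) {
            // With merge-on-write, rows are deduplicated at write time, so reads need no REPLACE merge.
            return enableUniqueKeyMergeOnWrite ? AggregateType.NONE : AggregateType.REPLACE;
        }
        // AGG_KEYS tables keep the user-specified per-column aggregation.
        return null;
    }
}

This matches the two new test cases at the bottom of the diff: REPLACE for a plain unique key table, NONE once ENABLE_UNIQUE_KEY_MERGE_ON_WRITE is set to true.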

View File

@@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.external.elasticsearch.EsUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -47,6 +48,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -276,6 +278,14 @@ public class CreateTableStmt extends DdlStmt {
if (engineName.equals("hive") && !Config.enable_spark_load) {
throw new AnalysisException("Spark Load from hive table is coming soon");
}
// `analyzeUniqueKeyMergeOnWrite` would modify `properties`, which will be used later,
// so we just clone a properties map here.
boolean enableUniqueKeyMergeOnWrite = false;
if (properties != null) {
enableUniqueKeyMergeOnWrite = PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(new HashMap<>(properties));
}
// analyze key desc
if (engineName.equalsIgnoreCase("olap")) {
// olap table
@@ -339,6 +349,9 @@
if (keysDesc.getKeysType() == KeysType.DUP_KEYS) {
type = AggregateType.NONE;
}
if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS && enableUniqueKeyMergeOnWrite) {
type = AggregateType.NONE;
}
for (int i = keysDesc.keysColumnSize(); i < columnDefs.size(); ++i) {
columnDefs.get(i).setAggregateType(type);
}
@@ -363,7 +376,11 @@
if (Config.enable_batch_delete_by_default
&& keysDesc != null
&& keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE));
// TODO(zhangchen): Disable the delete sign column for primary key temporary, will replace
// with a better solution later.
if (!enableUniqueKeyMergeOnWrite) {
columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE));
}
}
boolean hasObjectStored = false;
String objectStoredColumn = "";

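The hunk above clones `properties` before calling PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite because, per the new comment, the analyzer modifies the map it is given while the original map is still needed later. The analyzer itself is not part of this diff; the following is only a hedged guess at its behavior (the key constant comes from the test file below; the parsing rules and the error message are assumptions).

import java.util.Map;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PropertyAnalyzer;

// Hypothetical sketch, NOT the real PropertyAnalyzer code: read the merge-on-write flag and
// consume its entry from the map, which is why CreateTableStmt passes in a copy.
public final class MergeOnWritePropertySketch {
    public static boolean analyzeUniqueKeyMergeOnWrite(Map<String, String> properties) throws AnalysisException {
        String value = properties.remove(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE);
        if (value == null) {
            return false; // assumed default: merge-on-write disabled
        }
        if ("true".equalsIgnoreCase(value)) {
            return true;
        }
        if ("false".equalsIgnoreCase(value)) {
            return false;
        }
        throw new AnalysisException("Invalid value for enable_unique_key_merge_on_write: " + value); // assumed message
    }
}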
View File

@@ -1890,6 +1890,11 @@ public class InternalDataSource implements DataSourceIf<Database> {
try {
sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType());
if (sequenceColType != null) {
// TODO(zhannngchen) will support sequence column later.
if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Unique key table with MoW(merge on write) not support "
+ "sequence column for now");
}
olapTable.setSequenceInfo(sequenceColType);
}
} catch (Exception e) {

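Net effect of this hunk: enabling merge-on-write and declaring a sequence column are mutually exclusive for now, and table creation fails before setSequenceInfo is reached. Restated as a standalone check, purely as a sketch: the class and method name are ours, we assume sequenceColType is the catalog Type returned by analyzeSequenceType, and the exception text is copied from the diff.

import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;

// Sketch only: reject a sequence column on a merge-on-write unique key table.
public final class SequenceColumnGuardSketch {
    public static void checkSequenceColumnAllowed(OlapTable olapTable, Type sequenceColType) throws AnalysisException {
        if (sequenceColType != null && olapTable.getEnableUniqueKeyMergeOnWrite()) {
            throw new AnalysisException("Unique key table with MoW(merge on write) not support "
                    + "sequence column for now");
        }
    }
}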
View File

@@ -140,13 +140,14 @@ public class MaterializedViewSelector {
// Step2: check all columns in compensating predicates are available in the view output
checkCompensatingPredicates(columnNamesInPredicates.get(tableId), candidateIndexIdToMeta);
// Step3: group by list in query is the subset of group by list in view or view contains no aggregation
checkGrouping(columnNamesInGrouping.get(tableId), candidateIndexIdToMeta);
checkGrouping(table, columnNamesInGrouping.get(tableId), candidateIndexIdToMeta);
// Step4: aggregation functions are available in the view output
checkAggregationFunction(aggColumnsInQuery.get(tableId), candidateIndexIdToMeta);
checkAggregationFunction(table, aggColumnsInQuery.get(tableId), candidateIndexIdToMeta);
// Step5: columns required to compute output expr are available in the view output
checkOutputColumns(columnNamesInQueryOutput.get(tableId), candidateIndexIdToMeta);
// Step6: if table type is aggregate and the candidateIndexIdToSchema is empty,
if ((table.getKeysType() == KeysType.AGG_KEYS || table.getKeysType() == KeysType.UNIQUE_KEYS)
if ((table.getKeysType() == KeysType.AGG_KEYS || (table.getKeysType() == KeysType.UNIQUE_KEYS
&& !table.getTableProperty().getEnableUniqueKeyMergeOnWrite()))
&& candidateIndexIdToMeta.size() == 0) {
// the base index will be added in the candidateIndexIdToSchema.
/**
@@ -299,7 +300,7 @@
* @param candidateIndexIdToMeta
*/
private void checkGrouping(Set<String> columnsInGrouping, Map<Long, MaterializedIndexMeta>
private void checkGrouping(OlapTable table, Set<String> columnsInGrouping, Map<Long, MaterializedIndexMeta>
candidateIndexIdToMeta) {
Iterator<Map.Entry<Long, MaterializedIndexMeta>> iterator = candidateIndexIdToMeta.entrySet().iterator();
while (iterator.hasNext()) {
@@ -325,8 +326,10 @@
ISSUE-3016, MaterializedViewFunctionTest: testDeduplicateQueryInAgg
*/
if (indexNonAggregatedColumnNames.size() == candidateIndexSchema.size()
&& candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS) {
boolean noNeedAggregation = candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS
|| (candidateIndexMeta.getKeysType() == KeysType.UNIQUE_KEYS
&& table.getTableProperty().getEnableUniqueKeyMergeOnWrite());
if (indexNonAggregatedColumnNames.size() == candidateIndexSchema.size() && noNeedAggregation) {
continue;
}
// When the query is SPJ type but the candidate index is SPJG type, it will not pass directly.
@@ -348,7 +351,7 @@
+ Joiner.on(",").join(candidateIndexIdToMeta.keySet()));
}
private void checkAggregationFunction(Set<FunctionCallExpr> aggregatedColumnsInQueryOutput,
private void checkAggregationFunction(OlapTable table, Set<FunctionCallExpr> aggregatedColumnsInQueryOutput,
Map<Long, MaterializedIndexMeta> candidateIndexIdToMeta) throws AnalysisException {
Iterator<Map.Entry<Long, MaterializedIndexMeta>> iterator = candidateIndexIdToMeta.entrySet().iterator();
while (iterator.hasNext()) {
@@ -356,7 +359,10 @@
MaterializedIndexMeta candidateIndexMeta = entry.getValue();
List<FunctionCallExpr> indexAggColumnExpsList = mvAggColumnsToExprList(candidateIndexMeta);
// When the candidate index is SPJ type, it passes the verification directly
if (indexAggColumnExpsList.size() == 0 && candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS) {
boolean noNeedAggregation = candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS
|| (candidateIndexMeta.getKeysType() == KeysType.UNIQUE_KEYS
&& table.getTableProperty().getEnableUniqueKeyMergeOnWrite());
if (indexAggColumnExpsList.size() == 0 && noNeedAggregation) {
continue;
}
// When the query is SPJ type but the candidate index is SPJG type, it will not pass directly.

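checkGrouping and checkAggregationFunction now apply the same relaxation: an index whose rows never need aggregation on read covers unique-key merge-on-write tables as well as duplicate-key ones. The shared condition is sketched below as a predicate; the wrapper class and method name are ours, the logic is copied from the two hunks above.

import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;

// Sketch only: a candidate index needs no aggregation compensation when it is duplicate-key,
// or unique-key on a table with merge-on-write enabled.
public final class NoNeedAggregationSketch {
    public static boolean noNeedAggregation(MaterializedIndexMeta candidateIndexMeta, OlapTable table) {
        return candidateIndexMeta.getKeysType() == KeysType.DUP_KEYS
                || (candidateIndexMeta.getKeysType() == KeysType.UNIQUE_KEYS
                        && table.getTableProperty().getEnableUniqueKeyMergeOnWrite());
    }
}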
View File

@@ -275,8 +275,9 @@ public class OlapScanNode extends ScanNode {
String situation;
boolean update;
CHECK: { // CHECKSTYLE IGNORE THIS LINE
if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
situation = "The key type of table is duplicate.";
if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& olapTable.getEnableUniqueKeyMergeOnWrite())) {
situation = "The key type of table is duplicate, or unique key with merge-on-write.";
update = true;
break CHECK;
}
@@ -659,7 +660,8 @@
public void selectBestRollupByRollupSelector(Analyzer analyzer) throws UserException {
// Step2: select best rollup
long start = System.currentTimeMillis();
if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& olapTable.getEnableUniqueKeyMergeOnWrite())) {
// This function is compatible with the INDEX selection logic of ROLLUP,
// so the Duplicate table here returns base index directly
// and the selection logic of materialized view is selected in

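Both OlapScanNode hunks hinge on the same key-type test: a merge-on-write unique table keeps pre-aggregation on and goes straight to the base index, exactly like a duplicate key table. Sketched as a predicate (the wrapper class and method name are ours):

import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;

// Sketch only: key types for which the scan node keeps pre-aggregation enabled
// and skips rollup selection in favor of the base index.
public final class PreAggregationKeySketch {
    public static boolean keyTypeAllowsPreAggregation(OlapTable olapTable) {
        return olapTable.getKeysType() == KeysType.DUP_KEYS
                || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
                        && olapTable.getEnableUniqueKeyMergeOnWrite());
    }
}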
View File

@@ -25,6 +25,7 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalDataSource;
import org.apache.doris.mysql.privilege.MockedAuth;
import org.apache.doris.mysql.privilege.PaloAuth;
@@ -122,6 +123,51 @@ public class CreateTableStmtTest {
Assert.assertTrue(stmt.toSql().contains("DISTRIBUTED BY RANDOM\nBUCKETS 6"));
}
@Test
public void testCreateTableUniqueKeyNormal() throws UserException {
// setup
Map<String, String> properties = new HashMap<>();
ColumnDef col3 = new ColumnDef("col3", new TypeDef(ScalarType.createType(PrimitiveType.BIGINT)));
col3.setIsKey(false);
cols.add(col3);
ColumnDef col4 = new ColumnDef("col4", new TypeDef(ScalarType.createType(PrimitiveType.STRING)));
col4.setIsKey(false);
cols.add(col4);
// test normal case
CreateTableStmt stmt = new CreateTableStmt(false, false, tblName, cols, "olap",
new KeysDesc(KeysType.UNIQUE_KEYS, colsName), null,
new HashDistributionDesc(10, Lists.newArrayList("col1")), properties, null, "");
stmt.analyze(analyzer);
Assert.assertEquals(col3.getAggregateType(), AggregateType.REPLACE);
Assert.assertEquals(col4.getAggregateType(), AggregateType.REPLACE);
// clear
cols.remove(col3);
cols.remove(col4);
}
@Test
public void testCreateTableUniqueKeyMoW() throws UserException {
// setup
Map<String, String> properties = new HashMap<>();
properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, "true");
ColumnDef col3 = new ColumnDef("col3", new TypeDef(ScalarType.createType(PrimitiveType.BIGINT)));
col3.setIsKey(false);
cols.add(col3);
ColumnDef col4 = new ColumnDef("col4", new TypeDef(ScalarType.createType(PrimitiveType.STRING)));
col4.setIsKey(false);
cols.add(col4);
// test merge-on-write
CreateTableStmt stmt = new CreateTableStmt(false, false, tblName, cols, "olap",
new KeysDesc(KeysType.UNIQUE_KEYS, colsName), null,
new HashDistributionDesc(10, Lists.newArrayList("col1")), properties, null, "");
stmt.analyze(analyzer);
Assert.assertEquals(col3.getAggregateType(), AggregateType.NONE);
Assert.assertEquals(col4.getAggregateType(), AggregateType.NONE);
// clear
cols.remove(col3);
cols.remove(col4);
}
@Test
public void testCreateTableWithRollup() throws UserException {
List<AlterClause> ops = Lists.newArrayList();

View File

@@ -210,6 +210,7 @@ public class MaterializedViewSelectorTest {
@Test
public void testCheckGrouping(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer,
@Injectable OlapTable table,
@Injectable MaterializedIndexMeta indexMeta1,
@Injectable MaterializedIndexMeta indexMeta2,
@Injectable MaterializedIndexMeta indexMeta3) {
@ -249,7 +250,7 @@ public class MaterializedViewSelectorTest {
MaterializedViewSelector selector = new MaterializedViewSelector(selectStmt, analyzer);
Deencapsulation.setField(selector, "isSPJQuery", false);
Deencapsulation.invoke(selector, "checkGrouping", tableAColumnNames, candidateIndexIdToSchema);
Deencapsulation.invoke(selector, "checkGrouping", table, tableAColumnNames, candidateIndexIdToSchema);
Assert.assertEquals(2, candidateIndexIdToSchema.size());
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(1)));
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(2)));
@ -257,6 +258,7 @@ public class MaterializedViewSelectorTest {
@Test
public void testCheckAggregationFunction(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer,
@Injectable OlapTable table,
@Injectable MaterializedIndexMeta indexMeta1,
@Injectable MaterializedIndexMeta indexMeta2,
@Injectable MaterializedIndexMeta indexMeta3) {
@ -299,8 +301,8 @@ public class MaterializedViewSelectorTest {
Set<FunctionCallExpr> aggregatedColumnsInQueryOutput = Sets.newHashSet();
aggregatedColumnsInQueryOutput.add(functionCallExpr);
Deencapsulation.setField(selector, "isSPJQuery", false);
Deencapsulation.invoke(selector, "checkAggregationFunction", aggregatedColumnsInQueryOutput,
candidateIndexIdToSchema);
Deencapsulation.invoke(selector, "checkAggregationFunction", table, aggregatedColumnsInQueryOutput,
candidateIndexIdToSchema);
Assert.assertEquals(2, candidateIndexIdToSchema.size());
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(1)));
Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(3)));