[Bug](materialized-view) Fix incorrect replay of the persisted WHERE clause (#18228)
Fix incorrect replay of the persisted materialized-view WHERE clause during metadata restore.
This commit is contained in:
@ -141,6 +141,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
|
||||
// save failed task after retry three times, tabletId -> agentTask
|
||||
private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
|
||||
|
||||
private Analyzer analyzer;
|
||||
|
||||
private RollupJobV2() {
|
||||
super(JobType.ROLLUP);
|
||||
}
|
||||
@ -149,7 +151,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
|
||||
long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema,
|
||||
Column whereColumn,
|
||||
int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, short rollupShortKeyColumnCount,
|
||||
OriginStatement origStmt) {
|
||||
OriginStatement origStmt) throws AnalysisException {
|
||||
super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs);
|
||||
|
||||
this.baseIndexId = baseIndexId;
|
||||
@ -166,6 +168,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
|
||||
this.rollupShortKeyColumnCount = rollupShortKeyColumnCount;
|
||||
|
||||
this.origStmt = origStmt;
|
||||
initAnalyzer();
|
||||
}
|
||||
|
||||
public void addTabletIdMap(long partitionId, long rollupTabletId, long baseTabletId) {
|
||||
@ -182,6 +185,27 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
|
||||
this.storageFormat = storageFormat;
|
||||
}
|
||||
|
||||
private void initAnalyzer() throws AnalysisException {
|
||||
ConnectContext connectContext = new ConnectContext();
|
||||
Database db;
|
||||
try {
|
||||
db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
|
||||
} catch (MetaNotFoundException e) {
|
||||
throw new AnalysisException("error happens when parsing create materialized view stmt: " + origStmt, e);
|
||||
}
|
||||
String clusterName = db.getClusterName();
|
||||
// It's almost impossible that db's cluster name is null, just in case
|
||||
// because before user want to create database, he must first enter a cluster
|
||||
// which means that cluster is set to current ConnectContext
|
||||
// then when createDBStmt is executed, cluster name is set to Database
|
||||
if (clusterName == null || clusterName.length() == 0) {
|
||||
clusterName = SystemInfoService.DEFAULT_CLUSTER;
|
||||
}
|
||||
connectContext.setCluster(clusterName);
|
||||
connectContext.setDatabase(db.getFullName());
|
||||
analyzer = new Analyzer(Env.getCurrentEnv(), connectContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* runPendingJob():
|
||||
* 1. Create all rollup replicas and wait them finished.
|
||||
@ -328,9 +352,9 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
|
||||
partition.createRollupIndex(rollupIndex);
|
||||
}
|
||||
|
||||
tbl.setIndexMeta(rollupIndexId, rollupIndexName, rollupSchema, whereColumn, 0 /* init schema version */,
|
||||
tbl.setIndexMeta(rollupIndexId, rollupIndexName, rollupSchema, 0 /* init schema version */,
|
||||
rollupSchemaHash, rollupShortKeyColumnCount, TStorageType.COLUMN,
|
||||
rollupKeysType, origStmt);
|
||||
rollupKeysType, origStmt, analyzer != null ? new Analyzer(analyzer) : analyzer);
|
||||
tbl.rebuildFullSchema();
|
||||
}
|
||||
|
||||
@ -840,26 +864,9 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
|
||||
// parse the define stmt to schema
|
||||
SqlParser parser = new SqlParser(new SqlScanner(
|
||||
new StringReader(origStmt.originStmt), SqlModeHelper.MODE_DEFAULT));
|
||||
ConnectContext connectContext = new ConnectContext();
|
||||
Database db;
|
||||
try {
|
||||
db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
|
||||
} catch (MetaNotFoundException e) {
|
||||
throw new IOException("error happens when parsing create materialized view stmt: " + origStmt, e);
|
||||
}
|
||||
String clusterName = db.getClusterName();
|
||||
// It's almost impossible that db's cluster name is null, just in case
|
||||
// because before user want to create database, he must first enter a cluster
|
||||
// which means that cluster is set to current ConnectContext
|
||||
// then when createDBStmt is executed, cluster name is set to Database
|
||||
if (clusterName == null || clusterName.length() == 0) {
|
||||
clusterName = SystemInfoService.DEFAULT_CLUSTER;
|
||||
}
|
||||
connectContext.setCluster(clusterName);
|
||||
connectContext.setDatabase(db.getFullName());
|
||||
Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), connectContext);
|
||||
CreateMaterializedViewStmt stmt = null;
|
||||
try {
|
||||
initAnalyzer();
|
||||
stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, origStmt.idx);
|
||||
stmt.setIsReplay(true);
|
||||
stmt.analyze(analyzer);
|
||||
|
||||
@ -497,7 +497,7 @@ public class CreateMaterializedViewStmt extends DdlStmt {
|
||||
return new MVColumnItem(type, mvAggregateType, defineExpr, mvColumnBuilder(defineExpr.toSql()));
|
||||
}
|
||||
|
||||
public Map<String, Expr> parseDefineExprWithoutAnalyze() throws AnalysisException {
|
||||
public Map<String, Expr> parseDefineExpr(Analyzer analyzer) throws AnalysisException {
|
||||
Map<String, Expr> result = Maps.newHashMap();
|
||||
SelectList selectList = selectStmt.getSelectList();
|
||||
for (SelectListItem selectListItem : selectList.getItems()) {
|
||||
@ -513,7 +513,7 @@ public class CreateMaterializedViewStmt extends DdlStmt {
|
||||
case FunctionSet.BITMAP_UNION:
|
||||
case FunctionSet.HLL_UNION:
|
||||
case FunctionSet.COUNT:
|
||||
MVColumnItem item = buildMVColumnItem(null, functionCallExpr);
|
||||
MVColumnItem item = buildMVColumnItem(analyzer, functionCallExpr);
|
||||
expr = item.getDefineExpr();
|
||||
name = item.getName();
|
||||
break;
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.CreateMaterializedViewStmt;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
@ -188,7 +189,7 @@ public class MaterializedIndexMeta implements Writable, GsonPostProcessable {
|
||||
columnList += "]";
|
||||
|
||||
for (Column column : schema) {
|
||||
if (CreateMaterializedViewStmt.oldmvColumnBreaker(column.getName()).equals(name)) {
|
||||
if (CreateMaterializedViewStmt.oldmvColumnBreaker(column.getName()).equalsIgnoreCase(name)) {
|
||||
if (matchedColumn == null) {
|
||||
matchedColumn = column;
|
||||
} else {
|
||||
@ -198,6 +199,7 @@ public class MaterializedIndexMeta implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (matchedColumn != null) {
|
||||
LOG.debug("trans old MV, MV: {}, DefineExpr:{}, DefineName:{}",
|
||||
matchedColumn.getName(), entry.getValue().toSqlWithoutTbl(), entry.getKey());
|
||||
@ -265,6 +267,10 @@ public class MaterializedIndexMeta implements Writable, GsonPostProcessable {
|
||||
@Override
|
||||
public void gsonPostProcess() throws IOException {
|
||||
initColumnNameMap();
|
||||
parseStmt(null);
|
||||
}
|
||||
|
||||
public void parseStmt(Analyzer analyzer) throws IOException {
|
||||
// analyze define stmt
|
||||
if (defineStmt == null) {
|
||||
return;
|
||||
@ -275,11 +281,16 @@ public class MaterializedIndexMeta implements Writable, GsonPostProcessable {
|
||||
CreateMaterializedViewStmt stmt;
|
||||
try {
|
||||
stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, defineStmt.idx);
|
||||
setWhereClause(stmt.getWhereClause());
|
||||
if (analyzer != null) {
|
||||
stmt.analyze(analyzer);
|
||||
}
|
||||
|
||||
stmt.setIsReplay(true);
|
||||
setWhereClause(stmt.getWhereClause());
|
||||
stmt.rewriteToBitmapWithCheck();
|
||||
Map<String, Expr> columnNameToDefineExpr = stmt.parseDefineExprWithoutAnalyze();
|
||||
Map<String, Expr> columnNameToDefineExpr = stmt.parseDefineExpr(analyzer);
|
||||
setColumnsDefineExpr(columnNameToDefineExpr);
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new IOException("error happens when parsing create materialized view stmt: " + defineStmt, e);
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.alter.MaterializedViewHandler;
|
||||
import org.apache.doris.analysis.AggregateInfo;
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.ColumnDef;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.analysis.DataSortInfo;
|
||||
@ -288,14 +289,15 @@ public class OlapTable extends Table {
|
||||
|
||||
public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int schemaHash,
|
||||
short shortKeyColumnCount, TStorageType storageType, KeysType keysType) {
|
||||
setIndexMeta(indexId, indexName, schema, null, schemaVersion, schemaHash, shortKeyColumnCount, storageType,
|
||||
setIndexMeta(indexId, indexName, schema, schemaVersion, schemaHash, shortKeyColumnCount, storageType,
|
||||
keysType,
|
||||
null);
|
||||
null, null);
|
||||
}
|
||||
|
||||
public void setIndexMeta(long indexId, String indexName, List<Column> schema, Column whereColumn, int schemaVersion,
|
||||
public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion,
|
||||
int schemaHash,
|
||||
short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement origStmt) {
|
||||
short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement origStmt,
|
||||
Analyzer analyzer) {
|
||||
// Nullable when meta comes from schema change log replay.
|
||||
// The replay log only save the index id, so we need to get name by id.
|
||||
if (indexName == null) {
|
||||
@ -319,8 +321,10 @@ public class OlapTable extends Table {
|
||||
|
||||
MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema, schemaVersion,
|
||||
schemaHash, shortKeyColumnCount, storageType, keysType, origStmt);
|
||||
if (whereColumn != null) {
|
||||
indexMeta.setWhereClause(whereColumn.getDefineExpr());
|
||||
try {
|
||||
indexMeta.parseStmt(analyzer);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("parse meta stmt failed", e);
|
||||
}
|
||||
|
||||
indexIdToMeta.put(indexId, indexMeta);
|
||||
|
||||
@ -78,10 +78,10 @@ public class Util {
|
||||
TYPE_STRING_MAP.put(PrimitiveType.VARCHAR, "varchar(%d)");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.JSONB, "jsonb");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.STRING, "string");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.DECIMALV2, "decimal(%d,%d)");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.DECIMAL32, "decimal(%d,%d)");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.DECIMAL64, "decimal(%d,%d)");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.DECIMAL128, "decimal(%d,%d)");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.DECIMALV2, "decimal(%d, %d)");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.DECIMAL32, "decimal(%d, %d)");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.DECIMAL64, "decimal(%d, %d)");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.DECIMAL128, "decimal(%d, %d)");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.HLL, "varchar(%d)");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.BOOLEAN, "bool");
|
||||
TYPE_STRING_MAP.put(PrimitiveType.BITMAP, "bitmap");
|
||||
|
||||
@ -480,7 +480,7 @@ public class MasterImpl {
|
||||
|
||||
PublishVersionTask publishVersionTask = (PublishVersionTask) task;
|
||||
publishVersionTask.addErrorTablets(errorTabletIds);
|
||||
publishVersionTask.setIsFinished(true);
|
||||
publishVersionTask.setFinished(true);
|
||||
|
||||
if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
|
||||
// not remove the task from queue and be will retry
|
||||
|
||||
@ -33,7 +33,6 @@ public class PublishVersionTask extends AgentTask {
|
||||
private long transactionId;
|
||||
private List<TPartitionVersionInfo> partitionVersionInfos;
|
||||
private List<Long> errorTablets;
|
||||
private boolean isFinished;
|
||||
|
||||
public PublishVersionTask(long backendId, long transactionId, long dbId,
|
||||
List<TPartitionVersionInfo> partitionVersionInfos, long createTime) {
|
||||
@ -69,12 +68,4 @@ public class PublishVersionTask extends AgentTask {
|
||||
}
|
||||
this.errorTablets.addAll(errorTablets);
|
||||
}
|
||||
|
||||
public void setIsFinished(boolean isFinished) {
|
||||
this.isFinished = isFinished;
|
||||
}
|
||||
|
||||
public boolean isFinished() {
|
||||
return isFinished;
|
||||
}
|
||||
}
|
||||
|
||||
@ -315,7 +315,7 @@ public class RollupJobV2Test {
|
||||
|
||||
@Test
|
||||
public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt)
|
||||
throws IOException {
|
||||
throws IOException, AnalysisException {
|
||||
// prepare file
|
||||
File file = new File(fileName);
|
||||
file.createNewFile();
|
||||
|
||||
@ -140,9 +140,9 @@ public class CreateFunctionTest {
|
||||
|
||||
queryStr = "select db1.decimal(k3, 4, 1) from db1.tbl1;";
|
||||
if (Config.enable_decimal_conversion) {
|
||||
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k3` AS DECIMALV3(4,1))"));
|
||||
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k3` AS DECIMALV3(4, 1))"));
|
||||
} else {
|
||||
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k3` AS DECIMAL(4,1))"));
|
||||
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k3` AS DECIMAL(4, 1))"));
|
||||
}
|
||||
|
||||
// cast any type to varchar with fixed length
|
||||
@ -249,9 +249,9 @@ public class CreateFunctionTest {
|
||||
|
||||
queryStr = "select decimal(k3, 4, 1) from db2.tbl1;";
|
||||
if (Config.enable_decimal_conversion) {
|
||||
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k3` AS DECIMALV3(4,1))"));
|
||||
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k3` AS DECIMALV3(4, 1))"));
|
||||
} else {
|
||||
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k3` AS DECIMAL(4,1))"));
|
||||
Assert.assertTrue(dorisAssert.query(queryStr).explainQuery().contains("CAST(`k3` AS DECIMAL(4, 1))"));
|
||||
}
|
||||
|
||||
// 5. cast any type to varchar with fixed length
|
||||
|
||||
@ -96,7 +96,7 @@ public class MaterializedIndexMetaTest {
|
||||
columnNameToDefineExpr.put(mvColumnName, new FunctionCallExpr(new FunctionName("to_bitmap"), params));
|
||||
new Expectations() {
|
||||
{
|
||||
stmt.parseDefineExprWithoutAnalyze();
|
||||
stmt.parseDefineExpr(null);
|
||||
result = columnNameToDefineExpr;
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user