From 30df9fcae96419b6a9674792d6bef19d77d68dee Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Sat, 30 May 2020 20:17:46 +0800 Subject: [PATCH] Serialize origin stmt in Rollup Job and MV Meta (#3705) * Serialize origin stmt in Rollup Job and MV Meta In materialized view 2.0, the define expr is serialized in column. The method is that doris serialzie the origin stmt of Create Materialzied View Stmt in RollupJobV2 and MVMeta. The define expr will be extract from the origin stmt after meta is deserialized. The define expr is necessary for bitmap and hll materialized view. For example: MV meta: __doris_mv_bitmap_k1, bitmap_union, to_bitmap(k1) Origin stmt: select bitmap_union(to_bitmap(k1)) from table Deserialize meta: __doris_mv_bitmap_k1, bitmap_union, null After extract: the define expr `to_bitmap(k1)` from origin stmt should be extracted. __doris_mv_bitmap_v1, bitmap_union, to_bitmap(k1) (which comes from the origin stmt) Change-Id: Ic2da093188d8985f5e97be5bd094e5d60d82c9a7 * Add comment of read method Change-Id: I4e1e0f4ad0f6e76cdc43e49938de768ec3b0a0e8 * Fix ut Change-Id: I2be257d512bf541f00912a374a2e07a039fc42b4 * Change code style Change-Id: I3ab23f5c94ae781167f498fefde2d96e42e05bf9 --- .../java/org/apache/doris/alter/Alter.java | 6 +- .../org/apache/doris/alter/AlterJobV2.java | 36 ++- .../doris/alter/MaterializedViewHandler.java | 12 +- .../org/apache/doris/alter/RollupJobV2.java | 206 ++++++++++-------- .../apache/doris/alter/SchemaChangeJobV2.java | 167 +++----------- .../analysis/CreateMaterializedViewStmt.java | 8 +- .../apache/doris/analysis/StatementBase.java | 15 +- .../org/apache/doris/catalog/Catalog.java | 3 +- .../java/org/apache/doris/catalog/Column.java | 40 ++-- .../doris/catalog/MaterializedIndexMeta.java | 48 +++- .../org/apache/doris/catalog/OlapTable.java | 11 +- .../java/org/apache/doris/common/Config.java | 2 +- .../apache/doris/common/FeMetaVersion.java | 4 +- .../doris/load/loadv2/BrokerLoadJob.java | 4 +- .../apache/doris/load/loadv2/LoadManager.java | 5 +- .../load/routineload/RoutineLoadManager.java | 5 +- .../persist/gson/GsonPostProcessable.java | 4 +- .../apache/doris/persist/gson/GsonUtils.java | 12 +- .../org/apache/doris/qe/ConnectProcessor.java | 3 +- .../java/org/apache/doris/qe/DdlExecutor.java | 7 +- .../org/apache/doris/qe/StmtExecutor.java | 10 +- .../apache/doris/alter/RollupJobV2Test.java | 50 ++++- .../doris/alter/SchemaChangeJobV2Test.java | 39 ++++ .../org/apache/doris/catalog/ColumnTest.java | 3 +- .../catalog/MaterializedIndexMetaTest.java | 53 ++++- .../doris/catalog/TempPartitionTest.java | 4 - .../doris/load/loadv2/BrokerLoadJobTest.java | 13 +- .../routineload/RoutineLoadManagerTest.java | 6 +- .../doris/persist/gson/ThriftToJsonTest.java | 35 +++ .../org/apache/doris/qe/StmtExecutorTest.java | 6 +- 30 files changed, 478 insertions(+), 339 deletions(-) create mode 100644 fe/src/test/java/org/apache/doris/persist/gson/ThriftToJsonTest.java diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java b/fe/src/main/java/org/apache/doris/alter/Alter.java index 4678d50927..9711065282 100644 --- a/fe/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/src/main/java/org/apache/doris/alter/Alter.java @@ -80,7 +80,8 @@ public class Alter { clusterHandler.start(); } - public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) throws DdlException, AnalysisException { + public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) + throws DdlException, AnalysisException { String tableName = stmt.getBaseIndexName(); // check db String dbName = stmt.getDBName(); @@ -102,7 +103,8 @@ public class Alter { OlapTable olapTable = (OlapTable) table; olapTable.checkStableAndNormal(db.getClusterName()); - ((MaterializedViewHandler)materializedViewHandler).processCreateMaterializedView(stmt, db, olapTable); + ((MaterializedViewHandler)materializedViewHandler).processCreateMaterializedView(stmt, db, + olapTable); } finally { db.writeUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java index 62c61cb8ea..3a5147295e 100644 --- a/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -22,10 +22,13 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.common.Config; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.base.Preconditions; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,17 +62,27 @@ public abstract class AlterJobV2 implements Writable { ROLLUP, SCHEMA_CHANGE } + @SerializedName(value = "type") protected JobType type; + @SerializedName(value = "jobId") protected long jobId; + @SerializedName(value = "jobState") protected JobState jobState; + @SerializedName(value = "dbId") protected long dbId; + @SerializedName(value = "tableId") protected long tableId; + @SerializedName(value = "tableName") protected String tableName; + @SerializedName(value = "errMsg") protected String errMsg = ""; + @SerializedName(value = "createTimeMs") protected long createTimeMs = -1; + @SerializedName(value = "finishedTimeMs") protected long finishedTimeMs = -1; + @SerializedName(value = "timeoutMs") protected long timeoutMs = -1; public AlterJobV2(long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) { @@ -220,15 +233,20 @@ public abstract class AlterJobV2 implements Writable { public abstract void replay(AlterJobV2 replayedJob); public static AlterJobV2 read(DataInput in) throws IOException { - JobType type = JobType.valueOf(Text.readString(in)); - switch (type) { - case ROLLUP: - return RollupJobV2.read(in); - case SCHEMA_CHANGE: - return SchemaChangeJobV2.read(in); - default: - Preconditions.checkState(false); - return null; + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86) { + JobType type = JobType.valueOf(Text.readString(in)); + switch (type) { + case ROLLUP: + return RollupJobV2.read(in); + case SCHEMA_CHANGE: + return SchemaChangeJobV2.read(in); + default: + Preconditions.checkState(false); + return null; + } + } else { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, AlterJobV2.class); } } diff --git a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 921350e6e2..1d3fb3ae57 100644 --- a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -57,6 +57,7 @@ import org.apache.doris.persist.BatchDropInfo; import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.EditLog; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; @@ -89,7 +90,6 @@ import java.util.concurrent.ConcurrentHashMap; public class MaterializedViewHandler extends AlterHandler { private static final Logger LOG = LogManager.getLogger(MaterializedViewHandler.class); public static final String NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX = "__v2_"; - public static final String MATERIALIZED_VIEW_NAME_PRFIX = "__doris_materialized_view_"; public MaterializedViewHandler() { super("materialized view"); @@ -200,7 +200,7 @@ public class MaterializedViewHandler extends AlterHandler { // Step2: create mv job RollupJobV2 rollupJobV2 = createMaterializedViewJob(mvIndexName, baseIndexName, mvColumns, addMVClause - .getProperties(), olapTable, db, baseIndexId, addMVClause.getMVKeysType()); + .getProperties(), olapTable, db, baseIndexId, addMVClause.getMVKeysType(), addMVClause.getOrigStmt()); addAlterJobV2(rollupJobV2); @@ -264,7 +264,7 @@ public class MaterializedViewHandler extends AlterHandler { // step 3 create rollup job RollupJobV2 alterJobV2 = createMaterializedViewJob(rollupIndexName, baseIndexName, rollupSchema, addRollupClause.getProperties(), - olapTable, db, baseIndexId, olapTable.getKeysType()); + olapTable, db, baseIndexId, olapTable.getKeysType(), null); rollupNameJobMap.put(addRollupClause.getRollupName(), alterJobV2); logJobIdSet.add(alterJobV2.getJobId()); @@ -313,7 +313,7 @@ public class MaterializedViewHandler extends AlterHandler { */ private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexName, List mvColumns, Map properties, OlapTable - olapTable, Database db, long baseIndexId, KeysType mvKeysType) + olapTable, Database db, long baseIndexId, KeysType mvKeysType, OriginStatement origStmt) throws DdlException, AnalysisException { if (mvKeysType == null) { // assign rollup index's key type, same as base index's @@ -337,7 +337,7 @@ public class MaterializedViewHandler extends AlterHandler { RollupJobV2 mvJob = new RollupJobV2(jobId, dbId, tableId, olapTable.getName(), timeoutMs, baseIndexId, mvIndexId, baseIndexName, mvName, mvColumns, baseSchemaHash, mvSchemaHash, - mvKeysType, mvShortKeyColumnCount); + mvKeysType, mvShortKeyColumnCount, origStmt); String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName(); if (mvName.equals(newStorageFormatIndexName)) { mvJob.setStorageFormat(TStorageFormat.V2); @@ -465,10 +465,8 @@ public class MaterializedViewHandler extends AlterHandler { if (mvColumnItem.getDefineExpr() != null) { if (mvAggregationType.equals(AggregateType.BITMAP_UNION)) { newMVColumn.setType(Type.BITMAP); - newMVColumn.setName(MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + baseColumn.getName()); } else if (mvAggregationType.equals(AggregateType.HLL_UNION)){ newMVColumn.setType(Type.HLL); - newMVColumn.setName(MATERIALIZED_VIEW_NAME_PRFIX + "hll_" + baseColumn.getName()); } else { throw new DdlException("The define expr of column is only support bitmap_union or hll_union"); } diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java index aac6d09a57..e856a764c0 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -17,6 +17,10 @@ package org.apache.doris.alter; +import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.MVColumnItem; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -36,7 +40,12 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.SqlModeHelper; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; @@ -53,6 +62,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -60,6 +70,7 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.StringReader; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -70,30 +81,45 @@ import java.util.concurrent.TimeUnit; * This is for replacing the old RollupJob * https://github.com/apache/incubator-doris/issues/1429 */ -public class RollupJobV2 extends AlterJobV2 { +public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(RollupJobV2.class); // partition id -> (rollup tablet id -> base tablet id) + @SerializedName(value = "partitionIdToBaseRollupTabletIdMap") private Map> partitionIdToBaseRollupTabletIdMap = Maps.newHashMap(); + @SerializedName(value = "partitionIdToRollupIndex") private Map partitionIdToRollupIndex = Maps.newHashMap(); // rollup and base schema info + @SerializedName(value = "baseIndexId") private long baseIndexId; + @SerializedName(value = "rollupIndexId") private long rollupIndexId; + @SerializedName(value = "baseIndexName") private String baseIndexName; + @SerializedName(value = "rollupIndexName") private String rollupIndexName; + @SerializedName(value = "rollupSchema") private List rollupSchema = Lists.newArrayList(); + @SerializedName(value = "baseSchemaHash") private int baseSchemaHash; + @SerializedName(value = "rollupSchemaHash") private int rollupSchemaHash; + @SerializedName(value = "rollupKeysType") private KeysType rollupKeysType; + @SerializedName(value = "rollupShortKeyColumnCount") private short rollupShortKeyColumnCount; + @SerializedName(value = "origStmt") + private OriginStatement origStmt; // optional + @SerializedName(value = "storageFormat") private TStorageFormat storageFormat = TStorageFormat.DEFAULT; // The rollup job will wait all transactions before this txn id finished, then send the rollup tasks. + @SerializedName(value = "watershedTxnId") protected long watershedTxnId = -1; // save all create rollup tasks @@ -101,8 +127,8 @@ public class RollupJobV2 extends AlterJobV2 { public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName, - List rollupSchema, int baseSchemaHash, int rollupSchemaHash, - KeysType rollupKeysType, short rollupShortKeyColumnCount) { + List rollupSchema, int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, + short rollupShortKeyColumnCount, OriginStatement origStmt) { super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs); this.baseIndexId = baseIndexId; @@ -115,6 +141,8 @@ public class RollupJobV2 extends AlterJobV2 { this.rollupSchemaHash = rollupSchemaHash; this.rollupKeysType = rollupKeysType; this.rollupShortKeyColumnCount = rollupShortKeyColumnCount; + + this.origStmt = origStmt; } private RollupJobV2() { @@ -278,7 +306,7 @@ public class RollupJobV2 extends AlterJobV2 { } tbl.setIndexMeta(rollupIndexId, rollupIndexName, rollupSchema, 0 /* init schema version */, - rollupSchemaHash, rollupShortKeyColumnCount,TStorageType.COLUMN, rollupKeysType); + rollupSchemaHash, rollupShortKeyColumnCount,TStorageType.COLUMN, rollupKeysType, origStmt); } /** @@ -506,90 +534,6 @@ public class RollupJobV2 extends AlterJobV2 { return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId)); } - public static RollupJobV2 read(DataInput in) throws IOException { - RollupJobV2 rollupJob = new RollupJobV2(); - rollupJob.readFields(in); - return rollupJob; - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - out.writeInt(partitionIdToRollupIndex.size()); - for (long partitionId : partitionIdToRollupIndex.keySet()) { - out.writeLong(partitionId); - - out.writeInt(partitionIdToBaseRollupTabletIdMap.get(partitionId).size()); - for (Map.Entry entry : partitionIdToBaseRollupTabletIdMap.get(partitionId).entrySet()) { - out.writeLong(entry.getKey()); - out.writeLong(entry.getValue()); - } - - MaterializedIndex rollupIndex = partitionIdToRollupIndex.get(partitionId); - rollupIndex.write(out); - } - - out.writeLong(baseIndexId); - out.writeLong(rollupIndexId); - Text.writeString(out, baseIndexName); - Text.writeString(out, rollupIndexName); - - // rollup schema - out.writeInt(rollupSchema.size()); - for (Column column : rollupSchema) { - column.write(out); - } - out.writeInt(baseSchemaHash); - out.writeInt(rollupSchemaHash); - - Text.writeString(out, rollupKeysType.name()); - out.writeShort(rollupShortKeyColumnCount); - - out.writeLong(watershedTxnId); - Text.writeString(out, storageFormat.name()); - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - - int size = in.readInt(); - for (int i = 0; i < size; i++) { - long partitionId = in.readLong(); - int size2 = in.readInt(); - Map tabletIdMap = partitionIdToBaseRollupTabletIdMap.computeIfAbsent(partitionId, k -> Maps.newHashMap()); - for (int j = 0; j < size2; j++) { - long rollupTabletId = in.readLong(); - long baseTabletId = in.readLong(); - tabletIdMap.put(rollupTabletId, baseTabletId); - } - - partitionIdToRollupIndex.put(partitionId, MaterializedIndex.read(in)); - } - - baseIndexId = in.readLong(); - rollupIndexId = in.readLong(); - baseIndexName = Text.readString(in); - rollupIndexName = Text.readString(in); - - size = in.readInt(); - for (int i = 0; i < size; i++) { - Column column = Column.read(in); - rollupSchema.add(column); - } - baseSchemaHash = in.readInt(); - rollupSchemaHash = in.readInt(); - - rollupKeysType = KeysType.valueOf(Text.readString(in)); - rollupShortKeyColumnCount = in.readShort(); - - watershedTxnId = in.readLong(); - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_85) { - storageFormat = TStorageFormat.valueOf(Text.readString(in)); - } - } - /** * Replay job in PENDING state. * Should replay all changes before this job's state transfer to PENDING. @@ -773,4 +717,90 @@ public class RollupJobV2 extends AlterJobV2 { this.jobState = jobState; } + private void setColumnsDefineExpr(List items) { + for (MVColumnItem item : items) { + for (Column column : rollupSchema) { + if (column.getName().equals(item.getName())) { + column.setDefineExpr(item.getDefineExpr()); + break; + } + } + } + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this, AlterJobV2.class); + Text.writeString(out, json); + } + + /** + * This method is only used to deserialize the text mate which version is less then 86. + * If the meta version >=86, it will be deserialized by the `read` of AlterJobV2 rather then here. + */ + public static RollupJobV2 read(DataInput in) throws IOException { + Preconditions.checkState(Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86); + RollupJobV2 rollupJob = new RollupJobV2(); + rollupJob.readFields(in); + return rollupJob; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + int size = in.readInt(); + for (int i = 0; i < size; i++) { + long partitionId = in.readLong(); + int size2 = in.readInt(); + Map tabletIdMap = partitionIdToBaseRollupTabletIdMap.computeIfAbsent(partitionId, k -> Maps.newHashMap()); + for (int j = 0; j < size2; j++) { + long rollupTabletId = in.readLong(); + long baseTabletId = in.readLong(); + tabletIdMap.put(rollupTabletId, baseTabletId); + } + + partitionIdToRollupIndex.put(partitionId, MaterializedIndex.read(in)); + } + + baseIndexId = in.readLong(); + rollupIndexId = in.readLong(); + baseIndexName = Text.readString(in); + rollupIndexName = Text.readString(in); + + size = in.readInt(); + for (int i = 0; i < size; i++) { + Column column = Column.read(in); + rollupSchema.add(column); + } + baseSchemaHash = in.readInt(); + rollupSchemaHash = in.readInt(); + + rollupKeysType = KeysType.valueOf(Text.readString(in)); + rollupShortKeyColumnCount = in.readShort(); + + watershedTxnId = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_85) { + storageFormat = TStorageFormat.valueOf(Text.readString(in)); + } + } + + @Override + public void gsonPostProcess() throws IOException { + // analyze define stmt + if (origStmt == null) { + return; + } + // parse the define stmt to schema + SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt), + SqlModeHelper.MODE_DEFAULT)); + CreateMaterializedViewStmt stmt; + try { + stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, origStmt.idx); + stmt.analyzeSelectClause(); + setColumnsDefineExpr(stmt.getMVColumnItemList()); + } catch (Exception e) { + throw new IOException("error happens when parsing create materialized view stmt: " + origStmt, e); + } + } } diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 71236ec3bf..08838eb3dc 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -38,6 +38,7 @@ import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; @@ -57,8 +58,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.collect.Table; import com.google.common.collect.Table.Cell; +import com.google.gson.annotations.SerializedName; -import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -81,35 +82,44 @@ public class SchemaChangeJobV2 extends AlterJobV2 { private static final Logger LOG = LogManager.getLogger(SchemaChangeJobV2.class); // partition id -> (shadow index id -> (shadow tablet id -> origin tablet id)) + @SerializedName(value = "partitionIndexTabletMap") private Table> partitionIndexTabletMap = HashBasedTable.create(); // partition id -> (shadow index id -> shadow index)) private Table partitionIndexMap = HashBasedTable.create(); // shadow index id -> origin index id + @SerializedName(value = "indexIdMap") private Map indexIdMap = Maps.newHashMap(); // shadow index id -> shadow index name(__doris_shadow_xxx) + @SerializedName(value = "indexIdToName") private Map indexIdToName = Maps.newHashMap(); // shadow index id -> index schema + @SerializedName(value = "indexSchemaMap") private Map> indexSchemaMap = Maps.newHashMap(); // shadow index id -> (shadow index schema version : schema hash) + @SerializedName(value = "indexSchemaVersionAndHashMap") private Map> indexSchemaVersionAndHashMap = Maps.newHashMap(); // shadow index id -> shadow index short key count + @SerializedName(value = "indexShortKeyMap") private Map indexShortKeyMap = Maps.newHashMap(); - // identify whether the job is finished and no need to persist some data - private boolean isMetaPruned = false; - // bloom filter info + @SerializedName(value = "hasBfChange") private boolean hasBfChange; + @SerializedName(value = "bfColumns") private Set bfColumns = null; + @SerializedName(value = "bfFpp") private double bfFpp = 0; // alter index info + @SerializedName(value = "indexChange") private boolean indexChange = false; + @SerializedName(value = "indexes") private List indexes = null; // The schema change job will wait all transactions before this txn id finished, then send the schema change tasks. + @SerializedName(value = "watershedTxnId") protected long watershedTxnId = -1; - + @SerializedName(value = "storageFormat") private TStorageFormat storageFormat = TStorageFormat.DEFAULT; // save all schema change tasks @@ -170,7 +180,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { partitionIndexMap.clear(); indexSchemaMap.clear(); indexShortKeyMap.clear(); - isMetaPruned = true; } /** @@ -642,12 +651,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId)); } - public static SchemaChangeJobV2 read(DataInput in) throws IOException { - SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(); - schemaChangeJob.readFields(in); - return schemaChangeJob; - } - /** * Replay job in PENDING state. * Should replay all changes before this job's state transfer to PENDING. @@ -826,80 +829,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { return taskInfos; } - /** - * write data need to persist when job not finish - */ - private void writeJobNotFinishData(DataOutput out) throws IOException { - out.writeInt(partitionIndexTabletMap.rowKeySet().size()); - for (Long partitionId : partitionIndexTabletMap.rowKeySet()) { - out.writeLong(partitionId); - Map> indexTabletMap = partitionIndexTabletMap.row(partitionId); - out.writeInt(indexTabletMap.size()); - for (Long shadowIndexId : indexTabletMap.keySet()) { - out.writeLong(shadowIndexId); - // tablet id map - Map tabletMap = indexTabletMap.get(shadowIndexId); - out.writeInt(tabletMap.size()); - for (Map.Entry entry : tabletMap.entrySet()) { - out.writeLong(entry.getKey()); - out.writeLong(entry.getValue()); - } - // shadow index - MaterializedIndex shadowIndex = partitionIndexMap.get(partitionId, shadowIndexId); - shadowIndex.write(out); - } - } - - // shadow index info - out.writeInt(indexIdMap.size()); - for (Map.Entry entry : indexIdMap.entrySet()) { - long shadowIndexId = entry.getKey(); - out.writeLong(shadowIndexId); - // index id map - out.writeLong(entry.getValue()); - // index name - Text.writeString(out, indexIdToName.get(shadowIndexId)); - // index schema - out.writeInt(indexSchemaMap.get(shadowIndexId).size()); - for (Column column : indexSchemaMap.get(shadowIndexId)) { - column.write(out); - } - // index schema version and hash - out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).first); - out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).second); - // short key count - out.writeShort(indexShortKeyMap.get(shadowIndexId)); - } - - // bloom filter - out.writeBoolean(hasBfChange); - if (hasBfChange) { - out.writeInt(bfColumns.size()); - for (String bfCol : bfColumns) { - Text.writeString(out, bfCol); - } - out.writeDouble(bfFpp); - } - - out.writeLong(watershedTxnId); - - // index - out.writeBoolean(indexChange); - if (indexChange) { - if (CollectionUtils.isNotEmpty(indexes)) { - out.writeBoolean(true); - out.writeInt(indexes.size()); - for (Index index : indexes) { - index.write(out); - } - } else { - out.writeBoolean(false); - } - } - - Text.writeString(out, storageFormat.name()); - } - /** * read data need to persist when job not finish */ @@ -982,53 +911,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } } - /** - * write data need to persist when job finished - */ - private void writeJobFinishedData(DataOutput out) throws IOException { - // only persist data will be used in getInfo - out.writeInt(indexIdMap.size()); - for (Entry entry : indexIdMap.entrySet()) { - long shadowIndexId = entry.getKey(); - out.writeLong(shadowIndexId); - // index id map - out.writeLong(entry.getValue()); - // index name - Text.writeString(out, indexIdToName.get(shadowIndexId)); - // index schema version and hash - out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).first); - out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).second); - } - - // bloom filter - out.writeBoolean(hasBfChange); - if (hasBfChange) { - out.writeInt(bfColumns.size()); - for (String bfCol : bfColumns) { - Text.writeString(out, bfCol); - } - out.writeDouble(bfFpp); - } - - out.writeLong(watershedTxnId); - - // index - out.writeBoolean(indexChange); - if (indexChange) { - if (CollectionUtils.isNotEmpty(indexes)) { - out.writeBoolean(true); - out.writeInt(indexes.size()); - for (Index index : indexes) { - index.write(out); - } - } else { - out.writeBoolean(false); - } - } - - Text.writeString(out, storageFormat.name()); - } - /** * read data need to persist when job finished */ @@ -1082,14 +964,19 @@ public class SchemaChangeJobV2 extends AlterJobV2 { @Override public void write(DataOutput out) throws IOException { - super.write(out); + String json = GsonUtils.GSON.toJson(this, AlterJobV2.class); + Text.writeString(out, json); + } - out.writeBoolean(isMetaPruned); - if (isMetaPruned) { - writeJobFinishedData(out); - } else { - writeJobNotFinishData(out); - } + /** + * This method is only used to deserialize the text mate which version is less then 86. + * If the meta version >=86, it will be deserialized by the `read` of AlterJobV2 rather then here. + */ + public static SchemaChangeJobV2 read(DataInput in) throws IOException { + Preconditions.checkState(Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86); + SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(); + schemaChangeJob.readFields(in); + return schemaChangeJob; } @Override diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java index 5690be3b11..56ec28d769 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java @@ -49,6 +49,8 @@ import java.util.Set; * [PROPERTIES ("key" = "value")] */ public class CreateMaterializedViewStmt extends DdlStmt { + public static final String MATERIALIZED_VIEW_NAME_PRFIX = "__doris_materialized_view_"; + private String mvName; private SelectStmt selectStmt; private Map properties; @@ -100,9 +102,8 @@ public class CreateMaterializedViewStmt extends DdlStmt { @Override public void analyze(Analyzer analyzer) throws UserException { - // TODO(ml): remove it if (!Config.enable_materialized_view) { - throw new AnalysisException("The materialized view is coming soon"); + throw new AnalysisException("The materialized view is disabled"); } super.analyze(analyzer); FeNameFormat.checkTableName(mvName); @@ -128,7 +129,7 @@ public class CreateMaterializedViewStmt extends DdlStmt { } } - private void analyzeSelectClause() throws AnalysisException { + public void analyzeSelectClause() throws AnalysisException { SelectList selectList = selectStmt.getSelectList(); if (selectList.getItems().isEmpty()) { throw new AnalysisException("The materialized view must contain at least one column"); @@ -200,6 +201,7 @@ public class CreateMaterializedViewStmt extends DdlStmt { beginIndexOfAggregation = i; } // TODO(ml): support different type of column, int -> bigint(sum) + // TODO: change the column name of bitmap and hll MVColumnItem mvColumnItem = new MVColumnItem(columnName); mvColumnItem.setAggregationType(AggregateType.valueOf(functionName.toUpperCase()), false); mvColumnItem.setDefineExpr(defineExpr); diff --git a/fe/src/main/java/org/apache/doris/analysis/StatementBase.java b/fe/src/main/java/org/apache/doris/analysis/StatementBase.java index 74100283fd..3fd62aa406 100644 --- a/fe/src/main/java/org/apache/doris/analysis/StatementBase.java +++ b/fe/src/main/java/org/apache/doris/analysis/StatementBase.java @@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.rewrite.ExprRewriter; import com.google.common.base.Preconditions; @@ -46,6 +47,8 @@ public abstract class StatementBase implements ParseNode { // END: Members that need to be reset() ///////////////////////////////////////// + private OriginStatement origStmt; + protected StatementBase() { } /** @@ -153,7 +156,17 @@ public abstract class StatementBase implements ParseNode { public void setClusterName(String clusterName) { this.clusterName = clusterName; - } + } + + public void setOrigStmt(OriginStatement origStmt) { + Preconditions.checkState(origStmt != null); + this.origStmt = origStmt; + } + + public OriginStatement getOrigStmt() { + return origStmt; + } + /** * Resets the internal analysis state of this node. * For easier maintenance, class members that need to be reset are grouped into diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 4c44409d21..cf95e6ddbb 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4903,7 +4903,8 @@ public class Catalog { this.alter.processAlterView(stmt, ConnectContext.get()); } - public void createMaterializedView(CreateMaterializedViewStmt stmt) throws AnalysisException, DdlException { + public void createMaterializedView(CreateMaterializedViewStmt stmt) + throws AnalysisException, DdlException { this.alter.processCreateMaterializedView(stmt); } diff --git a/fe/src/main/java/org/apache/doris/catalog/Column.java b/fe/src/main/java/org/apache/doris/catalog/Column.java index d1ce8a6e60..461cf6a1b7 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/src/main/java/org/apache/doris/catalog/Column.java @@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TColumnType; @@ -67,7 +68,7 @@ public class Column implements Writable { private String comment; @SerializedName(value = "stats") private ColumnStats stats; // cardinality and selectivity etc. - private Expr defineExpr; //use to define materialize view + private Expr defineExpr; // use to define column in materialize view public Column() { this.name = ""; @@ -411,31 +412,11 @@ public class Column implements Writable { @Override public void write(DataOutput out) throws IOException { - Text.writeString(out, name); - ColumnType.write(out, type); - if (null == aggregationType) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - Text.writeString(out, aggregationType.name()); - out.writeBoolean(isAggregationTypeImplicit); - } - - out.writeBoolean(isKey); - out.writeBoolean(isAllowNull); - - if (defaultValue == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - Text.writeString(out, defaultValue); - } - stats.write(out); - - Text.writeString(out, comment); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } - public void readFields(DataInput in) throws IOException { + private void readFields(DataInput in) throws IOException { name = Text.readString(in); type = ColumnType.read(in); boolean notNull = in.readBoolean(); @@ -475,8 +456,13 @@ public class Column implements Writable { } public static Column read(DataInput in) throws IOException { - Column column = new Column(); - column.readFields(in); - return column; + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86) { + Column column = new Column(); + column.readFields(in); + return column; + } else { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, Column.class); + } } } diff --git a/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java b/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java index 25a8bb5321..2e224f9c20 100644 --- a/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java +++ b/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java @@ -17,9 +17,17 @@ package org.apache.doris.catalog; +import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.MVColumnItem; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.SqlModeHelper; import org.apache.doris.thrift.TStorageType; import com.google.common.base.Preconditions; @@ -29,9 +37,10 @@ import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.StringReader; import java.util.List; -public class MaterializedIndexMeta implements Writable { +public class MaterializedIndexMeta implements Writable, GsonPostProcessable { @SerializedName(value = "indexId") private long indexId; @SerializedName(value = "schema") @@ -46,9 +55,11 @@ public class MaterializedIndexMeta implements Writable { private TStorageType storageType; @SerializedName(value = "keysType") private KeysType keysType; + @SerializedName(value = "defineStmt") + private OriginStatement defineStmt; - public MaterializedIndexMeta(long indexId, List schema, int schemaVersion, int - schemaHash, short shortKeyColumnCount, TStorageType storageType, KeysType keysType) { + public MaterializedIndexMeta(long indexId, List schema, int schemaVersion, int schemaHash, + short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement defineStmt) { this.indexId = indexId; Preconditions.checkState(schema != null); Preconditions.checkState(schema.size() != 0); @@ -60,6 +71,7 @@ public class MaterializedIndexMeta implements Writable { this.storageType = storageType; Preconditions.checkState(keysType != null); this.keysType = keysType; + this.defineStmt = defineStmt; } public long getIndexId() { @@ -94,6 +106,17 @@ public class MaterializedIndexMeta implements Writable { return schemaVersion; } + private void setColumnsDefineExpr(List items) { + for (MVColumnItem item : items) { + for (Column column : schema) { + if (column.getName().equals(item.getName())) { + column.setDefineExpr(item.getDefineExpr()); + break; + } + } + } + } + @Override public boolean equals(Object obj) { if (!(obj instanceof MaterializedIndexMeta)) { @@ -134,4 +157,23 @@ public class MaterializedIndexMeta implements Writable { return GsonUtils.GSON.fromJson(json, MaterializedIndexMeta.class); } + @Override + public void gsonPostProcess() throws IOException { + // analyze define stmt + if (defineStmt == null) { + return; + } + // parse the define stmt to schema + SqlParser parser = new SqlParser(new SqlScanner(new StringReader(defineStmt.originStmt), + SqlModeHelper.MODE_DEFAULT)); + CreateMaterializedViewStmt stmt; + try { + stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, defineStmt.idx); + stmt.analyzeSelectClause(); + setColumnsDefineExpr(stmt.getMVColumnItemList()); + } catch (Exception e) { + throw new IOException("error happens when parsing create materialized view stmt: " + defineStmt, e); + } + } + } diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java index f307a5f593..b4d60f6000 100644 --- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -38,6 +38,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.RangeUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TOlapTable; import org.apache.doris.thrift.TStorageFormat; @@ -246,6 +247,12 @@ public class OlapTable extends Table { public void setIndexMeta(long indexId, String indexName, List schema, int schemaVersion, int schemaHash, short shortKeyColumnCount, TStorageType storageType, KeysType keysType) { + setIndexMeta(indexId, indexName, schema, schemaVersion, schemaHash, shortKeyColumnCount, storageType, keysType, + null); + } + + public void setIndexMeta(long indexId, String indexName, List schema, int schemaVersion, int schemaHash, + short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement origStmt) { // Nullable when meta comes from schema change log replay. // The replay log only save the index id, so we need to get name by id. if (indexName == null) { @@ -268,7 +275,7 @@ public class OlapTable extends Table { } MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema, schemaVersion, - schemaHash, shortKeyColumnCount, storageType, keysType); + schemaHash, shortKeyColumnCount, storageType, keysType, origStmt); indexIdToMeta.put(indexId, indexMeta); indexNameToId.put(indexName, indexId); } @@ -970,7 +977,7 @@ public class OlapTable extends Table { // The keys type in here is incorrect MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema, - schemaVersion, schemaHash, shortKeyColumnCount, storageType, KeysType.AGG_KEYS); + schemaVersion, schemaHash, shortKeyColumnCount, storageType, KeysType.AGG_KEYS, null); tmpIndexMetaList.add(indexMeta); } else { MaterializedIndexMeta indexMeta = MaterializedIndexMeta.read(in); diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 5cdf4f6a3a..c29921f205 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -1060,7 +1060,7 @@ public class Config extends ConfigBase { * control materialized view */ @ConfField(mutable = true, masterOnly = true) - public static boolean enable_materialized_view = false; + public static boolean enable_materialized_view = true; /** * it can't auto-resume routine load job as long as one of the backends is down diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index f4d1f8cd75..dabd7a800d 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -181,6 +181,8 @@ public final class FeMetaVersion { public static final int VERSION_84 = 84; // add storage format in rollup job public static final int VERSION_85 = 85; + // serialize origStmt in rollupJob and mv meta + public static final int VERSION_86 = 86; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_85; + public static final int VERSION_CURRENT = VERSION_86; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index c4d7d2c271..8d817f0f84 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -124,7 +124,7 @@ public class BrokerLoadJob extends LoadJob { } } - public static BrokerLoadJob fromLoadStmt(LoadStmt stmt, OriginStatement originStmt) throws DdlException { + public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { // get db id String dbName = stmt.getLabel().getDbName(); Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName()); @@ -135,7 +135,7 @@ public class BrokerLoadJob extends LoadJob { // create job try { BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(), - stmt.getBrokerDesc(), originStmt); + stmt.getBrokerDesc(), stmt.getOrigStmt()); brokerLoadJob.setJobProperties(stmt.getProperties()); brokerLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions()); return brokerLoadJob; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index e123e419a8..e0cb91a716 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -39,7 +39,6 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; -import org.apache.doris.qe.OriginStatement; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TMiniLoadBeginRequest; import org.apache.doris.thrift.TMiniLoadRequest; @@ -98,7 +97,7 @@ public class LoadManager implements Writable{ * @param stmt * @throws DdlException */ - public void createLoadJobFromStmt(LoadStmt stmt, OriginStatement originStmt) throws DdlException { + public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException { Database database = checkDb(stmt.getLabel().getDbName()); long dbId = database.getId(); LoadJob loadJob = null; @@ -112,7 +111,7 @@ public class LoadManager implements Writable{ throw new DdlException("There are more then " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, " + "please retry later."); } - loadJob = BrokerLoadJob.fromLoadStmt(stmt, originStmt); + loadJob = BrokerLoadJob.fromLoadStmt(stmt); createLoadJob(loadJob); } finally { writeUnlock(); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 0efb6fefe6..061d7fb169 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -37,7 +37,6 @@ import org.apache.doris.common.util.LogKey; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.OriginStatement; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -117,7 +116,7 @@ public class RoutineLoadManager implements Writable { } - public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt, OriginStatement origStmt) + public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt) throws UserException { // check load auth if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), @@ -141,7 +140,7 @@ public class RoutineLoadManager implements Writable { throw new UserException("Unknown data source type: " + type); } - routineLoadJob.setOrigStmt(origStmt); + routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt()); addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName()); } diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java b/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java index c09f2ad559..6733d7948c 100644 --- a/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java +++ b/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java @@ -17,6 +17,8 @@ package org.apache.doris.persist.gson; +import java.io.IOException; + public interface GsonPostProcessable { - public void gsonPostProcess(); + public void gsonPostProcess() throws IOException; } diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 033ab036e8..10e1ad7f30 100644 --- a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -17,6 +17,9 @@ package org.apache.doris.persist.gson; +import org.apache.doris.alter.AlterJobV2; +import org.apache.doris.alter.RollupJobV2; +import org.apache.doris.alter.SchemaChangeJobV2; import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.HashDistributionInfo; @@ -93,6 +96,12 @@ public class GsonUtils { .of(Resource.class, "clazz") .registerSubtype(SparkResource.class, SparkResource.class.getSimpleName()); + // runtime adapter for class "AlterJobV2" + private static RuntimeTypeAdapterFactory alterJobV2TypeAdapterFactory = RuntimeTypeAdapterFactory + .of(AlterJobV2.class, "clazz") + .registerSubtype(RollupJobV2.class, RollupJobV2.class.getSimpleName()) + .registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName()); + // the builder of GSON instance. // Add any other adapters if necessary. private static final GsonBuilder GSON_BUILDER = new GsonBuilder() @@ -103,7 +112,8 @@ public class GsonUtils { .registerTypeAdapterFactory(new PostProcessTypeAdapterFactory()) .registerTypeAdapterFactory(columnTypeAdapterFactory) .registerTypeAdapterFactory(distributionInfoTypeAdapterFactory) - .registerTypeAdapterFactory(resourceTypeAdapterFactory); + .registerTypeAdapterFactory(resourceTypeAdapterFactory) + .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory); // this instance is thread-safe. public static final Gson GSON = GSON_BUILDER.create(); diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java index d94733a347..83ef187664 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -178,7 +178,8 @@ public class ConnectProcessor { ctx.resetRetureRows(); } parsedStmt = stmts.get(i); - executor = new StmtExecutor(ctx, parsedStmt, new OriginStatement(originStmt, i)); + parsedStmt.setOrigStmt(new OriginStatement(originStmt, i)); + executor = new StmtExecutor(ctx, parsedStmt); executor.execute(); if (i != stmts.size() - 1) { diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java index 284daf6e39..db1a0e5e59 100644 --- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -82,8 +82,7 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.Load; public class DdlExecutor { - public static void execute(Catalog catalog, DdlStmt ddlStmt, OriginStatement origStmt) - throws DdlException, QueryStateException, Exception { + public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception { if (ddlStmt instanceof CreateClusterStmt) { CreateClusterStmt stmt = (CreateClusterStmt) ddlStmt; catalog.createCluster(stmt); @@ -132,7 +131,7 @@ public class DdlExecutor { if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) { catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis()); } else { - catalog.getLoadManager().createLoadJobFromStmt(loadStmt, origStmt); + catalog.getLoadManager().createLoadJobFromStmt(loadStmt); } } else if (ddlStmt instanceof CancelLoadStmt) { if (catalog.getLoadInstance().isLabelExist( @@ -142,7 +141,7 @@ public class DdlExecutor { catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt); } } else if (ddlStmt instanceof CreateRoutineLoadStmt) { - catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt, origStmt); + catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt); } else if (ddlStmt instanceof PauseRoutineLoadStmt) { catalog.getRoutineLoadManager().pauseRoutineLoadJob((PauseRoutineLoadStmt) ddlStmt); } else if (ddlStmt instanceof ResumeRoutineLoadStmt) { diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index b5ccb65eba..114f9cf30c 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -128,12 +128,12 @@ public class StmtExecutor { } // constructor for receiving parsed stmt from connect processor - public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt, OriginStatement originStmt) { + public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt) { this.context = ctx; - this.originStmt = originStmt; + this.parsedStmt = parsedStmt; + this.originStmt = parsedStmt.getOrigStmt(); this.serializer = context.getSerializer(); this.isProxy = false; - this.parsedStmt = parsedStmt; } // At the end of query execution, we begin to add up profile @@ -376,7 +376,7 @@ public class StmtExecutor { SqlParser parser = new SqlParser(input); try { parsedStmt = SqlParserUtils.getStmt(parser, originStmt.idx); - + parsedStmt.setOrigStmt(originStmt); } catch (Error e) { LOG.info("error happened when parsing stmt {}, id: {}", originStmt, context.getStmtId(), e); throw new AnalysisException("sql parsing error, please check your sql"); @@ -889,7 +889,7 @@ public class StmtExecutor { private void handleDdlStmt() { try { - DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt, originStmt); + DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt); context.getState().setOk(); } catch (QueryStateException e) { context.setState(e.getQueryState()); diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 35429d0dc6..e3fd856137 100644 --- a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -24,6 +24,13 @@ import org.apache.doris.analysis.AccessTestUtil; import org.apache.doris.analysis.AddRollupClause; import org.apache.doris.analysis.AlterClause; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.FunctionName; +import org.apache.doris.analysis.MVColumnItem; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.CatalogTestUtil; @@ -34,7 +41,6 @@ import org.apache.doris.catalog.FakeEditLog; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; -import org.apache.doris.catalog.MaterializedIndexMeta; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.catalog.Partition; @@ -48,6 +54,7 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.UserException; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.meta.MetaContext; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.thrift.TStorageFormat; @@ -60,7 +67,6 @@ import com.google.common.collect.Lists; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import java.io.DataInputStream; @@ -367,15 +373,21 @@ public class RollupJobV2Test { @Test - public void testSerializeOfRollupJob() throws IOException { + public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt) throws IOException { // prepare file File file = new File(fileName); file.createNewFile(); DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); short keysCount = 1; - RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup",Lists.newArrayList(), 1, 1, - KeysType.AGG_KEYS, keysCount); + List columns = Lists.newArrayList(); + String mvColumnName =CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + "c1"; + Column column = new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", ""); + columns.add(column); + RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, 1, 1, + KeysType.AGG_KEYS, keysCount, + new OriginStatement("create materialized view rollup as select bitmap_union(to_bitmap(c1)) from test", + 0)); rollupJobV2.setStorageFormat(TStorageFormat.V2); // write rollup job @@ -383,15 +395,37 @@ public class RollupJobV2Test { out.flush(); out.close(); + List itemList = Lists.newArrayList(); + MVColumnItem item = new MVColumnItem( + mvColumnName); + List params = Lists.newArrayList(); + SlotRef param1 = new SlotRef(new TableName(null, "test"), "c1"); + params.add(param1); + item.setDefineExpr(new FunctionCallExpr(new FunctionName("to_bitmap"), params)); + itemList.add(item); + new Expectations() { + { + stmt.getMVColumnItemList(); + result = itemList; + } + }; + // read objects from file MetaContext metaContext = new MetaContext(); - metaContext.setMetaVersion(FeMetaVersion.VERSION_85); + metaContext.setMetaVersion(FeMetaVersion.VERSION_86); metaContext.setThreadLocalInfo(); - DataInputStream in = new DataInputStream(new FileInputStream(file)); + DataInputStream in = new DataInputStream(new FileInputStream(file)); RollupJobV2 result = (RollupJobV2) AlterJobV2.read(in); - Catalog.getCurrentCatalogJournalVersion(); Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat")); + List resultColumns = Deencapsulation.getField(result, "rollupSchema"); + Assert.assertEquals(1, resultColumns.size()); + Column resultColumn1 = resultColumns.get(0); + Assert.assertEquals(mvColumnName, + resultColumn1.getName()); + Assert.assertTrue(resultColumn1.getDefineExpr() instanceof FunctionCallExpr); + FunctionCallExpr resultFunctionCall = (FunctionCallExpr) resultColumn1.getDefineExpr(); + Assert.assertEquals("to_bitmap", resultFunctionCall.getFnName().getFunction()); } } diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index 91a745bc18..86970286db 100644 --- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -53,9 +53,11 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.UserException; +import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.meta.MetaContext; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.FakeTransactionIDGenerator; import org.apache.doris.transaction.GlobalTransactionMgr; @@ -66,6 +68,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.HashMap; @@ -74,6 +82,8 @@ import java.util.Map; public class SchemaChangeJobV2Test { + private static String fileName = "./SchemaChangeV2Test"; + private static FakeEditLog fakeEditLog; private static FakeCatalog fakeCatalog; private static FakeTransactionIDGenerator fakeTransactionIDGenerator; @@ -372,4 +382,33 @@ public class SchemaChangeJobV2Test { modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p", DynamicPartitionProperty.ENABLE); modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30", DynamicPartitionProperty.ENABLE); } + + @Test + public void testSerializeOfSchemaChangeJob() throws IOException { + // prepare file + File file = new File(fileName); + file.createNewFile(); + DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + + SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2(1, 1,1, "test",600000); + schemaChangeJobV2.setStorageFormat(TStorageFormat.V2); + Deencapsulation.setField(schemaChangeJobV2, "jobState", AlterJobV2.JobState.FINISHED); + + + // write schema change job + schemaChangeJobV2.write(out); + out.flush(); + out.close(); + + // read objects from file + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeMetaVersion.VERSION_86); + metaContext.setThreadLocalInfo(); + + DataInputStream in = new DataInputStream(new FileInputStream(file)); + SchemaChangeJobV2 result = (SchemaChangeJobV2) AlterJobV2.read(in); + Assert.assertEquals(1, result.getJobId()); + Assert.assertEquals(AlterJobV2.JobState.FINISHED, result.getJobState()); + Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat")); + } } diff --git a/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java b/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java index 6f310554c9..04a46f99e2 100644 --- a/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java @@ -74,8 +74,7 @@ public class ColumnTest { // 2. Read objects from file DataInputStream dis = new DataInputStream(new FileInputStream(file)); - Column rColumn1 = new Column(); - rColumn1.readFields(dis); + Column rColumn1 = Column.read(dis); Assert.assertEquals("user", rColumn1.getName()); Assert.assertEquals(PrimitiveType.CHAR, rColumn1.getDataType()); Assert.assertEquals(AggregateType.SUM, rColumn1.getAggregationType()); diff --git a/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java b/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java index 9d8af1aa13..f661e6239d 100644 --- a/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java @@ -17,6 +17,14 @@ package org.apache.doris.catalog; +import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.FunctionName; +import org.apache.doris.analysis.MVColumnItem; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TableName; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TStorageType; import com.google.common.collect.Lists; @@ -33,6 +41,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.List; +import mockit.Expectations; +import mockit.Mocked; + public class MaterializedIndexMetaTest { private static String fileName = "./MaterializedIndexMetaSerializeTest"; @@ -44,12 +55,13 @@ public class MaterializedIndexMetaTest { } @Test - public void testSerializeMaterializedIndexMeta() throws IOException { + public void testSerializeMaterializedIndexMeta(@Mocked CreateMaterializedViewStmt stmt) throws IOException { // 1. Write objects to file File file = new File(fileName); file.createNewFile(); DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + String mvColumnName = CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + "k1"; List schema = Lists.newArrayList(); schema.add(new Column("k1", Type.TINYINT, true, null, true, "1", "abc")); schema.add(new Column("k2", Type.SMALLINT, true, null, true, "1", "debug")); @@ -63,19 +75,48 @@ public class MaterializedIndexMetaTest { schema.add(new Column("k10", Type.VARCHAR, true, null, true, "1", "")); schema.add(new Column("k11", Type.DECIMALV2, true, null, true, "1", "")); schema.add(new Column("k12", Type.INT, true, null, true, "1", "")); - schema.add(new Column("v1", Type.INT, true, AggregateType.SUM, true, "1", "")); - schema.add(new Column("v1", Type.VARCHAR, true, AggregateType.REPLACE, true, "1", "")); + schema.add(new Column("v1", Type.INT, false, AggregateType.SUM, true, "1", "")); + schema.add(new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", "")); short shortKeyColumnCount = 1; MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(1, schema, 1, 1, shortKeyColumnCount, - TStorageType.COLUMN, KeysType.DUP_KEYS); + TStorageType.COLUMN, KeysType.DUP_KEYS, new OriginStatement( + "create materialized view test as select k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12, sum(v1), " + + "bitmap_union(to_bitmap(k1)) from test group by k1, k2, k3, k4, k5, " + + "k6, k7, k8, k9, k10, k11, k12", + 0)); indexMeta.write(out); out.flush(); out.close(); + List itemList = Lists.newArrayList(); + MVColumnItem item = new MVColumnItem(mvColumnName); + List params = Lists.newArrayList(); + SlotRef param1 = new SlotRef(new TableName(null, "test"), "c1"); + params.add(param1); + item.setDefineExpr(new FunctionCallExpr(new FunctionName("to_bitmap"), params)); + itemList.add(item); + new Expectations() { + { + stmt.getMVColumnItemList(); + result = itemList; + } + }; + + // 2. Read objects from file DataInputStream in = new DataInputStream(new FileInputStream(file)); - MaterializedIndexMeta readIndexMeta = MaterializedIndexMeta.read(in); - Assert.assertEquals(indexMeta, readIndexMeta); + Assert.assertEquals(1, readIndexMeta.getIndexId()); + List resultColumns = readIndexMeta.getSchema(); + for (Column column : resultColumns) { + if (column.getName().equals(mvColumnName)) { + Assert.assertTrue(column.getDefineExpr() instanceof FunctionCallExpr); + Assert.assertEquals(Type.BITMAP, column.getType()); + Assert.assertEquals(AggregateType.BITMAP_UNION, column.getAggregationType()); + Assert.assertEquals("to_bitmap", ((FunctionCallExpr) column.getDefineExpr()).getFnName().getFunction()); + } else { + Assert.assertEquals(null, column.getDefineExpr()); + } + } } } diff --git a/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java b/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java index 849848d7d7..b39b3c0454 100644 --- a/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java @@ -560,10 +560,6 @@ public class TempPartitionTest { } private void testSerializeOlapTable(OlapTable tbl) throws IOException, AnalysisException { - MetaContext metaContext = new MetaContext(); - metaContext.setMetaVersion(FeMetaVersion.VERSION_77); - metaContext.setThreadLocalInfo(); - // 1. Write objects to file File file = new File(tempPartitionFile); file.createNewFile(); diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index 9c7192ae63..7991343688 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -71,14 +71,10 @@ public class BrokerLoadJobTest { @Injectable LabelName labelName, @Injectable DataDescription dataDescription, @Mocked Catalog catalog, - @Injectable Database database, - @Injectable BrokerDesc brokerDesc, - @Injectable String originStmt) { + @Injectable Database database) { List dataDescriptionList = Lists.newArrayList(); dataDescriptionList.add(dataDescription); - String label = "label"; - long dbId = 1; String tableName = "table"; String databaseName = "database"; new Expectations() { @@ -105,7 +101,7 @@ public class BrokerLoadJobTest { }; try { - BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, new OriginStatement(originStmt, 0)); + BrokerLoadJob.fromLoadStmt(loadStmt); Assert.fail(); } catch (DdlException e) { System.out.println("could not find table named " + tableName); @@ -119,8 +115,7 @@ public class BrokerLoadJobTest { @Injectable LabelName labelName, @Injectable Database database, @Injectable OlapTable olapTable, - @Mocked Catalog catalog, - @Injectable String originStmt) { + @Mocked Catalog catalog) { String label = "label"; long dbId = 1; @@ -170,7 +165,7 @@ public class BrokerLoadJobTest { }; try { - BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, new OriginStatement(originStmt, 0)); + BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt); Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId")); Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label")); Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state")); diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 56551877f9..8469aca809 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -94,6 +94,7 @@ public class RoutineLoadManagerTest { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties); + createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, serverAddress, topicName); @@ -116,7 +117,7 @@ public class RoutineLoadManagerTest { } }; RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, new OriginStatement("dummy", 0)); + routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt); Map idToRoutineLoadJob = Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob"); @@ -162,6 +163,7 @@ public class RoutineLoadManagerTest { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties); + createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); new Expectations() { @@ -176,7 +178,7 @@ public class RoutineLoadManagerTest { }; RoutineLoadManager routineLoadManager = new RoutineLoadManager(); try { - routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, new OriginStatement("dummy", 0)); + routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt); Assert.fail(); } catch (LoadException | DdlException e) { Assert.fail(); diff --git a/fe/src/test/java/org/apache/doris/persist/gson/ThriftToJsonTest.java b/fe/src/test/java/org/apache/doris/persist/gson/ThriftToJsonTest.java new file mode 100644 index 0000000000..f8c5d46bc4 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/persist/gson/ThriftToJsonTest.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist.gson; + +import org.apache.doris.thrift.TStorageFormat; + +import org.junit.Assert; +import org.junit.Test; + +public class ThriftToJsonTest { + + @Test + public void testTEnumToJson() { + // write + String serializeString = GsonUtils.GSON.toJson(TStorageFormat.V1); + // read + TStorageFormat tStorageFormat = GsonUtils.GSON.fromJson(serializeString, TStorageFormat.class); + Assert.assertEquals(TStorageFormat.V1, tStorageFormat); + } +} diff --git a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index 84843633ff..c2f701c8b3 100644 --- a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -577,7 +577,7 @@ public class StmtExecutorTest { new Expectations(ddlExecutor) { { // Mock ddl - DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any); + DdlExecutor.execute((Catalog) any, (DdlStmt) any); minTimes = 0; } }; @@ -610,7 +610,7 @@ public class StmtExecutorTest { new Expectations(ddlExecutor) { { // Mock ddl - DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any); + DdlExecutor.execute((Catalog) any, (DdlStmt) any); minTimes = 0; result = new DdlException("ddl fail"); } @@ -644,7 +644,7 @@ public class StmtExecutorTest { new Expectations(ddlExecutor) { { // Mock ddl - DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any); + DdlExecutor.execute((Catalog) any, (DdlStmt) any); minTimes = 0; result = new Exception("bug"); }