Serialize origin stmt in Rollup Job and MV Meta (#3705)

* Serialize origin stmt in Rollup Job and MV Meta

In materialized view 2.0, the define expr is serialized in column.
The method is that doris serialzie the origin stmt of Create Materialzied View Stmt in RollupJobV2 and MVMeta.
The define expr will be extract from the origin stmt after meta is deserialized.

The define expr is necessary for bitmap and hll materialized view.
For example:
MV meta: __doris_mv_bitmap_k1, bitmap_union, to_bitmap(k1)
Origin stmt: select bitmap_union(to_bitmap(k1)) from table
Deserialize meta: __doris_mv_bitmap_k1, bitmap_union, null
After extract: the define expr `to_bitmap(k1)` from origin stmt should be extracted.
               __doris_mv_bitmap_v1, bitmap_union, to_bitmap(k1) (which comes from the origin stmt)

Change-Id: Ic2da093188d8985f5e97be5bd094e5d60d82c9a7

* Add comment of read method

Change-Id: I4e1e0f4ad0f6e76cdc43e49938de768ec3b0a0e8

* Fix ut

Change-Id: I2be257d512bf541f00912a374a2e07a039fc42b4

* Change code style

Change-Id: I3ab23f5c94ae781167f498fefde2d96e42e05bf9
This commit is contained in:
EmmyMiao87
2020-05-30 20:17:46 +08:00
committed by GitHub
parent 5cb4063904
commit 30df9fcae9
30 changed files with 478 additions and 339 deletions

View File

@ -80,7 +80,8 @@ public class Alter {
clusterHandler.start();
}
public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) throws DdlException, AnalysisException {
public void processCreateMaterializedView(CreateMaterializedViewStmt stmt)
throws DdlException, AnalysisException {
String tableName = stmt.getBaseIndexName();
// check db
String dbName = stmt.getDBName();
@ -102,7 +103,8 @@ public class Alter {
OlapTable olapTable = (OlapTable) table;
olapTable.checkStableAndNormal(db.getClusterName());
((MaterializedViewHandler)materializedViewHandler).processCreateMaterializedView(stmt, db, olapTable);
((MaterializedViewHandler)materializedViewHandler).processCreateMaterializedView(stmt, db,
olapTable);
} finally {
db.writeUnlock();
}

View File

@ -22,10 +22,13 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -59,17 +62,27 @@ public abstract class AlterJobV2 implements Writable {
ROLLUP, SCHEMA_CHANGE
}
@SerializedName(value = "type")
protected JobType type;
@SerializedName(value = "jobId")
protected long jobId;
@SerializedName(value = "jobState")
protected JobState jobState;
@SerializedName(value = "dbId")
protected long dbId;
@SerializedName(value = "tableId")
protected long tableId;
@SerializedName(value = "tableName")
protected String tableName;
@SerializedName(value = "errMsg")
protected String errMsg = "";
@SerializedName(value = "createTimeMs")
protected long createTimeMs = -1;
@SerializedName(value = "finishedTimeMs")
protected long finishedTimeMs = -1;
@SerializedName(value = "timeoutMs")
protected long timeoutMs = -1;
public AlterJobV2(long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) {
@ -220,15 +233,20 @@ public abstract class AlterJobV2 implements Writable {
public abstract void replay(AlterJobV2 replayedJob);
public static AlterJobV2 read(DataInput in) throws IOException {
JobType type = JobType.valueOf(Text.readString(in));
switch (type) {
case ROLLUP:
return RollupJobV2.read(in);
case SCHEMA_CHANGE:
return SchemaChangeJobV2.read(in);
default:
Preconditions.checkState(false);
return null;
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86) {
JobType type = JobType.valueOf(Text.readString(in));
switch (type) {
case ROLLUP:
return RollupJobV2.read(in);
case SCHEMA_CHANGE:
return SchemaChangeJobV2.read(in);
default:
Preconditions.checkState(false);
return null;
}
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, AlterJobV2.class);
}
}

View File

@ -57,6 +57,7 @@ import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
@ -89,7 +90,6 @@ import java.util.concurrent.ConcurrentHashMap;
public class MaterializedViewHandler extends AlterHandler {
private static final Logger LOG = LogManager.getLogger(MaterializedViewHandler.class);
public static final String NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX = "__v2_";
public static final String MATERIALIZED_VIEW_NAME_PRFIX = "__doris_materialized_view_";
public MaterializedViewHandler() {
super("materialized view");
@ -200,7 +200,7 @@ public class MaterializedViewHandler extends AlterHandler {
// Step2: create mv job
RollupJobV2 rollupJobV2 = createMaterializedViewJob(mvIndexName, baseIndexName, mvColumns, addMVClause
.getProperties(), olapTable, db, baseIndexId, addMVClause.getMVKeysType());
.getProperties(), olapTable, db, baseIndexId, addMVClause.getMVKeysType(), addMVClause.getOrigStmt());
addAlterJobV2(rollupJobV2);
@ -264,7 +264,7 @@ public class MaterializedViewHandler extends AlterHandler {
// step 3 create rollup job
RollupJobV2 alterJobV2 = createMaterializedViewJob(rollupIndexName, baseIndexName, rollupSchema, addRollupClause.getProperties(),
olapTable, db, baseIndexId, olapTable.getKeysType());
olapTable, db, baseIndexId, olapTable.getKeysType(), null);
rollupNameJobMap.put(addRollupClause.getRollupName(), alterJobV2);
logJobIdSet.add(alterJobV2.getJobId());
@ -313,7 +313,7 @@ public class MaterializedViewHandler extends AlterHandler {
*/
private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexName,
List<Column> mvColumns, Map<String, String> properties, OlapTable
olapTable, Database db, long baseIndexId, KeysType mvKeysType)
olapTable, Database db, long baseIndexId, KeysType mvKeysType, OriginStatement origStmt)
throws DdlException, AnalysisException {
if (mvKeysType == null) {
// assign rollup index's key type, same as base index's
@ -337,7 +337,7 @@ public class MaterializedViewHandler extends AlterHandler {
RollupJobV2 mvJob = new RollupJobV2(jobId, dbId, tableId, olapTable.getName(), timeoutMs,
baseIndexId, mvIndexId, baseIndexName, mvName,
mvColumns, baseSchemaHash, mvSchemaHash,
mvKeysType, mvShortKeyColumnCount);
mvKeysType, mvShortKeyColumnCount, origStmt);
String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName();
if (mvName.equals(newStorageFormatIndexName)) {
mvJob.setStorageFormat(TStorageFormat.V2);
@ -465,10 +465,8 @@ public class MaterializedViewHandler extends AlterHandler {
if (mvColumnItem.getDefineExpr() != null) {
if (mvAggregationType.equals(AggregateType.BITMAP_UNION)) {
newMVColumn.setType(Type.BITMAP);
newMVColumn.setName(MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + baseColumn.getName());
} else if (mvAggregationType.equals(AggregateType.HLL_UNION)){
newMVColumn.setType(Type.HLL);
newMVColumn.setName(MATERIALIZED_VIEW_NAME_PRFIX + "hll_" + baseColumn.getName());
} else {
throw new DdlException("The define expr of column is only support bitmap_union or hll_union");
}

View File

@ -17,6 +17,10 @@
package org.apache.doris.alter;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.MVColumnItem;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@ -36,7 +40,12 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
@ -53,6 +62,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -60,6 +70,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -70,30 +81,45 @@ import java.util.concurrent.TimeUnit;
* This is for replacing the old RollupJob
* https://github.com/apache/incubator-doris/issues/1429
*/
public class RollupJobV2 extends AlterJobV2 {
public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(RollupJobV2.class);
// partition id -> (rollup tablet id -> base tablet id)
@SerializedName(value = "partitionIdToBaseRollupTabletIdMap")
private Map<Long, Map<Long, Long>> partitionIdToBaseRollupTabletIdMap = Maps.newHashMap();
@SerializedName(value = "partitionIdToRollupIndex")
private Map<Long, MaterializedIndex> partitionIdToRollupIndex = Maps.newHashMap();
// rollup and base schema info
@SerializedName(value = "baseIndexId")
private long baseIndexId;
@SerializedName(value = "rollupIndexId")
private long rollupIndexId;
@SerializedName(value = "baseIndexName")
private String baseIndexName;
@SerializedName(value = "rollupIndexName")
private String rollupIndexName;
@SerializedName(value = "rollupSchema")
private List<Column> rollupSchema = Lists.newArrayList();
@SerializedName(value = "baseSchemaHash")
private int baseSchemaHash;
@SerializedName(value = "rollupSchemaHash")
private int rollupSchemaHash;
@SerializedName(value = "rollupKeysType")
private KeysType rollupKeysType;
@SerializedName(value = "rollupShortKeyColumnCount")
private short rollupShortKeyColumnCount;
@SerializedName(value = "origStmt")
private OriginStatement origStmt;
// optional
@SerializedName(value = "storageFormat")
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
// The rollup job will wait all transactions before this txn id finished, then send the rollup tasks.
@SerializedName(value = "watershedTxnId")
protected long watershedTxnId = -1;
// save all create rollup tasks
@ -101,8 +127,8 @@ public class RollupJobV2 extends AlterJobV2 {
public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs,
long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName,
List<Column> rollupSchema, int baseSchemaHash, int rollupSchemaHash,
KeysType rollupKeysType, short rollupShortKeyColumnCount) {
List<Column> rollupSchema, int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType,
short rollupShortKeyColumnCount, OriginStatement origStmt) {
super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs);
this.baseIndexId = baseIndexId;
@ -115,6 +141,8 @@ public class RollupJobV2 extends AlterJobV2 {
this.rollupSchemaHash = rollupSchemaHash;
this.rollupKeysType = rollupKeysType;
this.rollupShortKeyColumnCount = rollupShortKeyColumnCount;
this.origStmt = origStmt;
}
private RollupJobV2() {
@ -278,7 +306,7 @@ public class RollupJobV2 extends AlterJobV2 {
}
tbl.setIndexMeta(rollupIndexId, rollupIndexName, rollupSchema, 0 /* init schema version */,
rollupSchemaHash, rollupShortKeyColumnCount,TStorageType.COLUMN, rollupKeysType);
rollupSchemaHash, rollupShortKeyColumnCount,TStorageType.COLUMN, rollupKeysType, origStmt);
}
/**
@ -506,90 +534,6 @@ public class RollupJobV2 extends AlterJobV2 {
return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId));
}
public static RollupJobV2 read(DataInput in) throws IOException {
RollupJobV2 rollupJob = new RollupJobV2();
rollupJob.readFields(in);
return rollupJob;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(partitionIdToRollupIndex.size());
for (long partitionId : partitionIdToRollupIndex.keySet()) {
out.writeLong(partitionId);
out.writeInt(partitionIdToBaseRollupTabletIdMap.get(partitionId).size());
for (Map.Entry<Long, Long> entry : partitionIdToBaseRollupTabletIdMap.get(partitionId).entrySet()) {
out.writeLong(entry.getKey());
out.writeLong(entry.getValue());
}
MaterializedIndex rollupIndex = partitionIdToRollupIndex.get(partitionId);
rollupIndex.write(out);
}
out.writeLong(baseIndexId);
out.writeLong(rollupIndexId);
Text.writeString(out, baseIndexName);
Text.writeString(out, rollupIndexName);
// rollup schema
out.writeInt(rollupSchema.size());
for (Column column : rollupSchema) {
column.write(out);
}
out.writeInt(baseSchemaHash);
out.writeInt(rollupSchemaHash);
Text.writeString(out, rollupKeysType.name());
out.writeShort(rollupShortKeyColumnCount);
out.writeLong(watershedTxnId);
Text.writeString(out, storageFormat.name());
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
long partitionId = in.readLong();
int size2 = in.readInt();
Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap.computeIfAbsent(partitionId, k -> Maps.newHashMap());
for (int j = 0; j < size2; j++) {
long rollupTabletId = in.readLong();
long baseTabletId = in.readLong();
tabletIdMap.put(rollupTabletId, baseTabletId);
}
partitionIdToRollupIndex.put(partitionId, MaterializedIndex.read(in));
}
baseIndexId = in.readLong();
rollupIndexId = in.readLong();
baseIndexName = Text.readString(in);
rollupIndexName = Text.readString(in);
size = in.readInt();
for (int i = 0; i < size; i++) {
Column column = Column.read(in);
rollupSchema.add(column);
}
baseSchemaHash = in.readInt();
rollupSchemaHash = in.readInt();
rollupKeysType = KeysType.valueOf(Text.readString(in));
rollupShortKeyColumnCount = in.readShort();
watershedTxnId = in.readLong();
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_85) {
storageFormat = TStorageFormat.valueOf(Text.readString(in));
}
}
/**
* Replay job in PENDING state.
* Should replay all changes before this job's state transfer to PENDING.
@ -773,4 +717,90 @@ public class RollupJobV2 extends AlterJobV2 {
this.jobState = jobState;
}
private void setColumnsDefineExpr(List<MVColumnItem> items) {
for (MVColumnItem item : items) {
for (Column column : rollupSchema) {
if (column.getName().equals(item.getName())) {
column.setDefineExpr(item.getDefineExpr());
break;
}
}
}
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this, AlterJobV2.class);
Text.writeString(out, json);
}
/**
* This method is only used to deserialize the text mate which version is less then 86.
* If the meta version >=86, it will be deserialized by the `read` of AlterJobV2 rather then here.
*/
public static RollupJobV2 read(DataInput in) throws IOException {
Preconditions.checkState(Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86);
RollupJobV2 rollupJob = new RollupJobV2();
rollupJob.readFields(in);
return rollupJob;
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
long partitionId = in.readLong();
int size2 = in.readInt();
Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap.computeIfAbsent(partitionId, k -> Maps.newHashMap());
for (int j = 0; j < size2; j++) {
long rollupTabletId = in.readLong();
long baseTabletId = in.readLong();
tabletIdMap.put(rollupTabletId, baseTabletId);
}
partitionIdToRollupIndex.put(partitionId, MaterializedIndex.read(in));
}
baseIndexId = in.readLong();
rollupIndexId = in.readLong();
baseIndexName = Text.readString(in);
rollupIndexName = Text.readString(in);
size = in.readInt();
for (int i = 0; i < size; i++) {
Column column = Column.read(in);
rollupSchema.add(column);
}
baseSchemaHash = in.readInt();
rollupSchemaHash = in.readInt();
rollupKeysType = KeysType.valueOf(Text.readString(in));
rollupShortKeyColumnCount = in.readShort();
watershedTxnId = in.readLong();
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_85) {
storageFormat = TStorageFormat.valueOf(Text.readString(in));
}
}
@Override
public void gsonPostProcess() throws IOException {
// analyze define stmt
if (origStmt == null) {
return;
}
// parse the define stmt to schema
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt),
SqlModeHelper.MODE_DEFAULT));
CreateMaterializedViewStmt stmt;
try {
stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, origStmt.idx);
stmt.analyzeSelectClause();
setColumnsDefineExpr(stmt.getMVColumnItemList());
} catch (Exception e) {
throw new IOException("error happens when parsing create materialized view stmt: " + origStmt, e);
}
}
}

View File

@ -38,6 +38,7 @@ import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
@ -57,8 +58,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.common.collect.Table.Cell;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -81,35 +82,44 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
private static final Logger LOG = LogManager.getLogger(SchemaChangeJobV2.class);
// partition id -> (shadow index id -> (shadow tablet id -> origin tablet id))
@SerializedName(value = "partitionIndexTabletMap")
private Table<Long, Long, Map<Long, Long>> partitionIndexTabletMap = HashBasedTable.create();
// partition id -> (shadow index id -> shadow index))
private Table<Long, Long, MaterializedIndex> partitionIndexMap = HashBasedTable.create();
// shadow index id -> origin index id
@SerializedName(value = "indexIdMap")
private Map<Long, Long> indexIdMap = Maps.newHashMap();
// shadow index id -> shadow index name(__doris_shadow_xxx)
@SerializedName(value = "indexIdToName")
private Map<Long, String> indexIdToName = Maps.newHashMap();
// shadow index id -> index schema
@SerializedName(value = "indexSchemaMap")
private Map<Long, List<Column>> indexSchemaMap = Maps.newHashMap();
// shadow index id -> (shadow index schema version : schema hash)
@SerializedName(value = "indexSchemaVersionAndHashMap")
private Map<Long, Pair<Integer, Integer>> indexSchemaVersionAndHashMap = Maps.newHashMap();
// shadow index id -> shadow index short key count
@SerializedName(value = "indexShortKeyMap")
private Map<Long, Short> indexShortKeyMap = Maps.newHashMap();
// identify whether the job is finished and no need to persist some data
private boolean isMetaPruned = false;
// bloom filter info
@SerializedName(value = "hasBfChange")
private boolean hasBfChange;
@SerializedName(value = "bfColumns")
private Set<String> bfColumns = null;
@SerializedName(value = "bfFpp")
private double bfFpp = 0;
// alter index info
@SerializedName(value = "indexChange")
private boolean indexChange = false;
@SerializedName(value = "indexes")
private List<Index> indexes = null;
// The schema change job will wait all transactions before this txn id finished, then send the schema change tasks.
@SerializedName(value = "watershedTxnId")
protected long watershedTxnId = -1;
@SerializedName(value = "storageFormat")
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
// save all schema change tasks
@ -170,7 +180,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
partitionIndexMap.clear();
indexSchemaMap.clear();
indexShortKeyMap.clear();
isMetaPruned = true;
}
/**
@ -642,12 +651,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId));
}
public static SchemaChangeJobV2 read(DataInput in) throws IOException {
SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2();
schemaChangeJob.readFields(in);
return schemaChangeJob;
}
/**
* Replay job in PENDING state.
* Should replay all changes before this job's state transfer to PENDING.
@ -826,80 +829,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
return taskInfos;
}
/**
* write data need to persist when job not finish
*/
private void writeJobNotFinishData(DataOutput out) throws IOException {
out.writeInt(partitionIndexTabletMap.rowKeySet().size());
for (Long partitionId : partitionIndexTabletMap.rowKeySet()) {
out.writeLong(partitionId);
Map<Long, Map<Long, Long>> indexTabletMap = partitionIndexTabletMap.row(partitionId);
out.writeInt(indexTabletMap.size());
for (Long shadowIndexId : indexTabletMap.keySet()) {
out.writeLong(shadowIndexId);
// tablet id map
Map<Long, Long> tabletMap = indexTabletMap.get(shadowIndexId);
out.writeInt(tabletMap.size());
for (Map.Entry<Long, Long> entry : tabletMap.entrySet()) {
out.writeLong(entry.getKey());
out.writeLong(entry.getValue());
}
// shadow index
MaterializedIndex shadowIndex = partitionIndexMap.get(partitionId, shadowIndexId);
shadowIndex.write(out);
}
}
// shadow index info
out.writeInt(indexIdMap.size());
for (Map.Entry<Long, Long> entry : indexIdMap.entrySet()) {
long shadowIndexId = entry.getKey();
out.writeLong(shadowIndexId);
// index id map
out.writeLong(entry.getValue());
// index name
Text.writeString(out, indexIdToName.get(shadowIndexId));
// index schema
out.writeInt(indexSchemaMap.get(shadowIndexId).size());
for (Column column : indexSchemaMap.get(shadowIndexId)) {
column.write(out);
}
// index schema version and hash
out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).first);
out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).second);
// short key count
out.writeShort(indexShortKeyMap.get(shadowIndexId));
}
// bloom filter
out.writeBoolean(hasBfChange);
if (hasBfChange) {
out.writeInt(bfColumns.size());
for (String bfCol : bfColumns) {
Text.writeString(out, bfCol);
}
out.writeDouble(bfFpp);
}
out.writeLong(watershedTxnId);
// index
out.writeBoolean(indexChange);
if (indexChange) {
if (CollectionUtils.isNotEmpty(indexes)) {
out.writeBoolean(true);
out.writeInt(indexes.size());
for (Index index : indexes) {
index.write(out);
}
} else {
out.writeBoolean(false);
}
}
Text.writeString(out, storageFormat.name());
}
/**
* read data need to persist when job not finish
*/
@ -982,53 +911,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
}
/**
* write data need to persist when job finished
*/
private void writeJobFinishedData(DataOutput out) throws IOException {
// only persist data will be used in getInfo
out.writeInt(indexIdMap.size());
for (Entry<Long, Long> entry : indexIdMap.entrySet()) {
long shadowIndexId = entry.getKey();
out.writeLong(shadowIndexId);
// index id map
out.writeLong(entry.getValue());
// index name
Text.writeString(out, indexIdToName.get(shadowIndexId));
// index schema version and hash
out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).first);
out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).second);
}
// bloom filter
out.writeBoolean(hasBfChange);
if (hasBfChange) {
out.writeInt(bfColumns.size());
for (String bfCol : bfColumns) {
Text.writeString(out, bfCol);
}
out.writeDouble(bfFpp);
}
out.writeLong(watershedTxnId);
// index
out.writeBoolean(indexChange);
if (indexChange) {
if (CollectionUtils.isNotEmpty(indexes)) {
out.writeBoolean(true);
out.writeInt(indexes.size());
for (Index index : indexes) {
index.write(out);
}
} else {
out.writeBoolean(false);
}
}
Text.writeString(out, storageFormat.name());
}
/**
* read data need to persist when job finished
*/
@ -1082,14 +964,19 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
String json = GsonUtils.GSON.toJson(this, AlterJobV2.class);
Text.writeString(out, json);
}
out.writeBoolean(isMetaPruned);
if (isMetaPruned) {
writeJobFinishedData(out);
} else {
writeJobNotFinishData(out);
}
/**
* This method is only used to deserialize the text mate which version is less then 86.
* If the meta version >=86, it will be deserialized by the `read` of AlterJobV2 rather then here.
*/
public static SchemaChangeJobV2 read(DataInput in) throws IOException {
Preconditions.checkState(Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86);
SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2();
schemaChangeJob.readFields(in);
return schemaChangeJob;
}
@Override

View File

@ -49,6 +49,8 @@ import java.util.Set;
* [PROPERTIES ("key" = "value")]
*/
public class CreateMaterializedViewStmt extends DdlStmt {
public static final String MATERIALIZED_VIEW_NAME_PRFIX = "__doris_materialized_view_";
private String mvName;
private SelectStmt selectStmt;
private Map<String, String> properties;
@ -100,9 +102,8 @@ public class CreateMaterializedViewStmt extends DdlStmt {
@Override
public void analyze(Analyzer analyzer) throws UserException {
// TODO(ml): remove it
if (!Config.enable_materialized_view) {
throw new AnalysisException("The materialized view is coming soon");
throw new AnalysisException("The materialized view is disabled");
}
super.analyze(analyzer);
FeNameFormat.checkTableName(mvName);
@ -128,7 +129,7 @@ public class CreateMaterializedViewStmt extends DdlStmt {
}
}
private void analyzeSelectClause() throws AnalysisException {
public void analyzeSelectClause() throws AnalysisException {
SelectList selectList = selectStmt.getSelectList();
if (selectList.getItems().isEmpty()) {
throw new AnalysisException("The materialized view must contain at least one column");
@ -200,6 +201,7 @@ public class CreateMaterializedViewStmt extends DdlStmt {
beginIndexOfAggregation = i;
}
// TODO(ml): support different type of column, int -> bigint(sum)
// TODO: change the column name of bitmap and hll
MVColumnItem mvColumnItem = new MVColumnItem(columnName);
mvColumnItem.setAggregationType(AggregateType.valueOf(functionName.toUpperCase()), false);
mvColumnItem.setDefineExpr(defineExpr);

View File

@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.rewrite.ExprRewriter;
import com.google.common.base.Preconditions;
@ -46,6 +47,8 @@ public abstract class StatementBase implements ParseNode {
// END: Members that need to be reset()
/////////////////////////////////////////
private OriginStatement origStmt;
protected StatementBase() { }
/**
@ -153,7 +156,17 @@ public abstract class StatementBase implements ParseNode {
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
}
public void setOrigStmt(OriginStatement origStmt) {
Preconditions.checkState(origStmt != null);
this.origStmt = origStmt;
}
public OriginStatement getOrigStmt() {
return origStmt;
}
/**
* Resets the internal analysis state of this node.
* For easier maintenance, class members that need to be reset are grouped into

View File

@ -4903,7 +4903,8 @@ public class Catalog {
this.alter.processAlterView(stmt, ConnectContext.get());
}
public void createMaterializedView(CreateMaterializedViewStmt stmt) throws AnalysisException, DdlException {
public void createMaterializedView(CreateMaterializedViewStmt stmt)
throws AnalysisException, DdlException {
this.alter.processCreateMaterializedView(stmt);
}

View File

@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TColumnType;
@ -67,7 +68,7 @@ public class Column implements Writable {
private String comment;
@SerializedName(value = "stats")
private ColumnStats stats; // cardinality and selectivity etc.
private Expr defineExpr; //use to define materialize view
private Expr defineExpr; // use to define column in materialize view
public Column() {
this.name = "";
@ -411,31 +412,11 @@ public class Column implements Writable {
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, name);
ColumnType.write(out, type);
if (null == aggregationType) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Text.writeString(out, aggregationType.name());
out.writeBoolean(isAggregationTypeImplicit);
}
out.writeBoolean(isKey);
out.writeBoolean(isAllowNull);
if (defaultValue == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Text.writeString(out, defaultValue);
}
stats.write(out);
Text.writeString(out, comment);
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public void readFields(DataInput in) throws IOException {
private void readFields(DataInput in) throws IOException {
name = Text.readString(in);
type = ColumnType.read(in);
boolean notNull = in.readBoolean();
@ -475,8 +456,13 @@ public class Column implements Writable {
}
public static Column read(DataInput in) throws IOException {
Column column = new Column();
column.readFields(in);
return column;
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86) {
Column column = new Column();
column.readFields(in);
return column;
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, Column.class);
}
}
}

View File

@ -17,9 +17,17 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.MVColumnItem;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.thrift.TStorageType;
import com.google.common.base.Preconditions;
@ -29,9 +37,10 @@ import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
public class MaterializedIndexMeta implements Writable {
public class MaterializedIndexMeta implements Writable, GsonPostProcessable {
@SerializedName(value = "indexId")
private long indexId;
@SerializedName(value = "schema")
@ -46,9 +55,11 @@ public class MaterializedIndexMeta implements Writable {
private TStorageType storageType;
@SerializedName(value = "keysType")
private KeysType keysType;
@SerializedName(value = "defineStmt")
private OriginStatement defineStmt;
public MaterializedIndexMeta(long indexId, List<Column> schema, int schemaVersion, int
schemaHash, short shortKeyColumnCount, TStorageType storageType, KeysType keysType) {
public MaterializedIndexMeta(long indexId, List<Column> schema, int schemaVersion, int schemaHash,
short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement defineStmt) {
this.indexId = indexId;
Preconditions.checkState(schema != null);
Preconditions.checkState(schema.size() != 0);
@ -60,6 +71,7 @@ public class MaterializedIndexMeta implements Writable {
this.storageType = storageType;
Preconditions.checkState(keysType != null);
this.keysType = keysType;
this.defineStmt = defineStmt;
}
public long getIndexId() {
@ -94,6 +106,17 @@ public class MaterializedIndexMeta implements Writable {
return schemaVersion;
}
private void setColumnsDefineExpr(List<MVColumnItem> items) {
for (MVColumnItem item : items) {
for (Column column : schema) {
if (column.getName().equals(item.getName())) {
column.setDefineExpr(item.getDefineExpr());
break;
}
}
}
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof MaterializedIndexMeta)) {
@ -134,4 +157,23 @@ public class MaterializedIndexMeta implements Writable {
return GsonUtils.GSON.fromJson(json, MaterializedIndexMeta.class);
}
@Override
public void gsonPostProcess() throws IOException {
// analyze define stmt
if (defineStmt == null) {
return;
}
// parse the define stmt to schema
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(defineStmt.originStmt),
SqlModeHelper.MODE_DEFAULT));
CreateMaterializedViewStmt stmt;
try {
stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, defineStmt.idx);
stmt.analyzeSelectClause();
setColumnsDefineExpr(stmt.getMVColumnItemList());
} catch (Exception e) {
throw new IOException("error happens when parsing create materialized view stmt: " + defineStmt, e);
}
}
}

View File

@ -38,6 +38,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TOlapTable;
import org.apache.doris.thrift.TStorageFormat;
@ -246,6 +247,12 @@ public class OlapTable extends Table {
public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int schemaHash,
short shortKeyColumnCount, TStorageType storageType, KeysType keysType) {
setIndexMeta(indexId, indexName, schema, schemaVersion, schemaHash, shortKeyColumnCount, storageType, keysType,
null);
}
public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int schemaHash,
short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement origStmt) {
// Nullable when meta comes from schema change log replay.
// The replay log only save the index id, so we need to get name by id.
if (indexName == null) {
@ -268,7 +275,7 @@ public class OlapTable extends Table {
}
MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema, schemaVersion,
schemaHash, shortKeyColumnCount, storageType, keysType);
schemaHash, shortKeyColumnCount, storageType, keysType, origStmt);
indexIdToMeta.put(indexId, indexMeta);
indexNameToId.put(indexName, indexId);
}
@ -970,7 +977,7 @@ public class OlapTable extends Table {
// The keys type in here is incorrect
MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema,
schemaVersion, schemaHash, shortKeyColumnCount, storageType, KeysType.AGG_KEYS);
schemaVersion, schemaHash, shortKeyColumnCount, storageType, KeysType.AGG_KEYS, null);
tmpIndexMetaList.add(indexMeta);
} else {
MaterializedIndexMeta indexMeta = MaterializedIndexMeta.read(in);

View File

@ -1060,7 +1060,7 @@ public class Config extends ConfigBase {
* control materialized view
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_materialized_view = false;
public static boolean enable_materialized_view = true;
/**
* it can't auto-resume routine load job as long as one of the backends is down

View File

@ -181,6 +181,8 @@ public final class FeMetaVersion {
public static final int VERSION_84 = 84;
// add storage format in rollup job
public static final int VERSION_85 = 85;
// serialize origStmt in rollupJob and mv meta
public static final int VERSION_86 = 86;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_85;
public static final int VERSION_CURRENT = VERSION_86;
}

View File

@ -124,7 +124,7 @@ public class BrokerLoadJob extends LoadJob {
}
}
public static BrokerLoadJob fromLoadStmt(LoadStmt stmt, OriginStatement originStmt) throws DdlException {
public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
// get db id
String dbName = stmt.getLabel().getDbName();
Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName());
@ -135,7 +135,7 @@ public class BrokerLoadJob extends LoadJob {
// create job
try {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(),
stmt.getBrokerDesc(), originStmt);
stmt.getBrokerDesc(), stmt.getOrigStmt());
brokerLoadJob.setJobProperties(stmt.getProperties());
brokerLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
return brokerLoadJob;

View File

@ -39,7 +39,6 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMiniLoadBeginRequest;
import org.apache.doris.thrift.TMiniLoadRequest;
@ -98,7 +97,7 @@ public class LoadManager implements Writable{
* @param stmt
* @throws DdlException
*/
public void createLoadJobFromStmt(LoadStmt stmt, OriginStatement originStmt) throws DdlException {
public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
Database database = checkDb(stmt.getLabel().getDbName());
long dbId = database.getId();
LoadJob loadJob = null;
@ -112,7 +111,7 @@ public class LoadManager implements Writable{
throw new DdlException("There are more then " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, "
+ "please retry later.");
}
loadJob = BrokerLoadJob.fromLoadStmt(stmt, originStmt);
loadJob = BrokerLoadJob.fromLoadStmt(stmt);
createLoadJob(loadJob);
} finally {
writeUnlock();

View File

@ -37,7 +37,6 @@ import org.apache.doris.common.util.LogKey;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -117,7 +116,7 @@ public class RoutineLoadManager implements Writable {
}
public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt, OriginStatement origStmt)
public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt)
throws UserException {
// check load auth
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
@ -141,7 +140,7 @@ public class RoutineLoadManager implements Writable {
throw new UserException("Unknown data source type: " + type);
}
routineLoadJob.setOrigStmt(origStmt);
routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt());
addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName());
}

View File

@ -17,6 +17,8 @@
package org.apache.doris.persist.gson;
import java.io.IOException;
public interface GsonPostProcessable {
public void gsonPostProcess();
public void gsonPostProcess() throws IOException;
}

View File

@ -17,6 +17,9 @@
package org.apache.doris.persist.gson;
import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.RollupJobV2;
import org.apache.doris.alter.SchemaChangeJobV2;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.HashDistributionInfo;
@ -93,6 +96,12 @@ public class GsonUtils {
.of(Resource.class, "clazz")
.registerSubtype(SparkResource.class, SparkResource.class.getSimpleName());
// runtime adapter for class "AlterJobV2"
private static RuntimeTypeAdapterFactory<AlterJobV2> alterJobV2TypeAdapterFactory = RuntimeTypeAdapterFactory
.of(AlterJobV2.class, "clazz")
.registerSubtype(RollupJobV2.class, RollupJobV2.class.getSimpleName())
.registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName());
// the builder of GSON instance.
// Add any other adapters if necessary.
private static final GsonBuilder GSON_BUILDER = new GsonBuilder()
@ -103,7 +112,8 @@ public class GsonUtils {
.registerTypeAdapterFactory(new PostProcessTypeAdapterFactory())
.registerTypeAdapterFactory(columnTypeAdapterFactory)
.registerTypeAdapterFactory(distributionInfoTypeAdapterFactory)
.registerTypeAdapterFactory(resourceTypeAdapterFactory);
.registerTypeAdapterFactory(resourceTypeAdapterFactory)
.registerTypeAdapterFactory(alterJobV2TypeAdapterFactory);
// this instance is thread-safe.
public static final Gson GSON = GSON_BUILDER.create();

View File

@ -178,7 +178,8 @@ public class ConnectProcessor {
ctx.resetRetureRows();
}
parsedStmt = stmts.get(i);
executor = new StmtExecutor(ctx, parsedStmt, new OriginStatement(originStmt, i));
parsedStmt.setOrigStmt(new OriginStatement(originStmt, i));
executor = new StmtExecutor(ctx, parsedStmt);
executor.execute();
if (i != stmts.size() - 1) {

View File

@ -82,8 +82,7 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.Load;
public class DdlExecutor {
public static void execute(Catalog catalog, DdlStmt ddlStmt, OriginStatement origStmt)
throws DdlException, QueryStateException, Exception {
public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception {
if (ddlStmt instanceof CreateClusterStmt) {
CreateClusterStmt stmt = (CreateClusterStmt) ddlStmt;
catalog.createCluster(stmt);
@ -132,7 +131,7 @@ public class DdlExecutor {
if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) {
catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis());
} else {
catalog.getLoadManager().createLoadJobFromStmt(loadStmt, origStmt);
catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
}
} else if (ddlStmt instanceof CancelLoadStmt) {
if (catalog.getLoadInstance().isLabelExist(
@ -142,7 +141,7 @@ public class DdlExecutor {
catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
}
} else if (ddlStmt instanceof CreateRoutineLoadStmt) {
catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt, origStmt);
catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof PauseRoutineLoadStmt) {
catalog.getRoutineLoadManager().pauseRoutineLoadJob((PauseRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof ResumeRoutineLoadStmt) {

View File

@ -128,12 +128,12 @@ public class StmtExecutor {
}
// constructor for receiving parsed stmt from connect processor
public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt, OriginStatement originStmt) {
public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt) {
this.context = ctx;
this.originStmt = originStmt;
this.parsedStmt = parsedStmt;
this.originStmt = parsedStmt.getOrigStmt();
this.serializer = context.getSerializer();
this.isProxy = false;
this.parsedStmt = parsedStmt;
}
// At the end of query execution, we begin to add up profile
@ -376,7 +376,7 @@ public class StmtExecutor {
SqlParser parser = new SqlParser(input);
try {
parsedStmt = SqlParserUtils.getStmt(parser, originStmt.idx);
parsedStmt.setOrigStmt(originStmt);
} catch (Error e) {
LOG.info("error happened when parsing stmt {}, id: {}", originStmt, context.getStmtId(), e);
throw new AnalysisException("sql parsing error, please check your sql");
@ -889,7 +889,7 @@ public class StmtExecutor {
private void handleDdlStmt() {
try {
DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt, originStmt);
DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt);
context.getState().setOk();
} catch (QueryStateException e) {
context.setState(e.getQueryState());

View File

@ -24,6 +24,13 @@ import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.analysis.AddRollupClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.MVColumnItem;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.CatalogTestUtil;
@ -34,7 +41,6 @@ import org.apache.doris.catalog.FakeEditLog;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
@ -48,6 +54,7 @@ import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.thrift.TStorageFormat;
@ -60,7 +67,6 @@ import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.DataInputStream;
@ -367,15 +373,21 @@ public class RollupJobV2Test {
@Test
public void testSerializeOfRollupJob() throws IOException {
public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt) throws IOException {
// prepare file
File file = new File(fileName);
file.createNewFile();
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
short keysCount = 1;
RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup",Lists.newArrayList(), 1, 1,
KeysType.AGG_KEYS, keysCount);
List<Column> columns = Lists.newArrayList();
String mvColumnName =CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + "c1";
Column column = new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", "");
columns.add(column);
RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, 1, 1,
KeysType.AGG_KEYS, keysCount,
new OriginStatement("create materialized view rollup as select bitmap_union(to_bitmap(c1)) from test",
0));
rollupJobV2.setStorageFormat(TStorageFormat.V2);
// write rollup job
@ -383,15 +395,37 @@ public class RollupJobV2Test {
out.flush();
out.close();
List<MVColumnItem> itemList = Lists.newArrayList();
MVColumnItem item = new MVColumnItem(
mvColumnName);
List<Expr> params = Lists.newArrayList();
SlotRef param1 = new SlotRef(new TableName(null, "test"), "c1");
params.add(param1);
item.setDefineExpr(new FunctionCallExpr(new FunctionName("to_bitmap"), params));
itemList.add(item);
new Expectations() {
{
stmt.getMVColumnItemList();
result = itemList;
}
};
// read objects from file
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_85);
metaContext.setMetaVersion(FeMetaVersion.VERSION_86);
metaContext.setThreadLocalInfo();
DataInputStream in = new DataInputStream(new FileInputStream(file));
DataInputStream in = new DataInputStream(new FileInputStream(file));
RollupJobV2 result = (RollupJobV2) AlterJobV2.read(in);
Catalog.getCurrentCatalogJournalVersion();
Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat"));
List<Column> resultColumns = Deencapsulation.getField(result, "rollupSchema");
Assert.assertEquals(1, resultColumns.size());
Column resultColumn1 = resultColumns.get(0);
Assert.assertEquals(mvColumnName,
resultColumn1.getName());
Assert.assertTrue(resultColumn1.getDefineExpr() instanceof FunctionCallExpr);
FunctionCallExpr resultFunctionCall = (FunctionCallExpr) resultColumn1.getDefineExpr();
Assert.assertEquals("to_bitmap", resultFunctionCall.getFnName().getFunction());
}
}

View File

@ -53,9 +53,11 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.FakeTransactionIDGenerator;
import org.apache.doris.transaction.GlobalTransactionMgr;
@ -66,6 +68,12 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
@ -74,6 +82,8 @@ import java.util.Map;
public class SchemaChangeJobV2Test {
private static String fileName = "./SchemaChangeV2Test";
private static FakeEditLog fakeEditLog;
private static FakeCatalog fakeCatalog;
private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
@ -372,4 +382,33 @@ public class SchemaChangeJobV2Test {
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p", DynamicPartitionProperty.ENABLE);
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30", DynamicPartitionProperty.ENABLE);
}
@Test
public void testSerializeOfSchemaChangeJob() throws IOException {
// prepare file
File file = new File(fileName);
file.createNewFile();
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2(1, 1,1, "test",600000);
schemaChangeJobV2.setStorageFormat(TStorageFormat.V2);
Deencapsulation.setField(schemaChangeJobV2, "jobState", AlterJobV2.JobState.FINISHED);
// write schema change job
schemaChangeJobV2.write(out);
out.flush();
out.close();
// read objects from file
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_86);
metaContext.setThreadLocalInfo();
DataInputStream in = new DataInputStream(new FileInputStream(file));
SchemaChangeJobV2 result = (SchemaChangeJobV2) AlterJobV2.read(in);
Assert.assertEquals(1, result.getJobId());
Assert.assertEquals(AlterJobV2.JobState.FINISHED, result.getJobState());
Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat"));
}
}

View File

@ -74,8 +74,7 @@ public class ColumnTest {
// 2. Read objects from file
DataInputStream dis = new DataInputStream(new FileInputStream(file));
Column rColumn1 = new Column();
rColumn1.readFields(dis);
Column rColumn1 = Column.read(dis);
Assert.assertEquals("user", rColumn1.getName());
Assert.assertEquals(PrimitiveType.CHAR, rColumn1.getDataType());
Assert.assertEquals(AggregateType.SUM, rColumn1.getAggregationType());

View File

@ -17,6 +17,14 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.MVColumnItem;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableName;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TStorageType;
import com.google.common.collect.Lists;
@ -33,6 +41,9 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import mockit.Expectations;
import mockit.Mocked;
public class MaterializedIndexMetaTest {
private static String fileName = "./MaterializedIndexMetaSerializeTest";
@ -44,12 +55,13 @@ public class MaterializedIndexMetaTest {
}
@Test
public void testSerializeMaterializedIndexMeta() throws IOException {
public void testSerializeMaterializedIndexMeta(@Mocked CreateMaterializedViewStmt stmt) throws IOException {
// 1. Write objects to file
File file = new File(fileName);
file.createNewFile();
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
String mvColumnName = CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + "k1";
List<Column> schema = Lists.newArrayList();
schema.add(new Column("k1", Type.TINYINT, true, null, true, "1", "abc"));
schema.add(new Column("k2", Type.SMALLINT, true, null, true, "1", "debug"));
@ -63,19 +75,48 @@ public class MaterializedIndexMetaTest {
schema.add(new Column("k10", Type.VARCHAR, true, null, true, "1", ""));
schema.add(new Column("k11", Type.DECIMALV2, true, null, true, "1", ""));
schema.add(new Column("k12", Type.INT, true, null, true, "1", ""));
schema.add(new Column("v1", Type.INT, true, AggregateType.SUM, true, "1", ""));
schema.add(new Column("v1", Type.VARCHAR, true, AggregateType.REPLACE, true, "1", ""));
schema.add(new Column("v1", Type.INT, false, AggregateType.SUM, true, "1", ""));
schema.add(new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", ""));
short shortKeyColumnCount = 1;
MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(1, schema, 1, 1, shortKeyColumnCount,
TStorageType.COLUMN, KeysType.DUP_KEYS);
TStorageType.COLUMN, KeysType.DUP_KEYS, new OriginStatement(
"create materialized view test as select k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12, sum(v1), "
+ "bitmap_union(to_bitmap(k1)) from test group by k1, k2, k3, k4, k5, "
+ "k6, k7, k8, k9, k10, k11, k12",
0));
indexMeta.write(out);
out.flush();
out.close();
List<MVColumnItem> itemList = Lists.newArrayList();
MVColumnItem item = new MVColumnItem(mvColumnName);
List<Expr> params = Lists.newArrayList();
SlotRef param1 = new SlotRef(new TableName(null, "test"), "c1");
params.add(param1);
item.setDefineExpr(new FunctionCallExpr(new FunctionName("to_bitmap"), params));
itemList.add(item);
new Expectations() {
{
stmt.getMVColumnItemList();
result = itemList;
}
};
// 2. Read objects from file
DataInputStream in = new DataInputStream(new FileInputStream(file));
MaterializedIndexMeta readIndexMeta = MaterializedIndexMeta.read(in);
Assert.assertEquals(indexMeta, readIndexMeta);
Assert.assertEquals(1, readIndexMeta.getIndexId());
List<Column> resultColumns = readIndexMeta.getSchema();
for (Column column : resultColumns) {
if (column.getName().equals(mvColumnName)) {
Assert.assertTrue(column.getDefineExpr() instanceof FunctionCallExpr);
Assert.assertEquals(Type.BITMAP, column.getType());
Assert.assertEquals(AggregateType.BITMAP_UNION, column.getAggregationType());
Assert.assertEquals("to_bitmap", ((FunctionCallExpr) column.getDefineExpr()).getFnName().getFunction());
} else {
Assert.assertEquals(null, column.getDefineExpr());
}
}
}
}

View File

@ -560,10 +560,6 @@ public class TempPartitionTest {
}
private void testSerializeOlapTable(OlapTable tbl) throws IOException, AnalysisException {
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_77);
metaContext.setThreadLocalInfo();
// 1. Write objects to file
File file = new File(tempPartitionFile);
file.createNewFile();

View File

@ -71,14 +71,10 @@ public class BrokerLoadJobTest {
@Injectable LabelName labelName,
@Injectable DataDescription dataDescription,
@Mocked Catalog catalog,
@Injectable Database database,
@Injectable BrokerDesc brokerDesc,
@Injectable String originStmt) {
@Injectable Database database) {
List<DataDescription> dataDescriptionList = Lists.newArrayList();
dataDescriptionList.add(dataDescription);
String label = "label";
long dbId = 1;
String tableName = "table";
String databaseName = "database";
new Expectations() {
@ -105,7 +101,7 @@ public class BrokerLoadJobTest {
};
try {
BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, new OriginStatement(originStmt, 0));
BrokerLoadJob.fromLoadStmt(loadStmt);
Assert.fail();
} catch (DdlException e) {
System.out.println("could not find table named " + tableName);
@ -119,8 +115,7 @@ public class BrokerLoadJobTest {
@Injectable LabelName labelName,
@Injectable Database database,
@Injectable OlapTable olapTable,
@Mocked Catalog catalog,
@Injectable String originStmt) {
@Mocked Catalog catalog) {
String label = "label";
long dbId = 1;
@ -170,7 +165,7 @@ public class BrokerLoadJobTest {
};
try {
BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, new OriginStatement(originStmt, 0));
BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt);
Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId"));
Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label"));
Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state"));

View File

@ -94,6 +94,7 @@ public class RoutineLoadManagerTest {
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
loadPropertyList, properties,
typeName, customProperties);
createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
serverAddress, topicName);
@ -116,7 +117,7 @@ public class RoutineLoadManagerTest {
}
};
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, new OriginStatement("dummy", 0));
routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt);
Map<String, RoutineLoadJob> idToRoutineLoadJob =
Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob");
@ -162,6 +163,7 @@ public class RoutineLoadManagerTest {
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
loadPropertyList, properties,
typeName, customProperties);
createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
new Expectations() {
@ -176,7 +178,7 @@ public class RoutineLoadManagerTest {
};
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
try {
routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, new OriginStatement("dummy", 0));
routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt);
Assert.fail();
} catch (LoadException | DdlException e) {
Assert.fail();

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist.gson;
import org.apache.doris.thrift.TStorageFormat;
import org.junit.Assert;
import org.junit.Test;
public class ThriftToJsonTest {
@Test
public void testTEnumToJson() {
// write
String serializeString = GsonUtils.GSON.toJson(TStorageFormat.V1);
// read
TStorageFormat tStorageFormat = GsonUtils.GSON.fromJson(serializeString, TStorageFormat.class);
Assert.assertEquals(TStorageFormat.V1, tStorageFormat);
}
}

View File

@ -577,7 +577,7 @@ public class StmtExecutorTest {
new Expectations(ddlExecutor) {
{
// Mock ddl
DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any);
DdlExecutor.execute((Catalog) any, (DdlStmt) any);
minTimes = 0;
}
};
@ -610,7 +610,7 @@ public class StmtExecutorTest {
new Expectations(ddlExecutor) {
{
// Mock ddl
DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any);
DdlExecutor.execute((Catalog) any, (DdlStmt) any);
minTimes = 0;
result = new DdlException("ddl fail");
}
@ -644,7 +644,7 @@ public class StmtExecutorTest {
new Expectations(ddlExecutor) {
{
// Mock ddl
DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any);
DdlExecutor.execute((Catalog) any, (DdlStmt) any);
minTimes = 0;
result = new Exception("bug");
}