diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java index 4127bd26ca..71d23673bd 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; @@ -88,14 +89,15 @@ public class RollupJobV2 extends AlterJobV2 { private KeysType rollupKeysType; private short rollupShortKeyColumnCount; + // optional + private TStorageFormat storageFormat = TStorageFormat.DEFAULT; + // The rollup job will wait all transactions before this txn id finished, then send the rollup tasks. protected long watershedTxnId = -1; // save all create rollup tasks private AgentBatchTask rollupBatchTask = new AgentBatchTask(); - private TStorageFormat storageFormat = null; - public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName, List rollupSchema, int baseSchemaHash, int rollupSchemaHash, @@ -542,6 +544,7 @@ public class RollupJobV2 extends AlterJobV2 { out.writeShort(rollupShortKeyColumnCount); out.writeLong(watershedTxnId); + Text.writeString(out, storageFormat.name()); } @Override @@ -579,6 +582,9 @@ public class RollupJobV2 extends AlterJobV2 { rollupShortKeyColumnCount = in.readShort(); watershedTxnId = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_85) { + storageFormat = TStorageFormat.valueOf(Text.readString(in)); + } } /** diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index bebaccffe5..f4d1f8cd75 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -179,6 +179,8 @@ public final class FeMetaVersion { public static final int VERSION_83 = 83; // add storage format in schema change job public static final int VERSION_84 = 84; + // add storage format in rollup job + public static final int VERSION_85 = 85; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_84; + public static final int VERSION_CURRENT = VERSION_85; } diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 2be848d047..35429d0dc6 100644 --- a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -17,51 +17,71 @@ package org.apache.doris.alter; -import mockit.Mock; -import mockit.MockUp; +import static org.junit.Assert.assertEquals; + import org.apache.doris.alter.AlterJobV2.JobState; import org.apache.doris.analysis.AccessTestUtil; import org.apache.doris.analysis.AddRollupClause; import org.apache.doris.analysis.AlterClause; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.CatalogTestUtil; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.FakeCatalog; import org.apache.doris.catalog.FakeEditLog; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; +import org.apache.doris.catalog.MaterializedIndexMeta; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.UserException; +import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.meta.MetaContext; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.FakeTransactionIDGenerator; import org.apache.doris.transaction.GlobalTransactionMgr; import com.google.common.collect.Lists; +import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; public class RollupJobV2Test { + private static String fileName = "./RollupJobV2Test"; + private static FakeTransactionIDGenerator fakeTransactionIDGenerator; private static GlobalTransactionMgr masterTransMgr; private static GlobalTransactionMgr slaveTransMgr; @@ -118,6 +138,12 @@ public class RollupJobV2Test { }; } + @After + public void tearDown() { + File file = new File(fileName); + file.delete(); + } + @Test public void testRunRollupJobConcurrentLimit() throws UserException { fakeCatalog = new FakeCatalog(); @@ -338,4 +364,34 @@ public class RollupJobV2Test { materializedViewHandler.runAfterCatalogReady(); Assert.assertEquals(JobState.FINISHED, rollupJob.getJobState()); } + + + @Test + public void testSerializeOfRollupJob() throws IOException { + // prepare file + File file = new File(fileName); + file.createNewFile(); + DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + + short keysCount = 1; + RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup",Lists.newArrayList(), 1, 1, + KeysType.AGG_KEYS, keysCount); + rollupJobV2.setStorageFormat(TStorageFormat.V2); + + // write rollup job + rollupJobV2.write(out); + out.flush(); + out.close(); + + // read objects from file + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeMetaVersion.VERSION_85); + metaContext.setThreadLocalInfo(); + DataInputStream in = new DataInputStream(new FileInputStream(file)); + + RollupJobV2 result = (RollupJobV2) AlterJobV2.read(in); + Catalog.getCurrentCatalogJournalVersion(); + Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat")); + + } }