From aa4ac2d078168aaadd74701626f95adfe856a299 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Tue, 26 May 2020 15:35:12 +0800 Subject: [PATCH] [Bug] Serialize storage format in rollup job (#3686) The segment v2 rollup job should set the storage format v2 and serialize it. If it is not serialized, the rollup of segment v2 may use the error format 'segment v1'. --- .../org/apache/doris/alter/RollupJobV2.java | 10 ++- .../apache/doris/common/FeMetaVersion.java | 4 +- .../apache/doris/alter/RollupJobV2Test.java | 62 ++++++++++++++++++- 3 files changed, 70 insertions(+), 6 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java index 4127bd26ca..71d23673bd 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; @@ -88,14 +89,15 @@ public class RollupJobV2 extends AlterJobV2 { private KeysType rollupKeysType; private short rollupShortKeyColumnCount; + // optional + private TStorageFormat storageFormat = TStorageFormat.DEFAULT; + // The rollup job will wait all transactions before this txn id finished, then send the rollup tasks. protected long watershedTxnId = -1; // save all create rollup tasks private AgentBatchTask rollupBatchTask = new AgentBatchTask(); - private TStorageFormat storageFormat = null; - public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName, List rollupSchema, int baseSchemaHash, int rollupSchemaHash, @@ -542,6 +544,7 @@ public class RollupJobV2 extends AlterJobV2 { out.writeShort(rollupShortKeyColumnCount); out.writeLong(watershedTxnId); + Text.writeString(out, storageFormat.name()); } @Override @@ -579,6 +582,9 @@ public class RollupJobV2 extends AlterJobV2 { rollupShortKeyColumnCount = in.readShort(); watershedTxnId = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_85) { + storageFormat = TStorageFormat.valueOf(Text.readString(in)); + } } /** diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index bebaccffe5..f4d1f8cd75 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -179,6 +179,8 @@ public final class FeMetaVersion { public static final int VERSION_83 = 83; // add storage format in schema change job public static final int VERSION_84 = 84; + // add storage format in rollup job + public static final int VERSION_85 = 85; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_84; + public static final int VERSION_CURRENT = VERSION_85; } diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 2be848d047..35429d0dc6 100644 --- a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -17,51 +17,71 @@ package org.apache.doris.alter; -import mockit.Mock; -import mockit.MockUp; +import static org.junit.Assert.assertEquals; + import org.apache.doris.alter.AlterJobV2.JobState; import org.apache.doris.analysis.AccessTestUtil; import org.apache.doris.analysis.AddRollupClause; import org.apache.doris.analysis.AlterClause; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.CatalogTestUtil; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.FakeCatalog; import org.apache.doris.catalog.FakeEditLog; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; +import org.apache.doris.catalog.MaterializedIndexMeta; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.UserException; +import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.meta.MetaContext; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.FakeTransactionIDGenerator; import org.apache.doris.transaction.GlobalTransactionMgr; import com.google.common.collect.Lists; +import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; public class RollupJobV2Test { + private static String fileName = "./RollupJobV2Test"; + private static FakeTransactionIDGenerator fakeTransactionIDGenerator; private static GlobalTransactionMgr masterTransMgr; private static GlobalTransactionMgr slaveTransMgr; @@ -118,6 +138,12 @@ public class RollupJobV2Test { }; } + @After + public void tearDown() { + File file = new File(fileName); + file.delete(); + } + @Test public void testRunRollupJobConcurrentLimit() throws UserException { fakeCatalog = new FakeCatalog(); @@ -338,4 +364,34 @@ public class RollupJobV2Test { materializedViewHandler.runAfterCatalogReady(); Assert.assertEquals(JobState.FINISHED, rollupJob.getJobState()); } + + + @Test + public void testSerializeOfRollupJob() throws IOException { + // prepare file + File file = new File(fileName); + file.createNewFile(); + DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + + short keysCount = 1; + RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup",Lists.newArrayList(), 1, 1, + KeysType.AGG_KEYS, keysCount); + rollupJobV2.setStorageFormat(TStorageFormat.V2); + + // write rollup job + rollupJobV2.write(out); + out.flush(); + out.close(); + + // read objects from file + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeMetaVersion.VERSION_85); + metaContext.setThreadLocalInfo(); + DataInputStream in = new DataInputStream(new FileInputStream(file)); + + RollupJobV2 result = (RollupJobV2) AlterJobV2.read(in); + Catalog.getCurrentCatalogJournalVersion(); + Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat")); + + } }