[Bug] Serialize storage format in rollup job (#3686)

The segment v2 rollup job should set the storage format v2 and serialize it.
If it is not serialized, a rollup of a segment v2 table may incorrectly fall back to the 'segment v1' format.
This commit is contained in:
EmmyMiao87
2020-05-26 15:35:12 +08:00
committed by GitHub
parent f4c03fe8e2
commit aa4ac2d078
3 changed files with 70 additions and 6 deletions

View File

@ -17,51 +17,71 @@
package org.apache.doris.alter;
import mockit.Mock;
import mockit.MockUp;
import static org.junit.Assert.assertEquals;
import org.apache.doris.alter.AlterJobV2.JobState;
import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.analysis.AddRollupClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.CatalogTestUtil;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.FakeCatalog;
import org.apache.doris.catalog.FakeEditLog;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.FakeTransactionIDGenerator;
import org.apache.doris.transaction.GlobalTransactionMgr;
import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
public class RollupJobV2Test {
private static String fileName = "./RollupJobV2Test";
private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
private static GlobalTransactionMgr masterTransMgr;
private static GlobalTransactionMgr slaveTransMgr;
@ -118,6 +138,12 @@ public class RollupJobV2Test {
};
}
@After
public void tearDown() {
    // Remove the serialization artifact written by the test so each run starts clean.
    new File(fileName).delete();
}
@Test
public void testRunRollupJobConcurrentLimit() throws UserException {
fakeCatalog = new FakeCatalog();
@ -338,4 +364,34 @@ public class RollupJobV2Test {
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(JobState.FINISHED, rollupJob.getJobState());
}
@Test
public void testSerializeOfRollupJob() throws IOException {
    // Round-trips a RollupJobV2 through write()/read() and verifies the storage
    // format survives serialization: without it, a segment v2 rollup could be
    // restored with the wrong format (segment v1).
    File file = new File(fileName);
    file.createNewFile();

    short keysCount = 1;
    RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup", Lists.newArrayList(), 1, 1,
            KeysType.AGG_KEYS, keysCount);
    rollupJobV2.setStorageFormat(TStorageFormat.V2);

    // write rollup job; try-with-resources guarantees the stream is closed even on failure
    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
        rollupJobV2.write(out);
        out.flush();
    }

    // read objects from file under a meta version that serializes the storage format
    MetaContext metaContext = new MetaContext();
    metaContext.setMetaVersion(FeMetaVersion.VERSION_85);
    metaContext.setThreadLocalInfo();

    try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
        RollupJobV2 result = (RollupJobV2) AlterJobV2.read(in);
        Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat"));
    }
}
}