From 92d8f6ae78a5c8d2bbf2bbdce68b7b75d237ef20 Mon Sep 17 00:00:00 2001 From: WingC <1018957763@qq.com> Date: Sat, 18 Jan 2020 22:56:37 +0800 Subject: [PATCH] [Alter] Allow submitting alter jobs when table is unstable Alter job will wait table to be stable before running. --- .../java/org/apache/doris/alter/Alter.java | 5 +- .../org/apache/doris/alter/RollupJobV2.java | 11 +- .../doris/alter/SchemaChangeHandler.java | 6 +- .../apache/doris/alter/SchemaChangeJobV2.java | 14 +- .../org/apache/doris/clone/TabletChecker.java | 157 ++++++++---------- .../apache/doris/alter/RollupJobV2Test.java | 88 +++++++++- .../doris/alter/SchemaChangeJobV2Test.java | 82 +++++++++ .../apache/doris/catalog/CatalogTestUtil.java | 7 +- 8 files changed, 274 insertions(+), 96 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java b/fe/src/main/java/org/apache/doris/alter/Alter.java index 9c2a008875..b21b620568 100644 --- a/fe/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/src/main/java/org/apache/doris/alter/Alter.java @@ -275,8 +275,9 @@ public class Alter { if (olapTable.getState() != OlapTableState.NORMAL) { throw new DdlException("Table[" + table.getName() + "]'s state is not NORMAL. Do not allow doing ALTER ops"); } - - if (needTableStable) { + + // schema change job will wait until table become stable + if (needTableStable && !hasSchemaChange && !hasAddMaterializedView) { // check if all tablets are healthy, and no tablet is in tablet scheduler boolean isStable = olapTable.isStable(Catalog.getCurrentSystemInfo(), Catalog.getCurrentCatalog().getTabletScheduler(), diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java index 2b4cda75ca..880814d602 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -168,8 +168,17 @@ public class RollupJobV2 extends AlterJobV2 { if (tbl == null) { throw new AlterCancelException("Table " + tableId + " does not exist"); } - Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); + boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(), + Catalog.getCurrentCatalog().getTabletScheduler(), + db.getClusterName()); + if (!isStable) { + errMsg = "table is unstable"; + LOG.warn("doing rollup job: " + jobId + " while table is not stable."); + return; + } + + Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); for (Map.Entry entry : this.partitionIdToRollupIndex.entrySet()) { long partitionId = entry.getKey(); Partition partition = tbl.getPartition(partitionId); diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index fbe36e5222..c8b3109440 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1310,12 +1310,12 @@ public class SchemaChangeHandler extends AlterHandler { public void process(List alterClauses, String clusterName, Database db, OlapTable olapTable) throws UserException { // index id -> index schema - Map> indexSchemaMap = new HashMap>(); + Map> indexSchemaMap = new HashMap<>(); for (Map.Entry> entry : olapTable.getIndexIdToSchema().entrySet()) { - indexSchemaMap.put(entry.getKey(), new LinkedList(entry.getValue())); + indexSchemaMap.put(entry.getKey(), new LinkedList<>(entry.getValue())); } List newIndexes = olapTable.getCopiedIndexes(); - Map propertyMap = new HashMap(); + Map propertyMap = new HashMap<>(); for (AlterClause alterClause : alterClauses) { // get properties Map properties = alterClause.getProperties(); diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index a8fdc7b775..056a80ec69 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -184,13 +184,23 @@ public class SchemaChangeJobV2 extends AlterJobV2 { totalReplicaNum += tablet.getReplicas().size(); } } - MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(totalReplicaNum); + MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch<>(totalReplicaNum); db.readLock(); try { OlapTable tbl = (OlapTable) db.getTable(tableId); if (tbl == null) { throw new AlterCancelException("Table " + tableId + " does not exist"); - } + } + + boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(), + Catalog.getCurrentCatalog().getTabletScheduler(), + db.getClusterName()); + if (!isStable) { + errMsg = "table is unstable"; + LOG.warn("doing schema change job: " + jobId + " while table is not stable."); + return; + } + Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); for (long partitionId : partitionIndexMap.rowKeySet()) { Partition partition = tbl.getPartition(partitionId); diff --git a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java index bae24386d0..cba2d8a193 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -100,6 +100,18 @@ public class TabletChecker extends MasterDaemon { } } + public static class RepairTabletInfo { + public long dbId; + public long tblId; + public List partIds; + + public RepairTabletInfo(Long dbId, Long tblId, List partIds) { + this.dbId = dbId; + this.tblId = tblId; + this.partIds = partIds; + } + } + public TabletChecker(Catalog catalog, SystemInfoService infoService, TabletScheduler tabletScheduler, TabletSchedulerStat stat) { super("tablet checker", CHECK_INTERVAL_MS); @@ -109,42 +121,42 @@ public class TabletChecker extends MasterDaemon { this.stat = stat; } - public void addPrios(long dbId, long tblId, List partitionIds, long timeoutMs) { - Preconditions.checkArgument(!partitionIds.isEmpty()); + private void addPrios(RepairTabletInfo repairTabletInfo, long timeoutMs) { + Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty()); long currentTime = System.currentTimeMillis(); synchronized (prios) { - Set parts = prios.get(dbId, tblId); + Set parts = prios.get(repairTabletInfo.dbId, repairTabletInfo.tblId); if (parts == null) { parts = Sets.newHashSet(); - prios.put(dbId, tblId, parts); + prios.put(repairTabletInfo.dbId, repairTabletInfo.tblId, parts); } - for (long partId : partitionIds) { + for (long partId : repairTabletInfo.partIds) { PrioPart prioPart = new PrioPart(partId, currentTime, timeoutMs); parts.add(prioPart); } } // we also need to change the priority of tablets which are already in - tabletScheduler.changeTabletsPriorityToVeryHigh(dbId, tblId, partitionIds); + tabletScheduler.changeTabletsPriorityToVeryHigh(repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds); } - private void removePrios(long dbId, long tblId, List partitionIds) { - Preconditions.checkArgument(!partitionIds.isEmpty()); + private void removePrios(RepairTabletInfo repairTabletInfo) { + Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty()); synchronized (prios) { - Map> tblMap = prios.row(dbId); + Map> tblMap = prios.row(repairTabletInfo.dbId); if (tblMap == null) { return; } - Set parts = tblMap.get(tblId); + Set parts = tblMap.get(repairTabletInfo.tblId); if (parts == null) { return; } - for (long partId : partitionIds) { + for (long partId : repairTabletInfo.partIds) { parts.remove(new PrioPart(partId, -1, -1)); } if (parts.isEmpty()) { - tblMap.remove(tblId); + tblMap.remove(repairTabletInfo.tblId); } } @@ -271,7 +283,8 @@ public class TabletChecker extends MasterDaemon { // priorities. LOG.debug("partition is healthy, remove from prios: {}-{}-{}", db.getId(), olapTbl.getId(), partition.getId()); - removePrios(db.getId(), olapTbl.getId(), Lists.newArrayList(partition.getId())); + removePrios(new RepairTabletInfo(db.getId(), + olapTbl.getId(), Lists.newArrayList(partition.getId()))); } } // partitions } // tables @@ -356,42 +369,9 @@ public class TabletChecker extends MasterDaemon { * when being scheduled. */ public void repairTable(AdminRepairTableStmt stmt) throws DdlException { - Catalog catalog = Catalog.getCurrentCatalog(); - Database db = catalog.getDb(stmt.getDbName()); - if (db == null) { - throw new DdlException("Database " + stmt.getDbName() + " does not exist"); - } - - long dbId = db.getId(); - long tblId = -1; - List partIds = Lists.newArrayList(); - db.readLock(); - try { - Table tbl = db.getTable(stmt.getTblName()); - if (tbl == null || tbl.getType() != TableType.OLAP) { - throw new DdlException("Table does not exist or is not OLAP table: " + stmt.getTblName()); - } - - tblId = tbl.getId(); - OlapTable olapTable = (OlapTable) tbl; - if (stmt.getPartitions().isEmpty()) { - partIds = olapTable.getPartitions().stream().map(p -> p.getId()).collect(Collectors.toList()); - } else { - for (String partName : stmt.getPartitions()) { - Partition partition = olapTable.getPartition(partName); - if (partition == null) { - throw new DdlException("Partition does not exist: " + partName); - } - partIds.add(partition.getId()); - } - } - } finally { - db.readUnlock(); - } - - Preconditions.checkState(tblId != -1); - addPrios(dbId, tblId, partIds, stmt.getTimeoutS() * 1000); - LOG.info("repair database: {}, table: {}, partition: {}", dbId, tblId, partIds); + RepairTabletInfo repairTabletInfo = getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions()); + addPrios(repairTabletInfo, stmt.getTimeoutS()); + LOG.info("repair database: {}, table: {}, partition: {}", repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds); } /* @@ -399,42 +379,9 @@ public class TabletChecker extends MasterDaemon { * This operation will remove the specified partitions from 'prios' */ public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws DdlException { - Catalog catalog = Catalog.getCurrentCatalog(); - Database db = catalog.getDb(stmt.getDbName()); - if (db == null) { - throw new DdlException("Database " + stmt.getDbName() + " does not exist"); - } - - long dbId = db.getId(); - long tblId = -1; - List partIds = Lists.newArrayList(); - db.readLock(); - try { - Table tbl = db.getTable(stmt.getTblName()); - if (tbl == null || tbl.getType() != TableType.OLAP) { - throw new DdlException("Table does not exist or is not OLAP table: " + stmt.getTblName()); - } - - tblId = tbl.getId(); - OlapTable olapTable = (OlapTable) tbl; - if (stmt.getPartitions().isEmpty()) { - partIds = olapTable.getPartitions().stream().map(p -> p.getId()).collect(Collectors.toList()); - } else { - for (String partName : stmt.getPartitions()) { - Partition partition = olapTable.getPartition(partName); - if (partition == null) { - throw new DdlException("Partition does not exist: " + partName); - } - partIds.add(partition.getId()); - } - } - } finally { - db.readUnlock(); - } - - Preconditions.checkState(tblId != -1); - removePrios(dbId, tblId, partIds); - LOG.info("cancel repair database: {}, table: {}, partition: {}", dbId, tblId, partIds); + RepairTabletInfo repairTabletInfo = getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions()); + removePrios(repairTabletInfo); + LOG.info("cancel repair database: {}, table: {}, partition: {}", repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds); } public int getPrioPartitionNum() { @@ -463,4 +410,44 @@ public class TabletChecker extends MasterDaemon { } return infos; } + + public static RepairTabletInfo getRepairTabletInfo(String dbName, String tblName, List partitions) throws DdlException { + Catalog catalog = Catalog.getCurrentCatalog(); + Database db = catalog.getDb(dbName); + if (db == null) { + throw new DdlException("Database " + dbName + " does not exist"); + } + + long dbId = db.getId(); + long tblId = -1; + List partIds = Lists.newArrayList(); + db.readLock(); + try { + Table tbl = db.getTable(tblName); + if (tbl == null || tbl.getType() != TableType.OLAP) { + throw new DdlException("Table does not exist or is not OLAP table: " + tblName); + } + + tblId = tbl.getId(); + OlapTable olapTable = (OlapTable) tbl; + + if (partitions == null || partitions.isEmpty()) { + partIds = olapTable.getPartitions().stream().map(Partition::getId).collect(Collectors.toList()); + } else { + for (String partName : partitions) { + Partition partition = olapTable.getPartition(partName); + if (partition == null) { + throw new DdlException("Partition does not exist: " + partName); + } + partIds.add(partition.getId()); + } + } + } finally { + db.readUnlock(); + } + + Preconditions.checkState(tblId != -1); + + return new RepairTabletInfo(dbId, tblId, partIds); + } } diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 8f88bc0de4..d79b8c5ab3 100644 --- a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -58,6 +58,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; + public class RollupJobV2Test { private static FakeTransactionIDGenerator fakeTransactionIDGenerator; private static GlobalTransactionMgr masterTransMgr; @@ -69,11 +71,13 @@ public class RollupJobV2Test { private static Analyzer analyzer; private static AddRollupClause clause; - FakeEditLog fakeEditLog; + private FakeCatalog fakeCatalog; + private FakeEditLog fakeEditLog; @Before public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException, AnalysisException { + fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); fakeTransactionIDGenerator = new FakeTransactionIDGenerator(); masterCatalog = CatalogTestUtil.createTestCatalog(); @@ -110,7 +114,9 @@ public class RollupJobV2Test { @Test public void testAddSchemaChange() throws UserException { + fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); MaterializedViewHandler materializedViewHandler = Catalog.getInstance().getRollupHandler(); ArrayList alterClauses = new ArrayList<>(); alterClauses.add(clause); @@ -125,7 +131,9 @@ public class RollupJobV2Test { // start a schema change, then finished @Test public void testSchemaChange1() throws Exception { + fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); MaterializedViewHandler materializedViewHandler = Catalog.getInstance().getRollupHandler(); // add a rollup job @@ -225,4 +233,82 @@ public class RollupJobV2Test { */ } + @Test + public void testSchemaChangeWhileTabletNotStable() throws Exception { + fakeCatalog = new FakeCatalog(); + fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); + MaterializedViewHandler materializedViewHandler = Catalog.getInstance().getRollupHandler(); + + // add a rollup job + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(clause); + Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1); + OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1); + Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1); + materializedViewHandler.process(alterClauses, db.getClusterName(), db, olapTable); + Map alterJobsV2 = materializedViewHandler.getAlterJobsV2(); + Assert.assertEquals(1, alterJobsV2.size()); + RollupJobV2 rollupJob = (RollupJobV2) alterJobsV2.values().stream().findAny().get(); + + MaterializedIndex baseIndex = testPartition.getBaseIndex(); + assertEquals(MaterializedIndex.IndexState.NORMAL, baseIndex.getState()); + assertEquals(Partition.PartitionState.NORMAL, testPartition.getState()); + assertEquals(OlapTableState.ROLLUP, olapTable.getState()); + + Tablet baseTablet = baseIndex.getTablets().get(0); + List replicas = baseTablet.getReplicas(); + Replica replica1 = replicas.get(0); + Replica replica2 = replicas.get(1); + Replica replica3 = replicas.get(2); + + assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion()); + assertEquals(-1, replica1.getLastFailedVersion()); + assertEquals(-1, replica2.getLastFailedVersion()); + assertEquals(-1, replica3.getLastFailedVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica1.getLastSuccessVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica2.getLastSuccessVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica3.getLastSuccessVersion()); + + // runPendingJob + replica1.setState(Replica.ReplicaState.DECOMMISSION); + materializedViewHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.PENDING, rollupJob.getJobState()); + + // table is stable, runPendingJob again + replica1.setState(Replica.ReplicaState.NORMAL); + materializedViewHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.WAITING_TXN, rollupJob.getJobState()); + Assert.assertEquals(2, testPartition.getMaterializedIndices(IndexExtState.ALL).size()); + Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size()); + Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.SHADOW).size()); + + // runWaitingTxnJob + materializedViewHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState()); + + // runWaitingTxnJob, task not finished + materializedViewHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState()); + + // finish all tasks + List tasks = AgentTaskQueue.getTask(TTaskType.ALTER); + Assert.assertEquals(3, tasks.size()); + for (AgentTask agentTask : tasks) { + agentTask.setFinished(true); + } + MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); + for (Tablet shadowTablet : shadowIndex.getTablets()) { + for (Replica shadowReplica : shadowTablet.getReplicas()) { + shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), + testPartition.getVisibleVersionHash(), shadowReplica.getDataSize(), + shadowReplica.getRowCount()); + } + } + + materializedViewHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.FINISHED, rollupJob.getJobState()); + } } diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index d462eaa0c3..91a745bc18 100644 --- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -206,6 +206,88 @@ public class SchemaChangeJobV2Test { Assert.assertEquals(JobState.FINISHED, schemaChangeJob.getJobState()); } + @Test + public void testSchemaChangeWhileTabletNotStable() throws Exception { + fakeCatalog = new FakeCatalog(); + fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); + SchemaChangeHandler schemaChangeHandler = Catalog.getInstance().getSchemaChangeHandler(); + + // add a schema change job + ArrayList alterClauses = new ArrayList<>(); + alterClauses.add(addColumnClause); + Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1); + OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1); + Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1); + schemaChangeHandler.process(alterClauses, "default_cluster", db, olapTable); + Map alterJobsV2 = schemaChangeHandler.getAlterJobsV2(); + Assert.assertEquals(1, alterJobsV2.size()); + SchemaChangeJobV2 schemaChangeJob = (SchemaChangeJobV2) alterJobsV2.values().stream().findAny().get(); + + MaterializedIndex baseIndex = testPartition.getBaseIndex(); + assertEquals(IndexState.NORMAL, baseIndex.getState()); + assertEquals(PartitionState.NORMAL, testPartition.getState()); + assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState()); + + Tablet baseTablet = baseIndex.getTablets().get(0); + List replicas = baseTablet.getReplicas(); + Replica replica1 = replicas.get(0); + Replica replica2 = replicas.get(1); + Replica replica3 = replicas.get(2); + + assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion()); + assertEquals(-1, replica1.getLastFailedVersion()); + assertEquals(-1, replica2.getLastFailedVersion()); + assertEquals(-1, replica3.getLastFailedVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica1.getLastSuccessVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica2.getLastSuccessVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica3.getLastSuccessVersion()); + + // runPendingJob + replica1.setState(Replica.ReplicaState.DECOMMISSION); + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.PENDING, schemaChangeJob.getJobState()); + + // table is stable runPendingJob again + replica1.setState(Replica.ReplicaState.NORMAL); + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.WAITING_TXN, schemaChangeJob.getJobState()); + Assert.assertEquals(2, testPartition.getMaterializedIndices(IndexExtState.ALL).size()); + Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size()); + Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.SHADOW).size()); + + // runWaitingTxnJob + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState()); + + // runWaitingTxnJob, task not finished + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState()); + + // runRunningJob + schemaChangeHandler.runAfterCatalogReady(); + // task not finished, still running + Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState()); + + // finish alter tasks + List tasks = AgentTaskQueue.getTask(TTaskType.ALTER); + Assert.assertEquals(3, tasks.size()); + for (AgentTask agentTask : tasks) { + agentTask.setFinished(true); + } + MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); + for (Tablet shadowTablet : shadowIndex.getTablets()) { + for (Replica shadowReplica : shadowTablet.getReplicas()) { + shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), testPartition.getVisibleVersionHash(), shadowReplica.getDataSize(), shadowReplica.getRowCount()); + } + } + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.FINISHED, schemaChangeJob.getJobState()); + } + @Test public void testModifyDynamicPartitionNormal() throws UserException { fakeCatalog = new FakeCatalog(); diff --git a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index 172f860156..7a1276b869 100644 --- a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -89,8 +89,11 @@ public class CatalogTestUtil { catalog.setEditLog(new EditLog("name")); FakeCatalog.setCatalog(catalog); Backend backend1 = createBackend(testBackendId1, "host1", 123, 124, 125); - Backend backend2 = createBackend(testBackendId2, "host1", 123, 124, 125); - Backend backend3 = createBackend(testBackendId3, "host1", 123, 124, 125); + Backend backend2 = createBackend(testBackendId2, "host2", 123, 124, 125); + Backend backend3 = createBackend(testBackendId3, "host3", 123, 124, 125); + backend1.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER); + backend2.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER); + backend3.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER); Catalog.getCurrentSystemInfo().addBackend(backend1); Catalog.getCurrentSystemInfo().addBackend(backend2); Catalog.getCurrentSystemInfo().addBackend(backend3);