[fix](alter-job) Missing alter job when doing checkpoint image (#9329)

This bug is introduced from #8030
This commit is contained in:
Mingyu Chen
2022-05-03 22:36:36 +08:00
committed by GitHub
parent 7eefad429b
commit 3baf6cefc3
14 changed files with 55 additions and 54 deletions

View File

@ -58,7 +58,7 @@ public abstract class AlterJobV2 implements Writable {
}
public enum JobType {
ROLLUP, SCHEMA_CHANGE, DECOMMISSION_BACKEND
ROLLUP, SCHEMA_CHANGE
}
@SerializedName(value = "type")

View File

@ -118,7 +118,7 @@ public class SchemaChangeHandler extends AlterHandler {
public final Map<Long, AlterJobV2> runnableSchemaChangeJobV2 = Maps.newConcurrentMap();
public int cycle_count = 0;
public int cycleCount = 0;
public SchemaChangeHandler() {
super("schema change", Config.default_schema_change_scheduler_interval_millisecond);
@ -1391,13 +1391,13 @@ public class SchemaChangeHandler extends AlterHandler {
@Override
protected void runAfterCatalogReady() {
if (cycle_count >= CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB) {
if (cycleCount >= CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB) {
clearFinishedOrCancelledSchemaChangeJobV2();
super.runAfterCatalogReady();
cycle_count = 0;
cycleCount = 0;
}
runAlterJobV2();
cycle_count++;
cycleCount++;
}
private void runAlterJobV2() {
@ -1868,12 +1868,11 @@ public class SchemaChangeHandler extends AlterHandler {
}
@Override
protected void addAlterJobV2(AlterJobV2 alterJob) {
public void addAlterJobV2(AlterJobV2 alterJob) {
super.addAlterJobV2(alterJob);
runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob);
}
private void clearFinishedOrCancelledSchemaChangeJobV2() {
Iterator<Map.Entry<Long, AlterJobV2>> iterator = runnableSchemaChangeJobV2.entrySet().iterator();
while (iterator.hasNext()) {

View File

@ -1815,8 +1815,6 @@ public class Catalog {
}
public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throws IOException {
Map<Long, AlterJobV2> alterJobsV2 = Maps.newHashMap();
// alter jobs
int size = dis.readInt();
long newChecksum = checksum ^ size;
@ -1825,13 +1823,11 @@ public class Catalog {
throw new IOException("There are [" + size + "] old alter jobs. Please downgrade FE to an older version and handle residual jobs");
}
if (Catalog.getCurrentCatalogJournalVersion() >= 2) {
// finished or cancelled jobs
size = dis.readInt();
newChecksum ^= size;
if (size > 0) {
throw new IOException("There are [" + size + "] old finished or cancelled alter jobs. Please downgrade FE to an older version and handle residual jobs");
}
// finished or cancelled jobs
size = dis.readInt();
newChecksum ^= size;
if (size > 0) {
throw new IOException("There are [" + size + "] old finished or cancelled alter jobs. Please downgrade FE to an older version and handle residual jobs");
}
// alter job v2
@ -1841,9 +1837,9 @@ public class Catalog {
AlterJobV2 alterJobV2 = AlterJobV2.read(dis);
if (type == JobType.ROLLUP || type == JobType.SCHEMA_CHANGE) {
if (type == JobType.ROLLUP) {
this.getRollupHandler().addAlterJobV2(alterJobV2);
this.getMaterializedViewHandler().addAlterJobV2(alterJobV2);
} else {
alterJobsV2.put(alterJobV2.getJobId(), alterJobV2);
this.getSchemaChangeHandler().addAlterJobV2(alterJobV2);
}
// ATTN : we just want to add tablet into TabletInvertedIndex when only PendingJob is checkpointed
// to prevent TabletInvertedIndex data loss,
@ -1853,7 +1849,7 @@ public class Catalog {
LOG.info("replay pending alter job when load alter job {} ", alterJobV2.getJobId());
}
} else {
alterJobsV2.put(alterJobV2.getJobId(), alterJobV2);
throw new IOException("Invalid alter job type: " + type.name());
}
}
@ -2112,7 +2108,14 @@ public class Catalog {
}
public long saveAlterJob(CountingDataOutputStream dos, long checksum, JobType type) throws IOException {
Map<Long, AlterJobV2> alterJobsV2 = Maps.newHashMap();
Map<Long, AlterJobV2> alterJobsV2;
if (type == JobType.ROLLUP) {
alterJobsV2 = this.getMaterializedViewHandler().getAlterJobsV2();
} else if (type == JobType.SCHEMA_CHANGE) {
alterJobsV2 = this.getSchemaChangeHandler().getAlterJobsV2();
} else {
throw new IOException("Invalid alter job type: " + type.name());
}
// alter jobs == 0
// If the FE version upgrade from old version, if it have alter jobs, the FE will failed during start process
@ -3814,7 +3817,7 @@ public class Catalog {
}
Preconditions.checkNotNull(rollupIndexStorageType);
// set rollup index meta to olap table
List<Column> rollupColumns = getRollupHandler().checkAndPrepareMaterializedView(addRollupClause,
List<Column> rollupColumns = getMaterializedViewHandler().checkAndPrepareMaterializedView(addRollupClause,
olapTable, baseRollupIndex, false);
short rollupShortKeyColumnCount = Catalog.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties());
int rollupSchemaHash = Util.generateSchemaHash();
@ -5032,7 +5035,7 @@ public class Catalog {
return (SchemaChangeHandler) this.alter.getSchemaChangeHandler();
}
public MaterializedViewHandler getRollupHandler() {
public MaterializedViewHandler getMaterializedViewHandler() {
return (MaterializedViewHandler) this.alter.getMaterializedViewHandler();
}
@ -5310,7 +5313,7 @@ public class Catalog {
*/
public void cancelAlter(CancelAlterTableStmt stmt) throws DdlException {
if (stmt.getAlterType() == AlterType.ROLLUP) {
this.getRollupHandler().cancel(stmt);
this.getMaterializedViewHandler().cancel(stmt);
} else if (stmt.getAlterType() == AlterType.COLUMN) {
this.getSchemaChangeHandler().cancel(stmt);
} else {

View File

@ -72,7 +72,7 @@ public class JobsProcDir implements ProcDirInterface {
} else if (jobTypeName.equals(DELETE)) {
return new DeleteInfoProcDir(catalog.getDeleteHandler(), catalog.getLoadInstance(), db.getId());
} else if (jobTypeName.equals(ROLLUP)) {
return new RollupProcDir(catalog.getRollupHandler(), db);
return new RollupProcDir(catalog.getMaterializedViewHandler(), db);
} else if (jobTypeName.equals(SCHEMA_CHANGE)) {
return new SchemaChangeProcDir(catalog.getSchemaChangeHandler(), db);
} else if (jobTypeName.equals(EXPORT)) {
@ -119,7 +119,7 @@ public class JobsProcDir implements ProcDirInterface {
cancelledNum.toString(), totalNum.toString()));
// rollup
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler();
pendingNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING, dbId);
runningNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN, dbId)
+ materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING, dbId);

View File

@ -61,12 +61,12 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@ -752,7 +752,7 @@ public class MasterImpl {
AlterReplicaTask alterTask = (AlterReplicaTask) task;
try {
if (alterTask.getJobType() == JobType.ROLLUP) {
Catalog.getCurrentCatalog().getRollupHandler().handleFinishAlterTask(alterTask);
Catalog.getCurrentCatalog().getMaterializedViewHandler().handleFinishAlterTask(alterTask);
} else if (alterTask.getJobType() == JobType.SCHEMA_CHANGE) {
Catalog.getCurrentCatalog().getSchemaChangeHandler().handleFinishAlterTask(alterTask);
}

View File

@ -275,13 +275,13 @@ public class EditLog {
}
case OperationType.OP_DROP_ROLLUP: {
DropInfo info = (DropInfo) journal.getData();
catalog.getRollupHandler().replayDropRollup(info, catalog);
catalog.getMaterializedViewHandler().replayDropRollup(info, catalog);
break;
}
case OperationType.OP_BATCH_DROP_ROLLUP: {
BatchDropInfo batchDropInfo = (BatchDropInfo) journal.getData();
for (long indexId : batchDropInfo.getIndexIdSet()) {
catalog.getRollupHandler().replayDropRollup(
catalog.getMaterializedViewHandler().replayDropRollup(
new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(), indexId, false), catalog);
}
break;
@ -701,7 +701,7 @@ public class EditLog {
AlterJobV2 alterJob = (AlterJobV2) journal.getData();
switch (alterJob.getType()) {
case ROLLUP:
catalog.getRollupHandler().replayAlterJobV2(alterJob);
catalog.getMaterializedViewHandler().replayAlterJobV2(alterJob);
break;
case SCHEMA_CHANGE:
catalog.getSchemaChangeHandler().replayAlterJobV2(alterJob);
@ -714,7 +714,7 @@ public class EditLog {
case OperationType.OP_BATCH_ADD_ROLLUP: {
BatchAlterJobPersistInfo batchAlterJobV2 = (BatchAlterJobPersistInfo) journal.getData();
for (AlterJobV2 alterJobV2 : batchAlterJobV2.getAlterJobV2List()) {
catalog.getRollupHandler().replayAlterJobV2(alterJobV2);
catalog.getMaterializedViewHandler().replayAlterJobV2(alterJobV2);
}
break;
}
@ -759,7 +759,7 @@ public class EditLog {
RemoveAlterJobV2OperationLog log = (RemoveAlterJobV2OperationLog) journal.getData();
switch (log.getType()) {
case ROLLUP:
catalog.getRollupHandler().replayRemoveAlterJobV2(log);
catalog.getMaterializedViewHandler().replayRemoveAlterJobV2(log);
break;
case SCHEMA_CHANGE:
catalog.getSchemaChangeHandler().replayRemoveAlterJobV2(log);

View File

@ -134,7 +134,7 @@ public class AlterJobV2Test {
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
// 2. check alter job
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
// 3. check show alter table column
String showAlterStmtStr = "show alter table rollup from test;";
@ -157,7 +157,7 @@ public class AlterJobV2Test {
String alterStmtStr = "alter table test.segmentv2 add rollup r1(k2, v1)";
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
String sql = "select k2, sum(v1) from test.segmentv2 group by k2";
@ -168,7 +168,7 @@ public class AlterJobV2Test {
alterStmtStr = "alter table test.segmentv2 add rollup segmentv2(k2, v1) properties('storage_format' = 'v2')";
alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
@ -219,7 +219,7 @@ public class AlterJobV2Test {
alterTable("alter table test.dup_table add rollup r1(v1,v2,k2,k1);");
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
ExceptionChecker.expectThrowsNoException(() -> alterTable("alter table test.dup_table modify column v2 varchar(2);"));
}
@ -253,7 +253,7 @@ public class AlterJobV2Test {
"city,\n" +
"user_id,\n" +
"date;");
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
}
}

View File

@ -578,7 +578,7 @@ public class AlterTest {
private void waitSchemaChangeJobDone(boolean rollupJob) throws Exception {
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2();
if (rollupJob) {
alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
}
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {

View File

@ -55,7 +55,7 @@ public class BatchRollupJobTest {
@Before
public void before() throws Exception {
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
alterJobs.clear();
// create database db1
@ -77,7 +77,7 @@ public class BatchRollupJobTest {
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
Assert.assertEquals(3, alterJobs.size());
Database db = Catalog.getCurrentCatalog().getDbNullable("default_cluster:db1");
@ -127,7 +127,7 @@ public class BatchRollupJobTest {
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
Assert.assertEquals(3, alterJobs.size());
List<Long> jobIds = Lists.newArrayList(alterJobs.keySet());

View File

@ -63,7 +63,6 @@ import org.apache.doris.transaction.FakeTransactionIDGenerator;
import org.apache.doris.transaction.GlobalTransactionMgr;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Assert;
@ -150,7 +149,7 @@ public class RollupJobV2Test {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(clause);
alterClauses.add(clause2);
@ -171,7 +170,7 @@ public class RollupJobV2Test {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(clause);
Database db = masterCatalog.getDbOrDdlException(CatalogTestUtil.testDbId1);
@ -188,7 +187,7 @@ public class RollupJobV2Test {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler();
// add a rollup job
ArrayList<AlterClause> alterClauses = new ArrayList<>();
@ -240,7 +239,7 @@ public class RollupJobV2Test {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler();
// add a rollup job
ArrayList<AlterClause> alterClauses = new ArrayList<>();
@ -378,7 +377,7 @@ public class RollupJobV2Test {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler();
Database db = masterCatalog.getDbOrDdlException(CatalogTestUtil.testDbId1);
OlapTable olapTable = (OlapTable) db.getTableOrDdlException(CatalogTestUtil.testTableId2);

View File

@ -122,7 +122,7 @@ public class CatalogOperationTest {
String alterStmtStr = "alter table test.newNewTest add rollup r1(k1)";
alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
Assert.assertEquals(1, alterJobs.size());
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {

View File

@ -432,7 +432,7 @@ public class TempPartitionTest {
alterTable(stmtStr, true);
// wait rollup finish
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {
System.out.println(
@ -794,7 +794,7 @@ public class TempPartitionTest {
alterTable(stmtStr, true);
// wait rollup finish
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {
System.out.println(
@ -1132,7 +1132,7 @@ public class TempPartitionTest {
alterTable(stmtStr, true);
// wait rollup finish
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {
System.out.println(

View File

@ -340,7 +340,7 @@ abstract public class DorisHttpTestCase {
return new SchemaChangeHandler();
}
@Mock
MaterializedViewHandler getRollupHandler() {
MaterializedViewHandler getMaterializedViewHandler() {
return new MaterializedViewHandler();
}
@Mock

View File

@ -148,7 +148,7 @@ public class DorisAssert {
private void checkAlterJob() throws InterruptedException {
// check alter job
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {
System.out.println("alter job " + alterJobV2.getDbId()