[fix](schema change) Change table state to NORMAL by SchemaChangeJob instead of SchemaChangeHandler (#19838)

fix problem:
If there is an unfinished schema change job (job-2), and before this time, another schema change job (job-1) of the same table has been finished.
Then restart fe, will replay edit log (pending log and waiting_txn log) for job-2, and the table's state is set to SHCEMA_CHANGE, but when loadAlterJob after replayJournal, will add job-1 to schema change handler, and then run the job-1 will set the table to NORMAL because of job-1 is done, but at this point, the job-2 is doing runWaitingTxnJob, in this function will check table's state, if not normal will throw exception, not change the job's state, and cannot cancel the job because the table is not under schema change.
This commit is contained in:
YueW
2023-05-23 18:23:12 +08:00
committed by GitHub
parent c246c22b23
commit 2596d68424
2 changed files with 30 additions and 40 deletions

View File

@ -1601,35 +1601,9 @@ public class SchemaChangeHandler extends AlterHandler {
});
}
}
if (alterJobsV2.isDone()) {
changeTableState(alterJobsV2.getDbId(), alterJobsV2.getTableId(), OlapTableState.NORMAL);
runnableSchemaChangeJobV2.remove(alterJobsV2.getJobId());
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJobsV2.getTableId(),
alterJobsV2.getJobId());
}
});
}
private void changeTableState(long dbId, long tableId, OlapTableState olapTableState) {
try {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
olapTable.writeLockOrMetaException();
try {
if (olapTable.getState() == olapTableState) {
return;
} else if (olapTable.getState() == OlapTableState.SCHEMA_CHANGE) {
olapTable.setState(olapTableState);
}
} finally {
olapTable.writeUnlock();
}
} catch (MetaNotFoundException e) {
LOG.warn("[INCONSISTENT META] changing table status failed after schema change job done", e);
}
}
public List<List<Comparable>> getAllAlterJobInfos() {
List<List<Comparable>> schemaChangeJobInfos = new LinkedList<>();
for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) {
@ -2197,11 +2171,6 @@ public class SchemaChangeHandler extends AlterHandler {
if (!schemaChangeJobV2.cancel("user cancelled")) {
throw new DdlException("Job can not be cancelled. State: " + schemaChangeJobV2.getJobState());
}
if (schemaChangeJobV2.isDone()) {
changeTableState(schemaChangeJobV2.getDbId(), schemaChangeJobV2.getTableId(), OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL when cancel job, table id: {}, job id: {}",
schemaChangeJobV2.getTableId(), schemaChangeJobV2.getJobId());
}
return;
}
}
@ -2303,9 +2272,6 @@ public class SchemaChangeHandler extends AlterHandler {
while (iterator.hasNext()) {
AlterJobV2 alterJobV2 = iterator.next().getValue();
if (alterJobV2.isDone()) {
changeTableState(alterJobV2.getDbId(), alterJobV2.getTableId(), OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJobV2.getTableId(),
alterJobV2.getJobId());
iterator.remove();
}
}
@ -2325,12 +2291,6 @@ public class SchemaChangeHandler extends AlterHandler {
runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob);
}
super.replayAlterJobV2(alterJob);
if (alterJob.isDone()) {
changeTableState(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
runnableSchemaChangeJobV2.remove(alterJob.getJobId());
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJob.getTableId(),
alterJob.getJobId());
}
}
// the invoker should keep table's write lock

View File

@ -692,6 +692,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
Env.getCurrentEnv().getEditLog().logAlterJob(this);
LOG.info("schema change job finished: {}", jobId);
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId);
}
private void onFinished(OlapTable tbl) {
@ -797,6 +800,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
this.finishedTimeMs = System.currentTimeMillis();
LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
Env.getCurrentEnv().getEditLog().logAlterJob(this);
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL when cancel, table id: {}, job id: {}", tableId, jobId);
return true;
}
@ -931,6 +938,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
jobState = JobState.FINISHED;
this.finishedTimeMs = replayedJob.finishedTimeMs;
LOG.info("replay finished schema change job: {} table id: {}", jobId, tableId);
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL when replay finished, table id: {}, job id: {}", tableId, jobId);
}
/**
@ -942,6 +951,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
this.finishedTimeMs = replayedJob.finishedTimeMs;
this.errMsg = replayedJob.errMsg;
LOG.info("replay cancelled schema change job: {}", jobId);
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL when replay cancelled, table id: {}, job id: {}", tableId, jobId);
}
@Override
@ -1016,6 +1027,25 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
return taskInfos;
}
private void changeTableState(long dbId, long tableId, OlapTableState olapTableState) {
try {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLockOrMetaException();
try {
if (olapTable.getState() == olapTableState) {
return;
} else if (olapTable.getState() == OlapTableState.SCHEMA_CHANGE) {
olapTable.setState(olapTableState);
}
} finally {
olapTable.writeUnlock();
}
} catch (MetaNotFoundException e) {
LOG.warn("[INCONSISTENT META] changing table status failed after schema change job done", e);
}
}
public String getOtherInfo() {
String info = null;
// can add info as needed