From 2596d68424c9ff6787cf027dac43c161ae855c65 Mon Sep 17 00:00:00 2001 From: YueW <45946325+Tanya-W@users.noreply.github.com> Date: Tue, 23 May 2023 18:23:12 +0800 Subject: [PATCH] [fix](schema change) Change table state to NORMAL by SchemaChangeJob instead of SchemaChangeHandler (#19838) Problem fixed: Suppose a table has an unfinished schema change job (job-2), and an earlier schema change job (job-1) on the same table has already finished. When the FE is restarted, it replays the edit log (the pending log and the waiting_txn log) for job-2, which sets the table's state to SCHEMA_CHANGE. However, loadAlterJob, which runs after replayJournal, adds job-1 to the schema change handler, and running job-1 then sets the table back to NORMAL because job-1 is done. At that point job-2 is executing runWaitingTxnJob, which checks the table's state and throws an exception because the state is not the expected one, so the job's state is never changed. The job cannot be cancelled either, because the table is not under schema change. --- .../doris/alter/SchemaChangeHandler.java | 40 ------------------- .../apache/doris/alter/SchemaChangeJobV2.java | 30 ++++++++++++++ 2 files changed, 30 insertions(+), 40 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 35acb0272c..e7543fbedc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1601,35 +1601,9 @@ public class SchemaChangeHandler extends AlterHandler { }); } } - - if (alterJobsV2.isDone()) { - changeTableState(alterJobsV2.getDbId(), alterJobsV2.getTableId(), OlapTableState.NORMAL); - runnableSchemaChangeJobV2.remove(alterJobsV2.getJobId()); - LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJobsV2.getTableId(), - alterJobsV2.getJobId()); - } }); } - private void changeTableState(long dbId, long tableId, OlapTableState 
olapTableState) { - try { - Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); - OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP); - olapTable.writeLockOrMetaException(); - try { - if (olapTable.getState() == olapTableState) { - return; - } else if (olapTable.getState() == OlapTableState.SCHEMA_CHANGE) { - olapTable.setState(olapTableState); - } - } finally { - olapTable.writeUnlock(); - } - } catch (MetaNotFoundException e) { - LOG.warn("[INCONSISTENT META] changing table status failed after schema change job done", e); - } - } - public List> getAllAlterJobInfos() { List> schemaChangeJobInfos = new LinkedList<>(); for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) { @@ -2197,11 +2171,6 @@ public class SchemaChangeHandler extends AlterHandler { if (!schemaChangeJobV2.cancel("user cancelled")) { throw new DdlException("Job can not be cancelled. State: " + schemaChangeJobV2.getJobState()); } - if (schemaChangeJobV2.isDone()) { - changeTableState(schemaChangeJobV2.getDbId(), schemaChangeJobV2.getTableId(), OlapTableState.NORMAL); - LOG.info("set table's state to NORMAL when cancel job, table id: {}, job id: {}", - schemaChangeJobV2.getTableId(), schemaChangeJobV2.getJobId()); - } return; } } @@ -2303,9 +2272,6 @@ public class SchemaChangeHandler extends AlterHandler { while (iterator.hasNext()) { AlterJobV2 alterJobV2 = iterator.next().getValue(); if (alterJobV2.isDone()) { - changeTableState(alterJobV2.getDbId(), alterJobV2.getTableId(), OlapTableState.NORMAL); - LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJobV2.getTableId(), - alterJobV2.getJobId()); iterator.remove(); } } @@ -2325,12 +2291,6 @@ public class SchemaChangeHandler extends AlterHandler { runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob); } super.replayAlterJobV2(alterJob); - if (alterJob.isDone()) { - changeTableState(alterJob.getDbId(), alterJob.getTableId(), 
OlapTableState.NORMAL); - runnableSchemaChangeJobV2.remove(alterJob.getJobId()); - LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJob.getTableId(), - alterJob.getJobId()); - } } // the invoker should keep table's write lock diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index aff0cfc007..988781e5bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -692,6 +692,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 { Env.getCurrentEnv().getEditLog().logAlterJob(this); LOG.info("schema change job finished: {}", jobId); + + changeTableState(dbId, tableId, OlapTableState.NORMAL); + LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId); } private void onFinished(OlapTable tbl) { @@ -797,6 +800,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 { this.finishedTimeMs = System.currentTimeMillis(); LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg); Env.getCurrentEnv().getEditLog().logAlterJob(this); + + changeTableState(dbId, tableId, OlapTableState.NORMAL); + LOG.info("set table's state to NORMAL when cancel, table id: {}, job id: {}", tableId, jobId); + return true; } @@ -931,6 +938,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 { jobState = JobState.FINISHED; this.finishedTimeMs = replayedJob.finishedTimeMs; LOG.info("replay finished schema change job: {} table id: {}", jobId, tableId); + changeTableState(dbId, tableId, OlapTableState.NORMAL); + LOG.info("set table's state to NORMAL when replay finished, table id: {}, job id: {}", tableId, jobId); } /** @@ -942,6 +951,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 { this.finishedTimeMs = replayedJob.finishedTimeMs; this.errMsg = replayedJob.errMsg; LOG.info("replay cancelled schema change job: {}", jobId); + 
changeTableState(dbId, tableId, OlapTableState.NORMAL); + LOG.info("set table's state to NORMAL when replay cancelled, table id: {}, job id: {}", tableId, jobId); } @Override @@ -1016,6 +1027,25 @@ public class SchemaChangeJobV2 extends AlterJobV2 { return taskInfos; } + private void changeTableState(long dbId, long tableId, OlapTableState olapTableState) { + try { + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP); + olapTable.writeLockOrMetaException(); + try { + if (olapTable.getState() == olapTableState) { + return; + } else if (olapTable.getState() == OlapTableState.SCHEMA_CHANGE) { + olapTable.setState(olapTableState); + } + } finally { + olapTable.writeUnlock(); + } + } catch (MetaNotFoundException e) { + LOG.warn("[INCONSISTENT META] changing table status failed after schema change job done", e); + } + } + public String getOtherInfo() { String info = null; // can add info as needed