[Bug] Fix alter table failed when none of new load jobs succeed on alter replica (#5617)

* [Bug] Fix alter table failed when none of new load jobs succeed on altering replica

Co-authored-by: caiconghui <caiconghui@xiaomi.com>
This commit is contained in:
caiconghui
2021-04-15 15:55:57 +08:00
committed by GitHub
parent 892fbf6ded
commit b6c0767754
8 changed files with 27 additions and 34 deletions

View File

@ -393,16 +393,16 @@ public abstract class AlterHandler extends MasterDaemon {
* We assume that the specified version is X.
* Case 1:
* After alter table process starts, there is no new load job being submitted. So the new replica
* should be with version (1-0). So we just modify the replica's version to partition's visible version, which is X.
* should be with version (0-1). So we just modify the replica's version to partition's visible version, which is X.
* Case 2:
* After alter table process starts, there are some load job being processed.
* Case 2.1:
* Only one new load job, and it failed on this replica. so the replica's last failed version should be X + 1
* and version is still 1. We should modify the replica's version to (last failed version - 1)
* None of them succeed on this replica. so the version is still 1. We should modify the replica's version to X.
* Case 2.2
* There are new load jobs after alter task, and at least one of them is succeed on this replica.
* So the replica's version should be larger than X. So we don't need to modify the replica version
* because its already looks like normal.
* In summary, we only need to update replica's version when replica's version is smaller than X
*/
public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundException {
Database db = Catalog.getCurrentCatalog().getDb(task.getDbId());
@ -431,19 +431,9 @@ public abstract class AlterHandler extends MasterDaemon {
LOG.info("before handle alter task tablet {}, replica: {}, task version: {}-{}",
task.getSignature(), replica, task.getVersion(), task.getVersionHash());
boolean versionChanged = false;
if (replica.getVersion() > task.getVersion()) {
// Case 2.2, do nothing
} else {
if (replica.getLastFailedVersion() > task.getVersion()) {
// Case 2.1
replica.updateVersionInfo(task.getVersion(), task.getVersionHash(), replica.getDataSize(), replica.getRowCount());
versionChanged = true;
} else {
// Case 1
Preconditions.checkState(replica.getLastFailedVersion() == -1, replica.getLastFailedVersion());
replica.updateVersionInfo(task.getVersion(), task.getVersionHash(), replica.getDataSize(), replica.getRowCount());
versionChanged = true;
}
if (replica.getVersion() < task.getVersion()) {
replica.updateVersionInfo(task.getVersion(), task.getVersionHash(), replica.getDataSize(), replica.getRowCount());
versionChanged = true;
}
if (versionChanged) {

View File

@ -208,6 +208,7 @@ public abstract class AlterJobV2 implements Writable {
// table is stable, set is to ROLLUP and begin altering.
LOG.info("table {} is stable, start {} job {}", tableId, type);
tbl.setState(type == JobType.ROLLUP ? OlapTableState.ROLLUP : OlapTableState.SCHEMA_CHANGE);
errMsg = "";
return true;
}
} finally {

View File

@ -553,7 +553,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
* Should replay all changes before this job's state transfer to PENDING.
* These changes should be same as changes in RollupHander.processAddRollup()
*/
private void replayPending(RollupJobV2 replayedJob) {
private void replayCreateJob(RollupJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
// database may be dropped before replaying this log. just return
@ -602,7 +602,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
* Replay job in WAITING_TXN state.
* Should replay all changes in runPendingJob()
*/
private void replayWaitingTxn(RollupJobV2 replayedJob) {
private void replayPendingJob(RollupJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
// database may be dropped before replaying this log. just return
@ -633,7 +633,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
* Replay job in FINISHED state.
* Should replay all changes in runRuningJob()
*/
private void replayFinished(RollupJobV2 replayedJob) {
private void replayRunningJob(RollupJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTable(tableId);
@ -670,13 +670,13 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
RollupJobV2 replayedRollupJob = (RollupJobV2) replayedJob;
switch (replayedJob.jobState) {
case PENDING:
replayPending(replayedRollupJob);
replayCreateJob(replayedRollupJob);
break;
case WAITING_TXN:
replayWaitingTxn(replayedRollupJob);
replayPendingJob(replayedRollupJob);
break;
case FINISHED:
replayFinished(replayedRollupJob);
replayRunningJob(replayedRollupJob);
break;
case CANCELLED:
replayCancelled(replayedRollupJob);

View File

@ -665,7 +665,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
* Should replay all changes before this job's state transfer to PENDING.
* These changes should be same as changes in SchemaChangeHandler.createJob()
*/
private void replayPending(SchemaChangeJobV2 replayedJob) {
private void replayCreateJob(SchemaChangeJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
// database may be dropped before replaying this log. just return
@ -713,7 +713,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
* Replay job in WAITING_TXN state.
* Should replay all changes in runPendingJob()
*/
private void replayWaitingTxn(SchemaChangeJobV2 replayedJob) {
private void replayPendingJob(SchemaChangeJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
// database may be dropped before replaying this log. just return
@ -740,9 +740,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
/**
* Replay job in FINISHED state.
* Should replay all changes in runRuningJob()
* Should replay all changes in runRunningJob()
*/
private void replayFinished(SchemaChangeJobV2 replayedJob) {
private void replayRunningJob(SchemaChangeJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTable(tableId);
@ -777,13 +777,13 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
SchemaChangeJobV2 replayedSchemaChangeJob = (SchemaChangeJobV2) replayedJob;
switch (replayedJob.jobState) {
case PENDING:
replayPending(replayedSchemaChangeJob);
replayCreateJob(replayedSchemaChangeJob);
break;
case WAITING_TXN:
replayWaitingTxn(replayedSchemaChangeJob);
replayPendingJob(replayedSchemaChangeJob);
break;
case FINISHED:
replayFinished(replayedSchemaChangeJob);
replayRunningJob(replayedSchemaChangeJob);
break;
case CANCELLED:
replayCancelled(replayedSchemaChangeJob);

View File

@ -46,13 +46,13 @@ public class MaterializedIndex extends MetaObject implements Writable, GsonPostP
SHADOW; // index in SHADOW state is visible to load process, but invisible to query
public boolean isVisible() {
return this == IndexState.NORMAL || this == IndexState.SCHEMA_CHANGE;
return this == IndexState.NORMAL;
}
}
public enum IndexExtState {
ALL,
VISIBLE, // index state in NORMAL and SCHEMA_CHANGE
VISIBLE, // index state in NORMAL
SHADOW // index state in SHADOW
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.planner;
import org.apache.commons.lang.StringUtils;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Catalog;
@ -300,7 +301,8 @@ public class OlapTableSink extends DataSink {
Multimap<Long, Long> bePathsMap = tablet.getNormalReplicaBackendPathMap();
if (bePathsMap.keySet().size() < quorum) {
throw new UserException(InternalErrorCode.REPLICA_FEW_ERR,
"tablet " + tablet.getId() + " has few replicas: " + bePathsMap.keySet().size());
"tablet " + tablet.getId() + " has few replicas: " + bePathsMap.keySet().size()
+ ", alive backends: [" + StringUtils.join(bePathsMap.keySet(), ",") + "]");
}
locationParam.addToTablets(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
allBePathsMap.putAll(bePathsMap);

View File

@ -648,7 +648,7 @@ public class Backend implements Writable {
} else {
if (isAlive.compareAndSet(true, false)) {
isChanged = true;
LOG.info("{} is dead,", this.toString());
LOG.warn("{} is dead,", this.toString());
}
heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();

View File

@ -204,7 +204,7 @@ public class GlobalTransactionMgr implements Writable {
long publishTimeoutMillis = timeoutMillis - stopWatch.getTime();
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(db.getId());
if (publishTimeoutMillis < 0) {
// here commit transaction successfully cost too much time to cause publisTimeoutMillis is less than zero,
// here commit transaction successfully cost too much time to cause that publishTimeoutMillis is less than zero,
// so we just return false to indicate publish timeout
return false;
}