From bc7e7409caa34c2fca241c6031e67ae07a28a568 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Thu, 24 Jan 2019 15:39:05 +0800 Subject: [PATCH] Allow repairing VERSION_INCOMPLETE tablets when ALTERing table (#583) Previously we did not allow repairing a tablet if the table it belongs to was under an ALTER process. But that could cause the alter job to fail when a load fails on some replica. --- .../java/org/apache/doris/catalog/Table.java | 10 ++----- .../apache/doris/clone/TabletScheduler.java | 22 +++++++++------ .../doris/common/proc/BackendProcNode.java | 3 ++ .../doris/service/FrontendServiceImpl.java | 28 +++++++++++++------ .../doris/transaction/TransactionState.java | 3 ++ 5 files changed, 40 insertions(+), 26 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/catalog/Table.java b/fe/src/main/java/org/apache/doris/catalog/Table.java index 3973d14087..b9995631d5 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/src/main/java/org/apache/doris/catalog/Table.java @@ -18,7 +18,6 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.CreateTableStmt; -import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -269,7 +268,8 @@ public class Table extends MetaObject implements Writable { /* * 1. Only schedule OLAP table. * 2. If table is colocate with other table, not schedule it. - * 3. if table's state is not NORMAL, not schedule it. + * 3. if table's state is not NORMAL, we will schedule it, but will only repair VERSION_INCOMPLETE status; + * this will be checked in TabletScheduler. 
*/ public boolean needSchedule() { if (type != TableType.OLAP) { @@ -283,12 +283,6 @@ public class Table extends MetaObject implements Writable { return false; } - if (olapTable.getState() != OlapTableState.NORMAL) { - LOG.info("table {}'s state is not NORMAL: {}, skip tablet scheduler.", - name, olapTable.getState().name()); - return false; - } - return true; } } diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index 44d82f5ce3..18699bc375 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -429,10 +429,7 @@ public class TabletScheduler extends Daemon { throw new SchedException(Status.UNRECOVERABLE, "tbl does not exist"); } - // we may add a tablet of a NOT NORMAL table during balance, which should be blocked - if (tbl.getState() != OlapTableState.NORMAL) { - throw new SchedException(Status.UNRECOVERABLE, "tbl's state is not normal: " + tbl.getState()); - } + OlapTableState tableState = tbl.getState(); Partition partition = tbl.getPartition(tabletInfo.getPartitionId()); if (partition == null) { @@ -453,6 +450,18 @@ public class TabletScheduler extends Daemon { partition.getVisibleVersionHash(), tbl.getPartitionInfo().getReplicationNum(partition.getId())); + if (statusPair.first != TabletStatus.VERSION_INCOMPLETE && tableState != OlapTableState.NORMAL) { + // If table is under ALTER process, do not allow to add or delete replica. + // VERSION_INCOMPLETE will repair the replica in place, which is allowed. + throw new SchedException(Status.UNRECOVERABLE, + "table's state is not NORMAL but tablet status is " + statusPair.first.name()); + } + + if (tabletInfo.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) { + // If table is under ALTER process, do not allow to do balance. 
+ throw new SchedException(Status.UNRECOVERABLE, "table's state is not NORMAL"); + } + tabletInfo.setTabletStatus(statusPair.first); if (statusPair.first == TabletStatus.HEALTHY && tabletInfo.getType() == TabletSchedCtx.Type.REPAIR) { throw new SchedException(Status.UNRECOVERABLE, "tablet is healthy"); @@ -686,11 +695,6 @@ public class TabletScheduler extends Daemon { // it will also delete replica from tablet inverted index. tabletInfo.deleteReplica(replica); - // TODO(cmy): this should be removed after I finish modifying alter job logic - // Catalog.getInstance().handleJobsWhenDeleteReplica(tabletInfo.getTblId(), tabletInfo.getPartitionId(), - // tabletInfo.getIndexId(), tabletInfo.getTabletId(), - // replica.getId(), replica.getBackendId()); - // write edit log ReplicaPersistInfo info = ReplicaPersistInfo.createForDelete(tabletInfo.getDbId(), tabletInfo.getTblId(), diff --git a/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java index 9de723e40f..52c75e2f03 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java @@ -79,6 +79,9 @@ public class BackendProcNode implements ProcNodeInterface { } info.add(String.format("%.2f", used) + " %"); + info.add(entry.getValue().getState().name()); + info.add(String.valueOf(entry.getValue().getPathHash())); + result.addRow(info); } diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index d00a22bd7a..577e4b0e04 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -468,14 +468,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TMasterOpResult forward(TMasterOpRequest params) throws TException { - ThriftServerContext 
connectionContext = ThriftServerEventProcessor.getConnectionContext(); - // For NonBlockingServer, we can not get client ip. - if (connectionContext != null) { - TNetworkAddress clientAddress = connectionContext.getClient(); - - Frontend fe = Catalog.getInstance().getFeByHost(clientAddress.getHostname()); + TNetworkAddress clientAddr = getClientAddr(); + if (clientAddr != null) { + Frontend fe = Catalog.getInstance().getFeByHost(clientAddr.getHostname()); if (fe == null) { - LOG.warn("reject request from invalid host. client: {}", clientAddress); + LOG.warn("reject request from invalid host. client: {}", clientAddr); throw new TException("request from invalid host was rejected."); } } @@ -538,9 +535,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException { - LOG.info("receive loadTxnBegin request, db: {}, tbl: {}, label: {}", - request.getDb(), request.getTbl(), request.getLabel()); + TNetworkAddress clientAddr = getClientAddr(); + + LOG.info("receive loadTxnBegin request, db: {}, tbl: {}, label: {}, backend: {}", + request.getDb(), request.getTbl(), request.getLabel(), + clientAddr == null ? "unknown" : clientAddr.getHostname()); LOG.debug("txn begin request: {}", request); + TLoadTxnBeginResult result = new TLoadTxnBeginResult(); TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); @@ -741,5 +742,14 @@ public class FrontendServiceImpl implements FrontendService.Iface { } return new TStatus(TStatusCode.CANCELLED); } + + private TNetworkAddress getClientAddr() { + ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext(); + // For NonBlockingServer, we can not get client ip. 
+ if (connectionContext != null) { + return connectionContext.getClient(); + } + return null; + } } diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index 4a498b3830..d39c0e085c 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -24,6 +24,7 @@ import org.apache.doris.load.TxnStateChangeListener; import org.apache.doris.metric.MetricRepo; import org.apache.doris.task.PublishVersionTask; +import com.google.common.base.Joiner; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -102,6 +103,7 @@ public class TransactionState implements Writable { private long commitTime; private long finishTime; private String reason; + // error replica ids private Set errorReplicas; private CountDownLatch latch; @@ -397,6 +399,7 @@ public class TransactionState implements Writable { sb.append(", coordinator: ").append(coordinator); sb.append(", transaction status: ").append(transactionStatus); sb.append(", error replicas num: ").append(errorReplicas.size()); + sb.append(", replica ids: ").append(Joiner.on(",").join(errorReplicas.stream().limit(5).toArray())); sb.append(", prepare time: ").append(prepareTime); sb.append(", commit time: ").append(commitTime); sb.append(", finish time: ").append(finishTime);