diff --git a/fe/src/main/java/org/apache/doris/catalog/Table.java b/fe/src/main/java/org/apache/doris/catalog/Table.java index 3973d14087..b9995631d5 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/src/main/java/org/apache/doris/catalog/Table.java @@ -18,7 +18,6 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.CreateTableStmt; -import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -269,7 +268,8 @@ public class Table extends MetaObject implements Writable { /* * 1. Only schedule OLAP table. * 2. If table is colocate with other table, not schedule it. - * 3. if table's state is not NORMAL, not schedule it. + * 3. if table's state is not NORMAL, we will schedule it, but will only repair VERSION_IMCOMPLETE status, + * this will be checked in TabletScheduler. */ public boolean needSchedule() { if (type != TableType.OLAP) { @@ -283,12 +283,6 @@ public class Table extends MetaObject implements Writable { return false; } - if (olapTable.getState() != OlapTableState.NORMAL) { - LOG.info("table {}'s state is not NORMAL: {}, skip tablet scheduler.", - name, olapTable.getState().name()); - return false; - } - return true; } } diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index 44d82f5ce3..18699bc375 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -429,10 +429,7 @@ public class TabletScheduler extends Daemon { throw new SchedException(Status.UNRECOVERABLE, "tbl does not exist"); } - // we may add a tablet of a NOT NORMAL table during balance, which should be blocked - if (tbl.getState() != OlapTableState.NORMAL) { - throw new SchedException(Status.UNRECOVERABLE, "tbl's state is not normal: " + tbl.getState()); - } + OlapTableState tableState = tbl.getState(); Partition partition = tbl.getPartition(tabletInfo.getPartitionId()); if (partition == null) { @@ -453,6 +450,18 @@ public class TabletScheduler extends Daemon { partition.getVisibleVersionHash(), tbl.getPartitionInfo().getReplicationNum(partition.getId())); + if (statusPair.first != TabletStatus.VERSION_INCOMPLETE && tableState != OlapTableState.NORMAL) { + // If table is under ALTER process, do not allow to add or delete replica. + // VERSION_INCOMPLETE will repair the replica in place, which is allowed. + throw new SchedException(Status.UNRECOVERABLE, + "table's state is not NORMAL but tablet status is " + statusPair.first.name()); + } + + if (tabletInfo.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) { + // If table is under ALTER process, do not allow to do balance. + throw new SchedException(Status.UNRECOVERABLE, "table's state is not NORMAL"); + } + tabletInfo.setTabletStatus(statusPair.first); if (statusPair.first == TabletStatus.HEALTHY && tabletInfo.getType() == TabletSchedCtx.Type.REPAIR) { throw new SchedException(Status.UNRECOVERABLE, "tablet is healthy"); @@ -686,11 +695,6 @@ public class TabletScheduler extends Daemon { // it will also delete replica from tablet inverted index. tabletInfo.deleteReplica(replica); - // TODO(cmy): this should be removed after I finish modifying alter job logic - // Catalog.getInstance().handleJobsWhenDeleteReplica(tabletInfo.getTblId(), tabletInfo.getPartitionId(), - // tabletInfo.getIndexId(), tabletInfo.getTabletId(), - // replica.getId(), replica.getBackendId()); - // write edit log ReplicaPersistInfo info = ReplicaPersistInfo.createForDelete(tabletInfo.getDbId(), tabletInfo.getTblId(), diff --git a/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java index 9de723e40f..52c75e2f03 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java @@ -79,6 +79,9 @@ public class BackendProcNode implements ProcNodeInterface { } info.add(String.format("%.2f", used) + " %"); + info.add(entry.getValue().getState().name()); + info.add(String.valueOf(entry.getValue().getPathHash())); + result.addRow(info); } diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index d00a22bd7a..577e4b0e04 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -468,14 +468,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TMasterOpResult forward(TMasterOpRequest params) throws TException { - ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext(); - // For NonBlockingServer, we can not get client ip. - if (connectionContext != null) { - TNetworkAddress clientAddress = connectionContext.getClient(); - - Frontend fe = Catalog.getInstance().getFeByHost(clientAddress.getHostname()); + TNetworkAddress clientAddr = getClientAddr(); + if (clientAddr != null) { + Frontend fe = Catalog.getInstance().getFeByHost(clientAddr.getHostname()); if (fe == null) { - LOG.warn("reject request from invalid host. client: {}", clientAddress); + LOG.warn("reject request from invalid host. client: {}", clientAddr); throw new TException("request from invalid host was rejected."); } } @@ -538,9 +535,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException { - LOG.info("receive loadTxnBegin request, db: {}, tbl: {}, label: {}", - request.getDb(), request.getTbl(), request.getLabel()); + TNetworkAddress clientAddr = getClientAddr(); + + LOG.info("receive loadTxnBegin request, db: {}, tbl: {}, label: {}, backend: {}", + request.getDb(), request.getTbl(), request.getLabel(), + clientAddr == null ? "unknown" : clientAddr.getHostname()); LOG.debug("txn begin request: {}", request); + TLoadTxnBeginResult result = new TLoadTxnBeginResult(); TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); @@ -741,5 +742,14 @@ public class FrontendServiceImpl implements FrontendService.Iface { } return new TStatus(TStatusCode.CANCELLED); } + + private TNetworkAddress getClientAddr() { + ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext(); + // For NonBlockingServer, we can not get client ip. + if (connectionContext != null) { + return connectionContext.getClient(); + } + return null; + } } diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index 4a498b3830..d39c0e085c 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -24,6 +24,7 @@ import org.apache.doris.load.TxnStateChangeListener; import org.apache.doris.metric.MetricRepo; import org.apache.doris.task.PublishVersionTask; +import com.google.common.base.Joiner; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -102,6 +103,7 @@ public class TransactionState implements Writable { private long commitTime; private long finishTime; private String reason; + // error replica ids private Set errorReplicas; private CountDownLatch latch; @@ -397,6 +399,7 @@ public class TransactionState implements Writable { sb.append(", coordinator: ").append(coordinator); sb.append(", transaction status: ").append(transactionStatus); sb.append(", error replicas num: ").append(errorReplicas.size()); + sb.append(", replica ids: ").append(Joiner.on(",").join(errorReplicas.stream().limit(5).toArray())); sb.append(", prepare time: ").append(prepareTime); sb.append(", commit time: ").append(commitTime); sb.append(", finish time: ").append(finishTime);