From fbab8afe24385704ce91c7363be2770b42e138b5 Mon Sep 17 00:00:00 2001 From: caiconghui <55968745+caiconghui@users.noreply.github.com> Date: Tue, 30 Nov 2021 22:08:32 +0800 Subject: [PATCH] [feature] Support disable query and load for backend to make Doris more robust and set default value to 1 for max_query_retry_time (#7155) ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_query" = "true"); ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_load" = "true"); --- .../administrator-guide/config/fe_config.md | 2 +- .../Administration/ALTER SYSTEM.md | 43 ++++++--- .../administrator-guide/config/fe_config.md | 2 +- .../Administration/ALTER SYSTEM.md | 10 ++ .../doris/analysis/AddBackendClause.java | 2 +- .../doris/analysis/ModifyBackendClause.java | 43 ++++++++- .../apache/doris/catalog/MetadataViewer.java | 2 +- .../java/org/apache/doris/catalog/Tablet.java | 6 +- .../doris/clone/BackendLoadStatistic.java | 2 +- .../ColocateTableCheckerAndBalancer.java | 2 +- .../doris/clone/PartitionRebalancer.java | 4 +- .../apache/doris/clone/TabletSchedCtx.java | 2 +- .../apache/doris/clone/TabletScheduler.java | 2 +- .../java/org/apache/doris/common/Config.java | 2 +- .../doris/common/util/PropertyAnalyzer.java | 16 +++- .../apache/doris/http/rest/LoadAction.java | 9 +- .../apache/doris/httpv2/rest/LoadAction.java | 9 +- .../doris/httpv2/util/LoadSubmitter.java | 5 +- .../doris/journal/bdbje/BDBJEJournal.java | 6 +- .../java/org/apache/doris/load/ExportJob.java | 2 +- .../main/java/org/apache/doris/load/Load.java | 2 +- .../apache/doris/planner/BrokerScanNode.java | 3 +- .../java/org/apache/doris/qe/Coordinator.java | 13 +-- .../doris/qe/InsertStreamTxnExecutor.java | 8 +- .../org/apache/doris/qe/MultiLoadMgr.java | 7 +- .../org/apache/doris/qe/ShowExecutor.java | 4 +- .../org/apache/doris/qe/SimpleScheduler.java | 10 +- .../java/org/apache/doris/system/Backend.java | 32 ++++++- .../doris/system/SystemInfoService.java | 96 ++++++++++++++++--- .../apache/doris/task/ExportPendingTask.java | 2 +- .../apache/doris/backup/RestoreJobTest.java | 4 +- ...endTagTest.java => ModifyBackendTest.java} | 24 ++++- .../ColocateTableCheckerAndBalancerTest.java | 20 ++-- .../load/sync/canal/CanalSyncDataTest.java | 2 +- .../org/apache/doris/qe/MultiLoadMgrTest.java | 10 +- .../doris/utframe/DemoMultiBackendsTest.java | 2 +- 36 files changed, 308 insertions(+), 102 deletions(-) rename fe/fe-core/src/test/java/org/apache/doris/catalog/{ModifyBackendTagTest.java => ModifyBackendTest.java} (87%) diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index b2bb01962b..50e2634524 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -787,7 +787,7 @@ The tryLock timeout configuration of catalog lock. Normally it does not need to ### max_query_retry_time -Default:2 +Default:1 IsMutable:true diff --git a/docs/en/sql-reference/sql-statements/Administration/ALTER SYSTEM.md b/docs/en/sql-reference/sql-statements/Administration/ALTER SYSTEM.md index 5601093ded..43f1dc2843 100644 --- a/docs/en/sql-reference/sql-statements/Administration/ALTER SYSTEM.md +++ b/docs/en/sql-reference/sql-statements/Administration/ALTER SYSTEM.md @@ -57,23 +57,28 @@ Explain: 4) Node offline operations are used to secure offline nodes. This operation is asynchronous. If successful, the node will eventually be removed from the metadata. If it fails, the offline will not be completed. 5) The offline operation of the node can be cancelled manually. See CANCEL DECOMMISSION for details 6) Load error hub: -Currently, two types of Hub are supported: Mysql and Broker. You need to specify "type" = "mysql" or "type" = "broker" in PROPERTIES. -If you need to delete the current load error hub, you can set type to null. -1) When using the Mysql type, the error information generated when importing will be inserted into the specified MySQL library table, and then the error information can be viewed directly through the show load warnings statement. + Currently, two types of Hub are supported: Mysql and Broker. You need to specify "type" = "mysql" or "type" = "broker" in PROPERTIES. + If you need to delete the current load error hub, you can set type to null. + 1) When using the Mysql type, the error information generated when importing will be inserted into the specified MySQL library table, and then the error information can be viewed directly through the show load warnings statement. -Hub of Mysql type needs to specify the following parameters: -host: mysql host -port: mysql port -user: mysql user -password: mysql password -database mysql database -table: mysql table + Hub of Mysql type needs to specify the following parameters: + host: mysql host + port: mysql port + user: mysql user + password: mysql password + database mysql database + table: mysql table -2) When the Broker type is used, the error information generated when importing will form a file and be written to the designated remote storage system through the broker. Make sure that the corresponding broker is deployed -Hub of Broker type needs to specify the following parameters: -Broker: Name of broker -Path: Remote Storage Path -Other properties: Other information necessary to access remote storage, such as authentication information. + 2) When the Broker type is used, the error information generated when importing will form a file and be written to the designated remote storage system through the broker. Make sure that the corresponding broker is deployed + Hub of Broker type needs to specify the following parameters: + Broker: Name of broker + Path: Remote Storage Path + Other properties: Other information necessary to access remote storage, such as authentication information. + +7) Modify BE node attributes currently supports the following attributes: + 1. tag.location:Resource tag + 2. disable_query: Query disabled attribute + 3. disable_load: Load disabled attribute ## example @@ -121,5 +126,13 @@ ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a"); +10. Modify the query disabled attribute of BE + +ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_query" = "true"); + +11. Modify the load disabled attribute of BE + +ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_load" = "true"); + ## keyword AGE,SYSTEM,BACKGROUND,BROKER,FREE diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index ce4b7a79f5..6fdc1e56db 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -775,7 +775,7 @@ fe 会在每隔 es_state_sync_interval_secs 调用 es api 获取 es 索引分片 ### max_query_retry_time -默认值:2 +默认值:1 是否可以动态配置:true diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/ALTER SYSTEM.md b/docs/zh-CN/sql-reference/sql-statements/Administration/ALTER SYSTEM.md index 88f427c7e0..8af7db553d 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Administration/ALTER SYSTEM.md +++ b/docs/zh-CN/sql-reference/sql-statements/Administration/ALTER SYSTEM.md @@ -78,6 +78,8 @@ under the License. 7) 修改 BE 节点属性目前支持以下属性: 1. tag.location:资源标签 + 2. disable_query: 查询禁用属性 + 3. disable_load: 导入禁用属性 ## example @@ -124,7 +126,15 @@ under the License. 9. 修改 BE 的资源标签 ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a"); + + 10. 修改 BE 的查询禁用属性 + ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_query" = "true"); + + 11. 修改 BE 的导入禁用属性 + + ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_load" = "true"); + ## keyword ALTER,SYSTEM,BACKEND,BROKER,FREE diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AddBackendClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AddBackendClause.java index 302bc998cd..c29cfea548 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AddBackendClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AddBackendClause.java @@ -64,7 +64,7 @@ public class AddBackendClause extends BackendClause { @Override public void analyze(Analyzer analyzer) throws AnalysisException { super.analyze(analyzer); - tag = PropertyAnalyzer.analyzeBackendTagProperties(properties); + tag = PropertyAnalyzer.analyzeBackendTagProperties(properties, Tag.DEFAULT_BACKEND_TAG); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyBackendClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyBackendClause.java index f6d6262a8c..acc80116d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyBackendClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyBackendClause.java @@ -23,12 +23,17 @@ import org.apache.doris.resource.Tag; import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; + import java.util.List; import java.util.Map; public class ModifyBackendClause extends BackendClause { protected Map properties = Maps.newHashMap(); - private Tag tag; + protected Map analyzedProperties = Maps.newHashMap(); + private Tag tag = null; + private Boolean isQueryDisabled = null; + private Boolean isLoadDisabled = null; public ModifyBackendClause(List hostPorts, Map properties) { super(hostPorts); @@ -38,13 +43,38 @@ public class ModifyBackendClause extends BackendClause { @Override public void analyze(Analyzer analyzer) throws AnalysisException { super.analyze(analyzer); - tag = PropertyAnalyzer.analyzeBackendTagProperties(properties); + tag = PropertyAnalyzer.analyzeBackendTagProperties(properties, null); + isQueryDisabled = PropertyAnalyzer.analyzeBackendDisableProperties(properties, + PropertyAnalyzer.PROPERTIES_DISABLE_QUERY, null); + isLoadDisabled = PropertyAnalyzer.analyzeBackendDisableProperties(properties, + PropertyAnalyzer.PROPERTIES_DISABLE_LOAD, null); + if (tag != null) { + analyzedProperties.put(tag.type, tag.value); + } + if (isQueryDisabled != null) { + analyzedProperties.put(PropertyAnalyzer.PROPERTIES_DISABLE_QUERY, String.valueOf(isQueryDisabled)); + } + if (isLoadDisabled != null) { + analyzedProperties.put(PropertyAnalyzer.PROPERTIES_DISABLE_LOAD, String.valueOf(isLoadDisabled)); + } + if (!properties.isEmpty()) { + throw new AnalysisException("unknown properties setting for key (" + + StringUtils.join(properties.keySet(), ",") + ")"); + } } public Tag getTag() { return tag; } + public Boolean isQueryDisabled() { + return isQueryDisabled; + } + + public Boolean isLoadDisabled() { + return isLoadDisabled; + } + @Override public String toSql() { StringBuilder sb = new StringBuilder(); @@ -55,6 +85,15 @@ public class ModifyBackendClause extends BackendClause { sb.append(", "); } } + sb.append(" SET ("); + for (String key : analyzedProperties.keySet()) { + sb.append("\"").append(key).append("\"=\""); + sb.append(analyzedProperties.get(analyzedProperties.get(key))).append("\","); + } + if (!analyzedProperties.isEmpty()) { + sb.deleteCharAt(sb.length() - 1); + } + sb.append(")"); return sb.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java index 9acce81436..98309f7a3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -84,7 +84,7 @@ public class MetadataViewer { ReplicaStatus status = ReplicaStatus.OK; Backend be = infoService.getBackend(replica.getBackendId()); - if (be == null || !be.isAvailable() || replica.isBad()) { + if (be == null || !be.isAlive() || replica.isBad()) { status = ReplicaStatus.DEAD; } else if (replica.getVersion() < visibleVersion || replica.getLastFailedVersion() > 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index fe6b7a78bd..36ca4c4e8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -205,7 +205,7 @@ public class Tablet extends MetaObject implements Writable { } ReplicaState state = replica.getState(); - if (infoService.checkBackendAlive(replica.getBackendId()) && state.canLoad()) { + if (infoService.checkBackendLoadAvailable(replica.getBackendId()) && state.canLoad()) { map.put(replica.getBackendId(), replica.getPathHash()); } } @@ -431,7 +431,7 @@ public class Tablet extends MetaObject implements Writable { } aliveAndVersionComplete++; - if (!backend.isAvailable()) { + if (!backend.isScheduleAvailable()) { // this replica is alive, version complete, but backend is not available continue; } @@ -492,7 +492,7 @@ public class Tablet extends MetaObject implements Writable { List replicaBeIds = replicas.stream() .map(Replica::getBackendId).collect(Collectors.toList()); List availableBeIds = aliveBeIdsInCluster.stream() - .filter(systemInfoService::checkBackendAvailable) + .filter(systemInfoService::checkBackendScheduleAvailable) .collect(Collectors.toList()); if (replicaBeIds.containsAll(availableBeIds) && availableBeIds.size() >= replicationNum diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java index 005ea1e691..8c575d8cb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java @@ -175,7 +175,7 @@ public class BackendLoadStatistic { throw new LoadBalanceException("backend " + beId + " does not exist"); } - isAvailable = be.isAvailable(); + isAvailable = be.isScheduleAvailable() && be.isLoadAvailable() && be.isQueryAvailable(); ImmutableMap disks = be.getDisks(); for (DiskInfo diskInfo : disks.values()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index 14676d2ac3..36f473e9ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -575,7 +575,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { return false; } else if (!be.getTag().equals(tag) || excludedBeIds.contains(be.getId())) { return false; - } else if (!be.isAvailable()) { + } else if (!be.isScheduleAvailable()) { // 1. BE is dead for a long time // 2. BE is under decommission if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index d4678b787b..9fefd9b833 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -279,8 +279,8 @@ public class PartitionRebalancer extends Rebalancer { // So we can't do skew check. // Just do some basic checks, e.g. server available. private void checkMoveValidation(TabletMove move) throws IllegalStateException { - boolean fromAvailable = infoService.checkBackendAvailable(move.fromBe); - boolean toAvailable = infoService.checkBackendAvailable(move.toBe); + boolean fromAvailable = infoService.checkBackendScheduleAvailable(move.fromBe); + boolean toAvailable = infoService.checkBackendScheduleAvailable(move.toBe); Preconditions.checkState(fromAvailable && toAvailable, move + "'s bes are not all available: from " + fromAvailable + ", to " + toAvailable); // To be improved } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index d86c2c5ae8..c67c3cc51f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -573,7 +573,7 @@ public class TabletSchedCtx implements Comparable { } Backend be = infoService.getBackend(replica.getBackendId()); - if (be == null || !be.isAvailable()) { + if (be == null || !be.isScheduleAvailable()) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 7aca236ad8..2fcc82151d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -800,7 +800,7 @@ public class TabletScheduler extends MasterDaemon { // this case should be handled in deleteBackendDropped() continue; } - if (!be.isAvailable()) { + if (!be.isScheduleAvailable()) { deleteReplicaInternal(tabletCtx, replica, "backend unavailable", force); return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 23341381fb..fe13d9bb11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1043,7 +1043,7 @@ public class Config extends ConfigBase { * You may reduce this number to avoid Avalanche disaster. */ @ConfField(mutable = true) - public static int max_query_retry_time = 2; + public static int max_query_retry_time = 1; /** * The tryLock timeout configuration of catalog lock. diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 8cdef52f3f..29e7c7f716 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -98,6 +98,10 @@ public class PropertyAnalyzer { public static final String TAG_LOCATION = "tag.location"; + public static final String PROPERTIES_DISABLE_QUERY = "disable_query"; + + public static final String PROPERTIES_DISABLE_LOAD = "disable_load"; + public static DataProperty analyzeDataProperty(Map properties, DataProperty oldDataProperty) throws AnalysisException { if (properties == null) { @@ -455,12 +459,20 @@ public class PropertyAnalyzer { return ScalarType.createType(type); } - public static Tag analyzeBackendTagProperties(Map properties) throws AnalysisException { + public static Boolean analyzeBackendDisableProperties(Map properties, String key, Boolean defaultValue) throws AnalysisException { + if (properties.containsKey(key)) { + String value = properties.remove(key); + return Boolean.valueOf(value); + } + return defaultValue; + } + + public static Tag analyzeBackendTagProperties(Map properties, Tag defaultValue) throws AnalysisException { if (properties.containsKey(TAG_LOCATION)) { String tagVal = properties.remove(TAG_LOCATION); return Tag.create(Tag.TYPE_LOCATION, tagVal); } - return Tag.DEFAULT_BACKEND_TAG; + return defaultValue; } // There are 2 kinds of replication property: diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/LoadAction.java index 9981007001..f0d51065eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/LoadAction.java @@ -28,6 +28,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Strings; @@ -116,15 +117,17 @@ public class LoadAction extends RestBaseAction { redirectAddr = execEnv.getMultiLoadMgr().redirectAddr(fullDbName, label); } else { // Choose a backend sequentially. + SystemInfoService.BeAvailablePredicate beAvailablePredicate = + new SystemInfoService.BeAvailablePredicate(false, false, true); List backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag( - 1, true, false, clusterName, null, null); + 1, beAvailablePredicate, false, clusterName, null, null); if (backendIds == null) { - throw new DdlException("No backend alive."); + throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); } Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); if (backend == null) { - throw new DdlException("No backend alive."); + throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); } redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 194e244e6e..b674834b74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -25,6 +25,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Strings; @@ -128,15 +129,17 @@ public class LoadAction extends RestBaseController { } } else { // Choose a backend sequentially. + SystemInfoService.BeAvailablePredicate beAvailablePredicate = + new SystemInfoService.BeAvailablePredicate(false, false, true); List backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag( - 1, true, false, clusterName, null, null); + 1, beAvailablePredicate, false, clusterName, null, null); if (backendIds == null) { - return new RestBaseResult("No backend alive."); + return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); } Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); if (backend == null) { - return new RestBaseResult("No backend alive."); + return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); } redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java index 62338d0960..1a715cf0e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java @@ -137,8 +137,11 @@ public class LoadSubmitter { } private Backend selectOneBackend() throws DdlException { + SystemInfoService.BeAvailablePredicate beAvailablePredicate = + new SystemInfoService.BeAvailablePredicate(false, false, true); List backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag( - 1, true, false, SystemInfoService.DEFAULT_CLUSTER, null, null); + 1, beAvailablePredicate, false, + SystemInfoService.DEFAULT_CLUSTER, null, null); if (backendIds == null) { throw new DdlException("No alive backend"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index d1c29fe1c6..c4c1b71abf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -144,8 +144,10 @@ public class BDBJEJournal implements Journal { // Parameter null means auto commit if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) { writeSucceed = true; - LOG.debug("master write journal {} finished. db name {}, current time {}", - id, currentJournalDB.getDatabaseName(), System.currentTimeMillis()); + if (LOG.isDebugEnabled()) { + LOG.debug("master write journal {} finished. db name {}, current time {}", + id, currentJournalDB.getDatabaseName(), System.currentTimeMillis()); + } break; } } catch (DatabaseException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 4a232e2774..782c5469c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -655,7 +655,7 @@ public class ExportJob implements Writable { continue; } long backendId = backend.getId(); - if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) { + if (!Catalog.getCurrentSystemInfo().checkBackendQueryAvailable(backendId)) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 4e0d4d406e..2dc0ae83d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -516,7 +516,7 @@ public class Load { TNetworkAddress beAddress = dataDescription.getBeAddr(); Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(beAddress.getHostname(), beAddress.getPort()); - if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backend.getId())) { + if (!Catalog.getCurrentSystemInfo().checkBackendLoadAvailable(backend.getId())) { throw new DdlException("Etl backend is null or not available"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 1aa9a389c1..9f75814163 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -390,7 +390,8 @@ public class BrokerScanNode extends LoadScanNode { private void assignBackends() throws UserException { backends = Lists.newArrayList(); for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) { - if (be.isAvailable()) { + // broker scan node is used for query or load + if (be.isQueryAvailable() && be.isLoadAvailable()) { backends.add(be); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index f1adb358df..5e72189749 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -61,6 +61,7 @@ import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TDescriptorTable; @@ -922,7 +923,7 @@ public class Coordinator { Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort( host.getHostname(), host.getPort()); if (backend == null) { - throw new UserException("there is no scanNode Backend"); + throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG); } TNetworkAddress dest = new TNetworkAddress(backend.getHost(), backend.getBeRpcPort()); return dest; @@ -932,7 +933,7 @@ public class Coordinator { Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort( host.getHostname(), host.getPort()); if (backend == null) { - throw new UserException("there is no scanNode Backend"); + throw new UserException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); } if (backend.getBrpcPort() < 0) { return null; @@ -1040,8 +1041,8 @@ public class Coordinator { execHostport = SimpleScheduler.getHost(this.idToBackend, backendIdRef); } if (execHostport == null) { - LOG.warn("DataPartition UNPARTITIONED, no scanNode Backend"); - throw new UserException("there is no scanNode Backend"); + LOG.warn("DataPartition UNPARTITIONED, no scanNode Backend available"); + throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG); } if (backendIdRef.getRef() != null) { // backendIdRef can be null is we call getHostByCurrentBackend() before @@ -1169,7 +1170,7 @@ public class Coordinator { execHostport = SimpleScheduler.getHost(this.idToBackend, backendIdRef); } if (execHostport == null) { - throw new UserException("there is no scanNode Backend"); + throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG); } if (backendIdRef.getRef() != null) { // backendIdRef can be null is we call getHostByCurrentBackend() before @@ -1718,7 +1719,7 @@ public class Coordinator { Reference backendIdRef = new Reference(); TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId, seqLocation.locations, idToBackend, backendIdRef); if (execHostPort == null) { - throw new UserException("there is no scanNode Backend"); + throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG); } //the backend with buckendId is not alive, chose another new backend if (backendIdRef.getRef() != buckendId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index 97153954df..2cb8fe4ea6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -26,6 +26,7 @@ import org.apache.doris.proto.Types; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TBrokerRangeDesc; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -62,10 +63,13 @@ public class InsertStreamTxnExecutor { StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request); StreamLoadPlanner planner = new StreamLoadPlanner(txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask); TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId()); + SystemInfoService.BeAvailablePredicate beAvailablePredicate = + new SystemInfoService.BeAvailablePredicate(false, true, true); List beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag( - 1, true, false, txnEntry.getDb().getClusterName(), null, null); + 1, beAvailablePredicate, false, + txnEntry.getDb().getClusterName(), null, null); if (beIds == null || beIds.isEmpty()) { - throw new UserException("there is no scanNode Backend."); + throw new UserException("there is no backend load available or scanNode backend available."); } tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index bb0f994a92..fd8cf80cac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -41,6 +41,7 @@ import org.apache.doris.load.loadv2.JobState; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TMiniLoadRequest; import org.apache.doris.thrift.TNetworkAddress; @@ -91,10 +92,12 @@ public class MultiLoadMgr { throw new LabelAlreadyUsedException(label); } MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties); + SystemInfoService.BeAvailablePredicate beAvailablePredicate = + new SystemInfoService.BeAvailablePredicate(false, false, true); List backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(1, - true, false, ConnectContext.get().getClusterName(), null, null); + beAvailablePredicate, false, ConnectContext.get().getClusterName(), null, null); if (backendIds == null) { - throw new DdlException("No backend alive."); + throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); } multiLoadDesc.setBackendId(backendIds.get(0)); infoMap.put(multiLabel, multiLoadDesc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 30e8a53ea4..1b7e663883 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -1161,8 +1161,8 @@ public class ShowExecutor { if (be == null) { throw new AnalysisException(host + ":" + port + " is not a valid backend"); } - if (!be.isAvailable()) { - throw new AnalysisException("Backend " + host + ":" + port + " is not available"); + if (!be.isAlive()) { + throw new AnalysisException("Backend " + host + ":" + port + " is not alive"); } if (!url.getPath().equals("/api/_load_error_log")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java index 3b5b8a09b6..428b7b113f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java @@ -87,7 +87,7 @@ public class SimpleScheduler { } // no backend returned - throw new UserException("there is no scanNode Backend. " + + throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG + getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()), backends, locations.size())); } @@ -119,7 +119,7 @@ public class SimpleScheduler { } // no backend returned - throw new UserException("there is no scanNode Backend. " + + throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG + getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()), backends, locations.size())); } @@ -161,7 +161,7 @@ public class SimpleScheduler { } } // no backend returned - throw new UserException("there is no scanNode Backend. " + throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG + getBackendErrorMsg(Lists.newArrayList(backends.keySet()), backends, 3)); } @@ -178,6 +178,8 @@ public class SimpleScheduler { } else if (blacklistBackends.containsKey(beId)) { Pair pair = blacklistBackends.get(beId); res.add(beId + ": in black list(" + (pair == null ? "unknown" : pair.second) + ")"); + } else if (!be.isQueryAvailable()) { + res.add(beId + ": disable query"); } else { res.add(beId + ": unknown"); } @@ -195,7 +197,7 @@ public class SimpleScheduler { } public static boolean isAvailable(Backend backend) { - return (backend != null && backend.isAlive() && !blacklistBackends.containsKey(backend.getId())); + return (backend != null && backend.isQueryAvailable() && !blacklistBackends.containsKey(backend.getId())); } private static class UpdateBlacklistThread implements Runnable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 0c740b4f37..4ae145f9b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -200,6 +200,18 @@ public class Backend implements Writable { this.backendStatus.lastStreamLoadTime = lastStreamLoadTime; } + public boolean isQueryDisabled() { return backendStatus.isQueryDisabled; } + + public void setQueryDisabled(boolean isQueryDisabled) { + this.backendStatus.isQueryDisabled = isQueryDisabled; + } + + public boolean isLoadDisabled() {return backendStatus.isLoadDisabled; } + + public void setLoadDisabled(boolean isLoadDisabled) { + this.backendStatus.isLoadDisabled = isLoadDisabled; + } + // for test only public void updateOnce(int bePort, int httpPort, int beRpcPort) { if (this.bePort != bePort) { @@ -285,8 +297,16 @@ public class Backend implements Writable { return this.isDecommissioned.get(); } - public boolean isAvailable() { - return this.isAlive.get() && !this.isDecommissioned.get(); + public boolean isQueryAvailable() { + return isAlive() && !isQueryDisabled(); + } + + public boolean isScheduleAvailable() { + return isAlive() && !isDecommissioned(); + } + + public boolean isLoadAvailable() { + return isAlive() && !isLoadDisabled(); } public void setDisks(ImmutableMap disks) { @@ -702,10 +722,14 @@ public class Backend implements Writable { */ public class BackendStatus { // this will be output as json, so not using FeConstants.null_string; - public String lastSuccessReportTabletsTime = "N/A"; + public volatile String lastSuccessReportTabletsTime = "N/A"; @SerializedName("lastStreamLoadTime") // the last time when the stream load status was reported by backend - public long lastStreamLoadTime = -1; + public volatile long lastStreamLoadTime = -1; + @SerializedName("isQueryDisabled") + public volatile boolean isQueryDisabled = false; + @SerializedName("isLoadDisabled") + public volatile boolean isLoadDisabled = false; } public void setTag(Tag tag) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 67bcebb38e..726cb18a34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -71,6 +71,32 @@ public class SystemInfoService { public static final String DEFAULT_CLUSTER = "default_cluster"; + public static final String NO_BACKEND_LOAD_AVAILABLE_MSG = "No backend load available."; + + public static final String NO_SCAN_NODE_BACKEND_AVAILABLE_MSG = "There is no scanNode Backend available."; + + public static class BeAvailablePredicate { + private boolean scheduleAvailable; + + private boolean queryAvailable; + + private boolean loadAvailable; + + public BeAvailablePredicate(boolean scheduleAvailable, boolean queryAvailable, boolean loadAvailable) { + this.scheduleAvailable = scheduleAvailable; + this.queryAvailable = queryAvailable; + this.loadAvailable = loadAvailable; + } + + public boolean isMatch(Backend backend) { + if (scheduleAvailable && !backend.isScheduleAvailable() || queryAvailable && !backend.isQueryAvailable() || + loadAvailable && !backend.isLoadAvailable()) { + return false; + } + return true; + } + } + private volatile ImmutableMap idToBackendRef; private volatile ImmutableMap idToReportVersionRef; @@ -255,9 +281,25 @@ public class SystemInfoService { return idToBackendRef.get(backendId); } - public boolean checkBackendAvailable(long backendId) { + public boolean checkBackendLoadAvailable(long backendId) { Backend backend = idToBackendRef.get(backendId); - if (backend == null || !backend.isAvailable()) { + if (backend == null || !backend.isLoadAvailable()) { + return false; + } + return true; + } + + public boolean checkBackendQueryAvailable(long backendId) { + Backend backend = idToBackendRef.get(backendId); + if (backend == null || !backend.isQueryAvailable()) { + return false; + } + return true; + } + + public boolean checkBackendScheduleAvailable(long backendId) { + Backend backend = idToBackendRef.get(backendId); + if (backend == null || !backend.isScheduleAvailable()) { return false; } return true; @@ -744,9 +786,10 @@ public class SystemInfoService { Map> chosenBackendIds = Maps.newHashMap(); Map allocMap = replicaAlloc.getAllocMap(); short totalReplicaNum = 0; + BeAvailablePredicate beAvailablePredicate = new BeAvailablePredicate(true, false, false); for (Map.Entry entry : allocMap.entrySet()) { List beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(entry.getValue(), - true, true, clusterName, storageMedium, entry.getKey()); + beAvailablePredicate, true, clusterName, storageMedium, entry.getKey()); if (beIds == null) { throw new DdlException("Failed to find enough host with storage medium and tag(" + (storageMedium == null ? "NaN" : storageMedium) + "/" + entry.getKey() @@ -759,8 +802,9 @@ public class SystemInfoService { return chosenBackendIds; } - public List seqChooseBackendIdsByStorageMediumAndTag(int backendNum, boolean needAlive, boolean isCreate, - String clusterName, TStorageMedium storageMedium, Tag tag) { + public List seqChooseBackendIdsByStorageMediumAndTag(int backendNum, BeAvailablePredicate beAvailablePredicate, + boolean isCreate, String clusterName, + TStorageMedium storageMedium, Tag tag) { Stream beStream = getClusterBackends(clusterName).stream(); if (storageMedium == null) { beStream = beStream.filter(v -> !v.diskExceedLimit()); @@ -771,14 +815,15 @@ public class SystemInfoService { beStream = beStream.filter(v -> v.getTag().equals(tag)); } final List backends = beStream.collect(Collectors.toList()); - return seqChooseBackendIds(backendNum, needAlive, isCreate, clusterName, backends); + return seqChooseBackendIds(backendNum, beAvailablePredicate, isCreate, clusterName, backends); } // choose backends by round robin // return null if not enough backend // use synchronized to run serially - public synchronized List seqChooseBackendIds(int backendNum, boolean needAlive, boolean isCreate, - String clusterName, final List srcBackends) { + public synchronized List seqChooseBackendIds(int backendNum, BeAvailablePredicate beAvailablePredicate, + boolean isCreate, String clusterName, + final List srcBackends) { long lastBackendId; if (clusterName.equals(DEFAULT_CLUSTER)) { @@ -862,10 +907,8 @@ public class SystemInfoService { break; } - if (needAlive) { - if (!backend.isAlive() || backend.isDecommissioned()) { - continue; - } + if (!beAvailablePredicate.isMatch(backend)) { + continue; } long backendId = backend.getId(); @@ -1202,10 +1245,31 @@ public class SystemInfoService { backends.add(be); } - Tag tag = alterClause.getTag(); for (Backend be : backends) { - if (!be.getTag().equals(tag)) { - be.setTag(tag); + boolean shouldModify = false; + if (alterClause.getTag() != null) { + Tag tag = alterClause.getTag(); + if (!be.getTag().equals(tag)) { + be.setTag(tag); + shouldModify = true; + } + } + + if (alterClause.isQueryDisabled() != null) { + if (!alterClause.isQueryDisabled().equals(be.isQueryDisabled())) { + be.setQueryDisabled(alterClause.isQueryDisabled()); + shouldModify = true; + } + } + + if (alterClause.isLoadDisabled() != null) { + if (!alterClause.isLoadDisabled().equals(be.isLoadDisabled())) { + be.setLoadDisabled(alterClause.isLoadDisabled()); + shouldModify = true; + } + } + + if (shouldModify) { Catalog.getCurrentCatalog().getEditLog().logModifyBackend(be); LOG.info("finished to modify backend {} ", be); } @@ -1215,6 +1279,8 @@ public class SystemInfoService { public void replayModifyBackend(Backend backend) { Backend memBe = getBackend(backend.getId()); memBe.setTag(backend.getTag()); + memBe.setQueryDisabled(backend.isQueryDisabled()); + memBe.setLoadDisabled(backend.isLoadDisabled()); LOG.debug("replay modify backend: {}", backend); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportPendingTask.java index ad907783cb..bf146fa5e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportPendingTask.java @@ -106,7 +106,7 @@ public class ExportPendingTask extends MasterTask { return Status.CANCELLED; } long backendId = backend.getId(); - if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) { + if (!Catalog.getCurrentSystemInfo().checkBackendQueryAvailable(backendId)) { return Status.CANCELLED; } TSnapshotRequest snapshotRequest = new TSnapshotRequest(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index da8f479056..90fc55a891 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -161,8 +161,8 @@ public class RestoreJobTest { new Expectations() { { - systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, anyBoolean, anyBoolean, anyString, - (TStorageMedium) any, (Tag) any); + systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, + anyBoolean, anyString, (TStorageMedium) any, (Tag) any); minTimes = 0; result = new Delegate() { public synchronized List seqChooseBackendIds(int backendNum, boolean needAlive, diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTagTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java similarity index 87% rename from fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTagTest.java rename to fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java index 9db9b8e863..d755a6a65e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTagTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java @@ -41,7 +41,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; -public class ModifyBackendTagTest { +public class ModifyBackendTest { private static String runningDir = "fe/mocked/ModifyBackendTagTest/" + UUID.randomUUID().toString() + "/"; private static ConnectContext connectContext; @@ -64,7 +64,7 @@ public class ModifyBackendTagTest { } @Test - public void testModifyBackend() throws Exception { + public void testModifyBackendTag() throws Exception { SystemInfoService infoService = Catalog.getCurrentSystemInfo(); List backends = infoService.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER); Assert.assertEquals(1, backends.size()); @@ -183,5 +183,25 @@ public class ModifyBackendTagTest { tblProperties = tableProperty.getProperties(); Assert.assertTrue(tblProperties.containsKey("default.replication_allocation")); } + + @Test + public void testModifyBackendAvailableProperty() throws Exception { + SystemInfoService infoService = Catalog.getCurrentSystemInfo(); + List backends = infoService.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER); + String beHostPort = backends.get(0).getHost() + ":" + backends.get(0).getHeartbeatPort(); + // modify backend available property + String stmtStr = "alter system modify backend \"" + beHostPort + "\" set ('disable_query' = 'true', 'disable_load' = 'true')"; + AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext); + DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt); + Backend backend = infoService.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER).get(0); + Assert.assertFalse(backend.isQueryAvailable()); + Assert.assertFalse(backend.isLoadAvailable()); + + stmtStr = "alter system modify backend \"" + beHostPort + "\" set ('disable_query' = 'false', 'disable_load' = 'false')"; + stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext); + DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt); + Assert.assertTrue(backend.isQueryAvailable()); + Assert.assertTrue(backend.isLoadAvailable()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java index 8b525ea042..f1a17cce7f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java @@ -345,7 +345,7 @@ public class ColocateTableCheckerAndBalancerTest { infoService.getBackend(2L); result = myBackend2; minTimes = 0; - myBackend2.isAvailable(); + myBackend2.isScheduleAvailable(); result = true; minTimes = 0; myBackend2.getTag(); @@ -356,7 +356,7 @@ public class ColocateTableCheckerAndBalancerTest { infoService.getBackend(3L); result = myBackend3; minTimes = 0; - myBackend3.isAvailable(); + myBackend3.isScheduleAvailable(); result = false; minTimes = 0; myBackend3.isAlive(); @@ -373,7 +373,7 @@ public class ColocateTableCheckerAndBalancerTest { infoService.getBackend(4L); result = myBackend4; minTimes = 0; - myBackend4.isAvailable(); + myBackend4.isScheduleAvailable(); result = false; minTimes = 0; myBackend4.isAlive(); @@ -390,7 +390,7 @@ public class ColocateTableCheckerAndBalancerTest { infoService.getBackend(5L); result = myBackend5; minTimes = 0; - myBackend5.isAvailable(); + myBackend5.isScheduleAvailable(); result = false; minTimes = 0; myBackend5.isAlive(); @@ -438,7 +438,7 @@ public class ColocateTableCheckerAndBalancerTest { infoService.getBackend(2L); result = myBackend2; minTimes = 0; - myBackend2.isAvailable(); + myBackend2.isScheduleAvailable(); result = true; minTimes = 0; myBackend2.getTag(); @@ -449,7 +449,7 @@ public class ColocateTableCheckerAndBalancerTest { infoService.getBackend(3L); result = myBackend3; minTimes = 0; - myBackend3.isAvailable(); + myBackend3.isScheduleAvailable(); result = false; minTimes = 0; myBackend3.isAlive(); @@ -466,7 +466,7 @@ public class ColocateTableCheckerAndBalancerTest { infoService.getBackend(4L); result = myBackend4; minTimes = 0; - myBackend4.isAvailable(); + myBackend4.isScheduleAvailable(); result = false; minTimes = 0; myBackend4.isAlive(); @@ -483,7 +483,7 @@ public class ColocateTableCheckerAndBalancerTest { infoService.getBackend(5L); result = myBackend5; minTimes = 0; - myBackend5.isAvailable(); + myBackend5.isScheduleAvailable(); result = false; minTimes = 0; myBackend5.isAlive(); @@ -500,7 +500,7 @@ public class ColocateTableCheckerAndBalancerTest { infoService.getBackend(5L); result = myBackend6; minTimes = 0; - myBackend6.isAvailable(); + myBackend6.isScheduleAvailable(); result = false; minTimes = 0; myBackend6.isAlive(); @@ -517,7 +517,7 @@ public class ColocateTableCheckerAndBalancerTest { infoService.getBackend(5L); result = myBackend7; minTimes = 0; - myBackend7.isAvailable(); + myBackend7.isScheduleAvailable(); result = false; minTimes = 0; myBackend7.isAlive(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index 3f580e9d06..70815a3b4a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -151,7 +151,7 @@ public class CanalSyncDataTest { minTimes = 0; result = execPlanFragmentParams; - systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, anyBoolean, anyBoolean, anyString, + systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, anyBoolean, anyString, (TStorageMedium) any, (Tag) any); minTimes = 0; result = backendIds; diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java index 34d56c2b8d..8d4d09ae13 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java @@ -62,13 +62,13 @@ public class MultiLoadMgrTest { }; new Expectations() { { - systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, anyBoolean, anyBoolean, anyString, - (TStorageMedium) any, (Tag) any); + systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, + anyBoolean, anyString, (TStorageMedium) any, (Tag) any); minTimes = 0; result = new Delegate() { - public synchronized List seqChooseBackendIdsByStorageMediumAndTag(int backendNum, boolean needAlive, - boolean isCreate, String clusterName, TStorageMedium medium, - Tag tag) { + public synchronized List seqChooseBackendIdsByStorageMediumAndTag( + int backendNum, SystemInfoService.BeAvailablePredicate availablePredicate, + boolean isCreate, String clusterName, TStorageMedium medium, Tag tag) { List beIds = Lists.newArrayList(); beIds.add(CatalogMocker.BACKEND1_ID); beIds.add(CatalogMocker.BACKEND2_ID); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java index 70578f41b4..c4b5a92aa9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java @@ -199,7 +199,7 @@ public class DemoMultiBackendsTest { ProcResult result = dir.fetchResult(); Assert.assertEquals(BackendsProcDir.TITLE_NAMES.size(), result.getColumnNames().size()); Assert.assertEquals("{\"location\" : \"default\"}", result.getRows().get(0).get(19)); - Assert.assertEquals("{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1}", + Assert.assertEquals("{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 1)); }