[feature] Support disable query and load for backend to make Doris more robust and set default value to 1 for max_query_retry_time (#7155)

ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_query" = "true");
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_load" = "true");
This commit is contained in:
caiconghui
2021-11-30 22:08:32 +08:00
committed by GitHub
parent 6c4aeab06f
commit fbab8afe24
36 changed files with 308 additions and 102 deletions

View File

@ -787,7 +787,7 @@ The tryLock timeout configuration of catalog lock. Normally it does not need to
### max_query_retry_time
Default:2
Default:1
IsMutable:true

View File

@ -57,23 +57,28 @@ Explain:
4) Node offline operations are used to secure offline nodes. This operation is asynchronous. If successful, the node will eventually be removed from the metadata. If it fails, the offline will not be completed.
5) The offline operation of the node can be cancelled manually. See CANCEL DECOMMISSION for details
6) Load error hub:
Currently, two types of Hub are supported: Mysql and Broker. You need to specify "type" = "mysql" or "type" = "broker" in PROPERTIES.
If you need to delete the current load error hub, you can set type to null.
1) When using the Mysql type, the error information generated when importing will be inserted into the specified MySQL library table, and then the error information can be viewed directly through the show load warnings statement.
Currently, two types of Hub are supported: Mysql and Broker. You need to specify "type" = "mysql" or "type" = "broker" in PROPERTIES.
If you need to delete the current load error hub, you can set type to null.
1) When using the Mysql type, the error information generated when importing will be inserted into the specified MySQL library table, and then the error information can be viewed directly through the show load warnings statement.
Hub of Mysql type needs to specify the following parameters:
host: mysql host
port: mysql port
user: mysql user
password: mysql password
database mysql database
table: mysql table
Hub of Mysql type needs to specify the following parameters:
host: mysql host
port: mysql port
user: mysql user
password: mysql password
database mysql database
table: mysql table
2) When the Broker type is used, the error information generated when importing will form a file and be written to the designated remote storage system through the broker. Make sure that the corresponding broker is deployed
Hub of Broker type needs to specify the following parameters:
Broker: Name of broker
Path: Remote Storage Path
Other properties: Other information necessary to access remote storage, such as authentication information.
2) When the Broker type is used, the error information generated when importing will form a file and be written to the designated remote storage system through the broker. Make sure that the corresponding broker is deployed
Hub of Broker type needs to specify the following parameters:
Broker: Name of broker
Path: Remote Storage Path
Other properties: Other information necessary to access remote storage, such as authentication information.
7) Modify BE node attributes currently supports the following attributes:
1. tag.location:Resource tag
2. disable_query: Query disabled attribute
3. disable_load: Load disabled attribute
## example
@ -121,5 +126,13 @@ ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a");
10. Modify the query disabled attribute of BE
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_query" = "true");
11. Modify the load disabled attribute of BE
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_load" = "true");
## keyword
AGE,SYSTEM,BACKGROUND,BROKER,FREE

View File

@ -775,7 +775,7 @@ fe 会在每隔 es_state_sync_interval_secs 调用 es api 获取 es 索引分片
### max_query_retry_time
默认值:2
默认值:1
是否可以动态配置:true

View File

@ -78,6 +78,8 @@ under the License.
7) 修改 BE 节点属性目前支持以下属性:
1. tag.location:资源标签
2. disable_query: 查询禁用属性
3. disable_load: 导入禁用属性
## example
@ -124,7 +126,15 @@ under the License.
9. 修改 BE 的资源标签
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a");
10. 修改 BE 的查询禁用属性
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_query" = "true");
11. 修改 BE 的导入禁用属性
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_load" = "true");
## keyword
ALTER,SYSTEM,BACKEND,BROKER,FREE

View File

@ -64,7 +64,7 @@ public class AddBackendClause extends BackendClause {
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
tag = PropertyAnalyzer.analyzeBackendTagProperties(properties);
tag = PropertyAnalyzer.analyzeBackendTagProperties(properties, Tag.DEFAULT_BACKEND_TAG);
}
@Override

View File

@ -23,12 +23,17 @@ import org.apache.doris.resource.Tag;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
public class ModifyBackendClause extends BackendClause {
protected Map<String, String> properties = Maps.newHashMap();
private Tag tag;
protected Map<String, String> analyzedProperties = Maps.newHashMap();
private Tag tag = null;
private Boolean isQueryDisabled = null;
private Boolean isLoadDisabled = null;
public ModifyBackendClause(List<String> hostPorts, Map<String, String> properties) {
super(hostPorts);
@ -38,13 +43,38 @@ public class ModifyBackendClause extends BackendClause {
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
tag = PropertyAnalyzer.analyzeBackendTagProperties(properties);
tag = PropertyAnalyzer.analyzeBackendTagProperties(properties, null);
isQueryDisabled = PropertyAnalyzer.analyzeBackendDisableProperties(properties,
PropertyAnalyzer.PROPERTIES_DISABLE_QUERY, null);
isLoadDisabled = PropertyAnalyzer.analyzeBackendDisableProperties(properties,
PropertyAnalyzer.PROPERTIES_DISABLE_LOAD, null);
if (tag != null) {
analyzedProperties.put(tag.type, tag.value);
}
if (isQueryDisabled != null) {
analyzedProperties.put(PropertyAnalyzer.PROPERTIES_DISABLE_QUERY, String.valueOf(isQueryDisabled));
}
if (isLoadDisabled != null) {
analyzedProperties.put(PropertyAnalyzer.PROPERTIES_DISABLE_LOAD, String.valueOf(isLoadDisabled));
}
if (!properties.isEmpty()) {
throw new AnalysisException("unknown properties setting for key ("
+ StringUtils.join(properties.keySet(), ",") + ")");
}
}
public Tag getTag() {
return tag;
}
public Boolean isQueryDisabled() {
return isQueryDisabled;
}
public Boolean isLoadDisabled() {
return isLoadDisabled;
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
@ -55,6 +85,15 @@ public class ModifyBackendClause extends BackendClause {
sb.append(", ");
}
}
sb.append(" SET (");
for (String key : analyzedProperties.keySet()) {
sb.append("\"").append(key).append("\"=\"");
sb.append(analyzedProperties.get(analyzedProperties.get(key))).append("\",");
}
if (!analyzedProperties.isEmpty()) {
sb.deleteCharAt(sb.length() - 1);
}
sb.append(")");
return sb.toString();
}
}

View File

@ -84,7 +84,7 @@ public class MetadataViewer {
ReplicaStatus status = ReplicaStatus.OK;
Backend be = infoService.getBackend(replica.getBackendId());
if (be == null || !be.isAvailable() || replica.isBad()) {
if (be == null || !be.isAlive() || replica.isBad()) {
status = ReplicaStatus.DEAD;
} else if (replica.getVersion() < visibleVersion
|| replica.getLastFailedVersion() > 0) {

View File

@ -205,7 +205,7 @@ public class Tablet extends MetaObject implements Writable {
}
ReplicaState state = replica.getState();
if (infoService.checkBackendAlive(replica.getBackendId()) && state.canLoad()) {
if (infoService.checkBackendLoadAvailable(replica.getBackendId()) && state.canLoad()) {
map.put(replica.getBackendId(), replica.getPathHash());
}
}
@ -431,7 +431,7 @@ public class Tablet extends MetaObject implements Writable {
}
aliveAndVersionComplete++;
if (!backend.isAvailable()) {
if (!backend.isScheduleAvailable()) {
// this replica is alive, version complete, but backend is not available
continue;
}
@ -492,7 +492,7 @@ public class Tablet extends MetaObject implements Writable {
List<Long> replicaBeIds = replicas.stream()
.map(Replica::getBackendId).collect(Collectors.toList());
List<Long> availableBeIds = aliveBeIdsInCluster.stream()
.filter(systemInfoService::checkBackendAvailable)
.filter(systemInfoService::checkBackendScheduleAvailable)
.collect(Collectors.toList());
if (replicaBeIds.containsAll(availableBeIds)
&& availableBeIds.size() >= replicationNum

View File

@ -175,7 +175,7 @@ public class BackendLoadStatistic {
throw new LoadBalanceException("backend " + beId + " does not exist");
}
isAvailable = be.isAvailable();
isAvailable = be.isScheduleAvailable() && be.isLoadAvailable() && be.isQueryAvailable();
ImmutableMap<String, DiskInfo> disks = be.getDisks();
for (DiskInfo diskInfo : disks.values()) {

View File

@ -575,7 +575,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
return false;
} else if (!be.getTag().equals(tag) || excludedBeIds.contains(be.getId())) {
return false;
} else if (!be.isAvailable()) {
} else if (!be.isScheduleAvailable()) {
// 1. BE is dead for a long time
// 2. BE is under decommission
if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)

View File

@ -279,8 +279,8 @@ public class PartitionRebalancer extends Rebalancer {
// So we can't do skew check.
// Just do some basic checks, e.g. server available.
private void checkMoveValidation(TabletMove move) throws IllegalStateException {
boolean fromAvailable = infoService.checkBackendAvailable(move.fromBe);
boolean toAvailable = infoService.checkBackendAvailable(move.toBe);
boolean fromAvailable = infoService.checkBackendScheduleAvailable(move.fromBe);
boolean toAvailable = infoService.checkBackendScheduleAvailable(move.toBe);
Preconditions.checkState(fromAvailable && toAvailable, move + "'s bes are not all available: from " + fromAvailable + ", to " + toAvailable);
// To be improved
}

View File

@ -573,7 +573,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
Backend be = infoService.getBackend(replica.getBackendId());
if (be == null || !be.isAvailable()) {
if (be == null || !be.isScheduleAvailable()) {
continue;
}

View File

@ -800,7 +800,7 @@ public class TabletScheduler extends MasterDaemon {
// this case should be handled in deleteBackendDropped()
continue;
}
if (!be.isAvailable()) {
if (!be.isScheduleAvailable()) {
deleteReplicaInternal(tabletCtx, replica, "backend unavailable", force);
return true;
}

View File

@ -1043,7 +1043,7 @@ public class Config extends ConfigBase {
* You may reduce this number to avoid Avalanche disaster.
*/
@ConfField(mutable = true)
public static int max_query_retry_time = 2;
public static int max_query_retry_time = 1;
/**
* The tryLock timeout configuration of catalog lock.

View File

@ -98,6 +98,10 @@ public class PropertyAnalyzer {
public static final String TAG_LOCATION = "tag.location";
public static final String PROPERTIES_DISABLE_QUERY = "disable_query";
public static final String PROPERTIES_DISABLE_LOAD = "disable_load";
public static DataProperty analyzeDataProperty(Map<String, String> properties, DataProperty oldDataProperty)
throws AnalysisException {
if (properties == null) {
@ -455,12 +459,20 @@ public class PropertyAnalyzer {
return ScalarType.createType(type);
}
public static Tag analyzeBackendTagProperties(Map<String, String> properties) throws AnalysisException {
public static Boolean analyzeBackendDisableProperties(Map<String, String> properties, String key, Boolean defaultValue) throws AnalysisException {
if (properties.containsKey(key)) {
String value = properties.remove(key);
return Boolean.valueOf(value);
}
return defaultValue;
}
public static Tag analyzeBackendTagProperties(Map<String, String> properties, Tag defaultValue) throws AnalysisException {
if (properties.containsKey(TAG_LOCATION)) {
String tagVal = properties.remove(TAG_LOCATION);
return Tag.create(Tag.TYPE_LOCATION, tagVal);
}
return Tag.DEFAULT_BACKEND_TAG;
return defaultValue;
}
// There are 2 kinds of replication property:

View File

@ -28,6 +28,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Strings;
@ -116,15 +117,17 @@ public class LoadAction extends RestBaseAction {
redirectAddr = execEnv.getMultiLoadMgr().redirectAddr(fullDbName, label);
} else {
// Choose a backend sequentially.
SystemInfoService.BeAvailablePredicate beAvailablePredicate =
new SystemInfoService.BeAvailablePredicate(false, false, true);
List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
1, true, false, clusterName, null, null);
1, beAvailablePredicate, false, clusterName, null, null);
if (backendIds == null) {
throw new DdlException("No backend alive.");
throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
if (backend == null) {
throw new DdlException("No backend alive.");
throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());

View File

@ -25,6 +25,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Strings;
@ -128,15 +129,17 @@ public class LoadAction extends RestBaseController {
}
} else {
// Choose a backend sequentially.
SystemInfoService.BeAvailablePredicate beAvailablePredicate =
new SystemInfoService.BeAvailablePredicate(false, false, true);
List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
1, true, false, clusterName, null, null);
1, beAvailablePredicate, false, clusterName, null, null);
if (backendIds == null) {
return new RestBaseResult("No backend alive.");
return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
if (backend == null) {
return new RestBaseResult("No backend alive.");
return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());

View File

@ -137,8 +137,11 @@ public class LoadSubmitter {
}
private Backend selectOneBackend() throws DdlException {
SystemInfoService.BeAvailablePredicate beAvailablePredicate =
new SystemInfoService.BeAvailablePredicate(false, false, true);
List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
1, true, false, SystemInfoService.DEFAULT_CLUSTER, null, null);
1, beAvailablePredicate, false,
SystemInfoService.DEFAULT_CLUSTER, null, null);
if (backendIds == null) {
throw new DdlException("No alive backend");
}

View File

@ -144,8 +144,10 @@ public class BDBJEJournal implements Journal {
// Parameter null means auto commit
if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) {
writeSucceed = true;
LOG.debug("master write journal {} finished. db name {}, current time {}",
id, currentJournalDB.getDatabaseName(), System.currentTimeMillis());
if (LOG.isDebugEnabled()) {
LOG.debug("master write journal {} finished. db name {}, current time {}",
id, currentJournalDB.getDatabaseName(), System.currentTimeMillis());
}
break;
}
} catch (DatabaseException e) {

View File

@ -655,7 +655,7 @@ public class ExportJob implements Writable {
continue;
}
long backendId = backend.getId();
if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
if (!Catalog.getCurrentSystemInfo().checkBackendQueryAvailable(backendId)) {
continue;
}

View File

@ -516,7 +516,7 @@ public class Load {
TNetworkAddress beAddress = dataDescription.getBeAddr();
Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(beAddress.getHostname(),
beAddress.getPort());
if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backend.getId())) {
if (!Catalog.getCurrentSystemInfo().checkBackendLoadAvailable(backend.getId())) {
throw new DdlException("Etl backend is null or not available");
}

View File

@ -390,7 +390,8 @@ public class BrokerScanNode extends LoadScanNode {
private void assignBackends() throws UserException {
backends = Lists.newArrayList();
for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) {
if (be.isAvailable()) {
// broker scan node is used for query or load
if (be.isQueryAvailable() && be.isLoadAvailable()) {
backends.add(be);
}
}

View File

@ -61,6 +61,7 @@ import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TDescriptorTable;
@ -922,7 +923,7 @@ public class Coordinator {
Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(
host.getHostname(), host.getPort());
if (backend == null) {
throw new UserException("there is no scanNode Backend");
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
}
TNetworkAddress dest = new TNetworkAddress(backend.getHost(), backend.getBeRpcPort());
return dest;
@ -932,7 +933,7 @@ public class Coordinator {
Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(
host.getHostname(), host.getPort());
if (backend == null) {
throw new UserException("there is no scanNode Backend");
throw new UserException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
if (backend.getBrpcPort() < 0) {
return null;
@ -1040,8 +1041,8 @@ public class Coordinator {
execHostport = SimpleScheduler.getHost(this.idToBackend, backendIdRef);
}
if (execHostport == null) {
LOG.warn("DataPartition UNPARTITIONED, no scanNode Backend");
throw new UserException("there is no scanNode Backend");
LOG.warn("DataPartition UNPARTITIONED, no scanNode Backend available");
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
}
if (backendIdRef.getRef() != null) {
// backendIdRef can be null is we call getHostByCurrentBackend() before
@ -1169,7 +1170,7 @@ public class Coordinator {
execHostport = SimpleScheduler.getHost(this.idToBackend, backendIdRef);
}
if (execHostport == null) {
throw new UserException("there is no scanNode Backend");
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
}
if (backendIdRef.getRef() != null) {
// backendIdRef can be null is we call getHostByCurrentBackend() before
@ -1718,7 +1719,7 @@ public class Coordinator {
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId, seqLocation.locations, idToBackend, backendIdRef);
if (execHostPort == null) {
throw new UserException("there is no scanNode Backend");
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
}
//the backend with buckendId is not alive, chose another new backend
if (backendIdRef.getRef() != buckendId) {

View File

@ -26,6 +26,7 @@ import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@ -62,10 +63,13 @@ public class InsertStreamTxnExecutor {
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
StreamLoadPlanner planner = new StreamLoadPlanner(txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
SystemInfoService.BeAvailablePredicate beAvailablePredicate =
new SystemInfoService.BeAvailablePredicate(false, true, true);
List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
1, true, false, txnEntry.getDb().getClusterName(), null, null);
1, beAvailablePredicate, false,
txnEntry.getDb().getClusterName(), null, null);
if (beIds == null || beIds.isEmpty()) {
throw new UserException("there is no scanNode Backend.");
throw new UserException("there is no backend load available or scanNode backend available.");
}
tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());

View File

@ -41,6 +41,7 @@ import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TNetworkAddress;
@ -91,10 +92,12 @@ public class MultiLoadMgr {
throw new LabelAlreadyUsedException(label);
}
MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties);
SystemInfoService.BeAvailablePredicate beAvailablePredicate =
new SystemInfoService.BeAvailablePredicate(false, false, true);
List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(1,
true, false, ConnectContext.get().getClusterName(), null, null);
beAvailablePredicate, false, ConnectContext.get().getClusterName(), null, null);
if (backendIds == null) {
throw new DdlException("No backend alive.");
throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
multiLoadDesc.setBackendId(backendIds.get(0));
infoMap.put(multiLabel, multiLoadDesc);

View File

@ -1161,8 +1161,8 @@ public class ShowExecutor {
if (be == null) {
throw new AnalysisException(host + ":" + port + " is not a valid backend");
}
if (!be.isAvailable()) {
throw new AnalysisException("Backend " + host + ":" + port + " is not available");
if (!be.isAlive()) {
throw new AnalysisException("Backend " + host + ":" + port + " is not alive");
}
if (!url.getPath().equals("/api/_load_error_log")) {

View File

@ -87,7 +87,7 @@ public class SimpleScheduler {
}
// no backend returned
throw new UserException("there is no scanNode Backend. " +
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG +
getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()),
backends, locations.size()));
}
@ -119,7 +119,7 @@ public class SimpleScheduler {
}
// no backend returned
throw new UserException("there is no scanNode Backend. " +
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG +
getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()),
backends, locations.size()));
}
@ -161,7 +161,7 @@ public class SimpleScheduler {
}
}
// no backend returned
throw new UserException("there is no scanNode Backend. "
throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG
+ getBackendErrorMsg(Lists.newArrayList(backends.keySet()), backends, 3));
}
@ -178,6 +178,8 @@ public class SimpleScheduler {
} else if (blacklistBackends.containsKey(beId)) {
Pair<Integer, String> pair = blacklistBackends.get(beId);
res.add(beId + ": in black list(" + (pair == null ? "unknown" : pair.second) + ")");
} else if (!be.isQueryAvailable()) {
res.add(beId + ": disable query");
} else {
res.add(beId + ": unknown");
}
@ -195,7 +197,7 @@ public class SimpleScheduler {
}
public static boolean isAvailable(Backend backend) {
return (backend != null && backend.isAlive() && !blacklistBackends.containsKey(backend.getId()));
return (backend != null && backend.isQueryAvailable() && !blacklistBackends.containsKey(backend.getId()));
}
private static class UpdateBlacklistThread implements Runnable {

View File

@ -200,6 +200,18 @@ public class Backend implements Writable {
this.backendStatus.lastStreamLoadTime = lastStreamLoadTime;
}
public boolean isQueryDisabled() { return backendStatus.isQueryDisabled; }
public void setQueryDisabled(boolean isQueryDisabled) {
this.backendStatus.isQueryDisabled = isQueryDisabled;
}
public boolean isLoadDisabled() {return backendStatus.isLoadDisabled; }
public void setLoadDisabled(boolean isLoadDisabled) {
this.backendStatus.isLoadDisabled = isLoadDisabled;
}
// for test only
public void updateOnce(int bePort, int httpPort, int beRpcPort) {
if (this.bePort != bePort) {
@ -285,8 +297,16 @@ public class Backend implements Writable {
return this.isDecommissioned.get();
}
public boolean isAvailable() {
return this.isAlive.get() && !this.isDecommissioned.get();
public boolean isQueryAvailable() {
return isAlive() && !isQueryDisabled();
}
public boolean isScheduleAvailable() {
return isAlive() && !isDecommissioned();
}
public boolean isLoadAvailable() {
return isAlive() && !isLoadDisabled();
}
public void setDisks(ImmutableMap<String, DiskInfo> disks) {
@ -702,10 +722,14 @@ public class Backend implements Writable {
*/
public class BackendStatus {
// this will be output as json, so not using FeConstants.null_string;
public String lastSuccessReportTabletsTime = "N/A";
public volatile String lastSuccessReportTabletsTime = "N/A";
@SerializedName("lastStreamLoadTime")
// the last time when the stream load status was reported by backend
public long lastStreamLoadTime = -1;
public volatile long lastStreamLoadTime = -1;
@SerializedName("isQueryDisabled")
public volatile boolean isQueryDisabled = false;
@SerializedName("isLoadDisabled")
public volatile boolean isLoadDisabled = false;
}
public void setTag(Tag tag) {

View File

@ -71,6 +71,32 @@ public class SystemInfoService {
public static final String DEFAULT_CLUSTER = "default_cluster";
public static final String NO_BACKEND_LOAD_AVAILABLE_MSG = "No backend load available.";
public static final String NO_SCAN_NODE_BACKEND_AVAILABLE_MSG = "There is no scanNode Backend available.";
public static class BeAvailablePredicate {
private boolean scheduleAvailable;
private boolean queryAvailable;
private boolean loadAvailable;
public BeAvailablePredicate(boolean scheduleAvailable, boolean queryAvailable, boolean loadAvailable) {
this.scheduleAvailable = scheduleAvailable;
this.queryAvailable = queryAvailable;
this.loadAvailable = loadAvailable;
}
public boolean isMatch(Backend backend) {
if (scheduleAvailable && !backend.isScheduleAvailable() || queryAvailable && !backend.isQueryAvailable() ||
loadAvailable && !backend.isLoadAvailable()) {
return false;
}
return true;
}
}
private volatile ImmutableMap<Long, Backend> idToBackendRef;
private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef;
@ -255,9 +281,25 @@ public class SystemInfoService {
return idToBackendRef.get(backendId);
}
public boolean checkBackendAvailable(long backendId) {
public boolean checkBackendLoadAvailable(long backendId) {
Backend backend = idToBackendRef.get(backendId);
if (backend == null || !backend.isAvailable()) {
if (backend == null || !backend.isLoadAvailable()) {
return false;
}
return true;
}
public boolean checkBackendQueryAvailable(long backendId) {
Backend backend = idToBackendRef.get(backendId);
if (backend == null || !backend.isQueryAvailable()) {
return false;
}
return true;
}
public boolean checkBackendScheduleAvailable(long backendId) {
Backend backend = idToBackendRef.get(backendId);
if (backend == null || !backend.isScheduleAvailable()) {
return false;
}
return true;
@ -744,9 +786,10 @@ public class SystemInfoService {
Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
short totalReplicaNum = 0;
BeAvailablePredicate beAvailablePredicate = new BeAvailablePredicate(true, false, false);
for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(entry.getValue(),
true, true, clusterName, storageMedium, entry.getKey());
beAvailablePredicate, true, clusterName, storageMedium, entry.getKey());
if (beIds == null) {
throw new DdlException("Failed to find enough host with storage medium and tag("
+ (storageMedium == null ? "NaN" : storageMedium) + "/" + entry.getKey()
@ -759,8 +802,9 @@ public class SystemInfoService {
return chosenBackendIds;
}
public List<Long> seqChooseBackendIdsByStorageMediumAndTag(int backendNum, boolean needAlive, boolean isCreate,
String clusterName, TStorageMedium storageMedium, Tag tag) {
public List<Long> seqChooseBackendIdsByStorageMediumAndTag(int backendNum, BeAvailablePredicate beAvailablePredicate,
boolean isCreate, String clusterName,
TStorageMedium storageMedium, Tag tag) {
Stream<Backend> beStream = getClusterBackends(clusterName).stream();
if (storageMedium == null) {
beStream = beStream.filter(v -> !v.diskExceedLimit());
@ -771,14 +815,15 @@ public class SystemInfoService {
beStream = beStream.filter(v -> v.getTag().equals(tag));
}
final List<Backend> backends = beStream.collect(Collectors.toList());
return seqChooseBackendIds(backendNum, needAlive, isCreate, clusterName, backends);
return seqChooseBackendIds(backendNum, beAvailablePredicate, isCreate, clusterName, backends);
}
// choose backends by round robin
// return null if not enough backend
// use synchronized to run serially
public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive, boolean isCreate,
String clusterName, final List<Backend> srcBackends) {
public synchronized List<Long> seqChooseBackendIds(int backendNum, BeAvailablePredicate beAvailablePredicate,
boolean isCreate, String clusterName,
final List<Backend> srcBackends) {
long lastBackendId;
if (clusterName.equals(DEFAULT_CLUSTER)) {
@ -862,10 +907,8 @@ public class SystemInfoService {
break;
}
if (needAlive) {
if (!backend.isAlive() || backend.isDecommissioned()) {
continue;
}
if (!beAvailablePredicate.isMatch(backend)) {
continue;
}
long backendId = backend.getId();
@ -1202,10 +1245,31 @@ public class SystemInfoService {
backends.add(be);
}
Tag tag = alterClause.getTag();
for (Backend be : backends) {
if (!be.getTag().equals(tag)) {
be.setTag(tag);
boolean shouldModify = false;
if (alterClause.getTag() != null) {
Tag tag = alterClause.getTag();
if (!be.getTag().equals(tag)) {
be.setTag(tag);
shouldModify = true;
}
}
if (alterClause.isQueryDisabled() != null) {
if (!alterClause.isQueryDisabled().equals(be.isQueryDisabled())) {
be.setQueryDisabled(alterClause.isQueryDisabled());
shouldModify = true;
}
}
if (alterClause.isLoadDisabled() != null) {
if (!alterClause.isLoadDisabled().equals(be.isLoadDisabled())) {
be.setLoadDisabled(alterClause.isLoadDisabled());
shouldModify = true;
}
}
if (shouldModify) {
Catalog.getCurrentCatalog().getEditLog().logModifyBackend(be);
LOG.info("finished to modify backend {} ", be);
}
@ -1215,6 +1279,8 @@ public class SystemInfoService {
public void replayModifyBackend(Backend backend) {
Backend memBe = getBackend(backend.getId());
memBe.setTag(backend.getTag());
memBe.setQueryDisabled(backend.isQueryDisabled());
memBe.setLoadDisabled(backend.isLoadDisabled());
LOG.debug("replay modify backend: {}", backend);
}

View File

@ -106,7 +106,7 @@ public class ExportPendingTask extends MasterTask {
return Status.CANCELLED;
}
long backendId = backend.getId();
if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
if (!Catalog.getCurrentSystemInfo().checkBackendQueryAvailable(backendId)) {
return Status.CANCELLED;
}
TSnapshotRequest snapshotRequest = new TSnapshotRequest();

View File

@ -161,8 +161,8 @@ public class RestoreJobTest {
new Expectations() {
{
systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, anyBoolean, anyBoolean, anyString,
(TStorageMedium) any, (Tag) any);
systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any,
anyBoolean, anyString, (TStorageMedium) any, (Tag) any);
minTimes = 0;
result = new Delegate() {
public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive,

View File

@ -41,7 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
public class ModifyBackendTagTest {
public class ModifyBackendTest {
private static String runningDir = "fe/mocked/ModifyBackendTagTest/" + UUID.randomUUID().toString() + "/";
private static ConnectContext connectContext;
@ -64,7 +64,7 @@ public class ModifyBackendTagTest {
}
@Test
public void testModifyBackend() throws Exception {
public void testModifyBackendTag() throws Exception {
SystemInfoService infoService = Catalog.getCurrentSystemInfo();
List<Backend> backends = infoService.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
Assert.assertEquals(1, backends.size());
@ -183,5 +183,25 @@ public class ModifyBackendTagTest {
tblProperties = tableProperty.getProperties();
Assert.assertTrue(tblProperties.containsKey("default.replication_allocation"));
}
@Test
public void testModifyBackendAvailableProperty() throws Exception {
SystemInfoService infoService = Catalog.getCurrentSystemInfo();
List<Backend> backends = infoService.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
String beHostPort = backends.get(0).getHost() + ":" + backends.get(0).getHeartbeatPort();
// modify backend available property
String stmtStr = "alter system modify backend \"" + beHostPort + "\" set ('disable_query' = 'true', 'disable_load' = 'true')";
AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext);
DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt);
Backend backend = infoService.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER).get(0);
Assert.assertFalse(backend.isQueryAvailable());
Assert.assertFalse(backend.isLoadAvailable());
stmtStr = "alter system modify backend \"" + beHostPort + "\" set ('disable_query' = 'false', 'disable_load' = 'false')";
stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext);
DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt);
Assert.assertTrue(backend.isQueryAvailable());
Assert.assertTrue(backend.isLoadAvailable());
}
}

View File

@ -345,7 +345,7 @@ public class ColocateTableCheckerAndBalancerTest {
infoService.getBackend(2L);
result = myBackend2;
minTimes = 0;
myBackend2.isAvailable();
myBackend2.isScheduleAvailable();
result = true;
minTimes = 0;
myBackend2.getTag();
@ -356,7 +356,7 @@ public class ColocateTableCheckerAndBalancerTest {
infoService.getBackend(3L);
result = myBackend3;
minTimes = 0;
myBackend3.isAvailable();
myBackend3.isScheduleAvailable();
result = false;
minTimes = 0;
myBackend3.isAlive();
@ -373,7 +373,7 @@ public class ColocateTableCheckerAndBalancerTest {
infoService.getBackend(4L);
result = myBackend4;
minTimes = 0;
myBackend4.isAvailable();
myBackend4.isScheduleAvailable();
result = false;
minTimes = 0;
myBackend4.isAlive();
@ -390,7 +390,7 @@ public class ColocateTableCheckerAndBalancerTest {
infoService.getBackend(5L);
result = myBackend5;
minTimes = 0;
myBackend5.isAvailable();
myBackend5.isScheduleAvailable();
result = false;
minTimes = 0;
myBackend5.isAlive();
@ -438,7 +438,7 @@ public class ColocateTableCheckerAndBalancerTest {
infoService.getBackend(2L);
result = myBackend2;
minTimes = 0;
myBackend2.isAvailable();
myBackend2.isScheduleAvailable();
result = true;
minTimes = 0;
myBackend2.getTag();
@ -449,7 +449,7 @@ public class ColocateTableCheckerAndBalancerTest {
infoService.getBackend(3L);
result = myBackend3;
minTimes = 0;
myBackend3.isAvailable();
myBackend3.isScheduleAvailable();
result = false;
minTimes = 0;
myBackend3.isAlive();
@ -466,7 +466,7 @@ public class ColocateTableCheckerAndBalancerTest {
infoService.getBackend(4L);
result = myBackend4;
minTimes = 0;
myBackend4.isAvailable();
myBackend4.isScheduleAvailable();
result = false;
minTimes = 0;
myBackend4.isAlive();
@ -483,7 +483,7 @@ public class ColocateTableCheckerAndBalancerTest {
infoService.getBackend(5L);
result = myBackend5;
minTimes = 0;
myBackend5.isAvailable();
myBackend5.isScheduleAvailable();
result = false;
minTimes = 0;
myBackend5.isAlive();
@ -500,7 +500,7 @@ public class ColocateTableCheckerAndBalancerTest {
infoService.getBackend(5L);
result = myBackend6;
minTimes = 0;
myBackend6.isAvailable();
myBackend6.isScheduleAvailable();
result = false;
minTimes = 0;
myBackend6.isAlive();
@ -517,7 +517,7 @@ public class ColocateTableCheckerAndBalancerTest {
infoService.getBackend(5L);
result = myBackend7;
minTimes = 0;
myBackend7.isAvailable();
myBackend7.isScheduleAvailable();
result = false;
minTimes = 0;
myBackend7.isAlive();

View File

@ -151,7 +151,7 @@ public class CanalSyncDataTest {
minTimes = 0;
result = execPlanFragmentParams;
systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, anyBoolean, anyBoolean, anyString,
systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, anyBoolean, anyString,
(TStorageMedium) any, (Tag) any);
minTimes = 0;
result = backendIds;

View File

@ -62,13 +62,13 @@ public class MultiLoadMgrTest {
};
new Expectations() {
{
systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, anyBoolean, anyBoolean, anyString,
(TStorageMedium) any, (Tag) any);
systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any,
anyBoolean, anyString, (TStorageMedium) any, (Tag) any);
minTimes = 0;
result = new Delegate() {
public synchronized List<Long> seqChooseBackendIdsByStorageMediumAndTag(int backendNum, boolean needAlive,
boolean isCreate, String clusterName, TStorageMedium medium,
Tag tag) {
public synchronized List<Long> seqChooseBackendIdsByStorageMediumAndTag(
int backendNum, SystemInfoService.BeAvailablePredicate availablePredicate,
boolean isCreate, String clusterName, TStorageMedium medium, Tag tag) {
List<Long> beIds = Lists.newArrayList();
beIds.add(CatalogMocker.BACKEND1_ID);
beIds.add(CatalogMocker.BACKEND2_ID);

View File

@ -199,7 +199,7 @@ public class DemoMultiBackendsTest {
ProcResult result = dir.fetchResult();
Assert.assertEquals(BackendsProcDir.TITLE_NAMES.size(), result.getColumnNames().size());
Assert.assertEquals("{\"location\" : \"default\"}", result.getRows().get(0).get(19));
Assert.assertEquals("{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1}",
Assert.assertEquals("{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 1));
}