[Improvement](set) enable admin_set_frontend_config can apply to all fe (#37022)
bp #34685
This commit is contained in:
@ -25,6 +25,7 @@ import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.OriginStatement;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
@ -38,22 +39,25 @@ public class AdminSetConfigStmt extends DdlStmt {
|
||||
BACKEND
|
||||
}
|
||||
|
||||
private boolean applyToAll;
|
||||
private ConfigType type;
|
||||
private Map<String, String> configs;
|
||||
|
||||
private RedirectStatus redirectStatus = RedirectStatus.NO_FORWARD;
|
||||
|
||||
public AdminSetConfigStmt(ConfigType type, Map<String, String> configs) {
|
||||
public AdminSetConfigStmt(ConfigType type, Map<String, String> configs, boolean applyToAll) {
|
||||
this.type = type;
|
||||
this.configs = configs;
|
||||
if (this.configs == null) {
|
||||
this.configs = Maps.newHashMap();
|
||||
}
|
||||
this.applyToAll = applyToAll;
|
||||
|
||||
// we have to analyze configs here to determine whether to forward it to master
|
||||
for (String key : this.configs.keySet()) {
|
||||
if (ConfigBase.checkIsMasterOnly(key)) {
|
||||
redirectStatus = RedirectStatus.FORWARD_NO_SYNC;
|
||||
this.applyToAll = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -66,6 +70,10 @@ public class AdminSetConfigStmt extends DdlStmt {
|
||||
return configs;
|
||||
}
|
||||
|
||||
public boolean isApplyToAll() {
|
||||
return applyToAll;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
|
||||
super.analyze(analyzer);
|
||||
@ -87,4 +95,13 @@ public class AdminSetConfigStmt extends DdlStmt {
|
||||
public RedirectStatus getRedirectStatus() {
|
||||
return redirectStatus;
|
||||
}
|
||||
|
||||
public OriginStatement getLocalSetStmt() {
|
||||
OriginStatement stmt = this.getOrigStmt();
|
||||
Object[] keyArr = configs.keySet().toArray();
|
||||
String sql = String.format("ADMIN SET FRONTEND CONFIG (\"%s\" = \"%s\");",
|
||||
keyArr[0].toString(), configs.get(keyArr[0].toString()));
|
||||
|
||||
return new OriginStatement(sql, stmt.idx);
|
||||
}
|
||||
}
|
||||
|
||||
@ -233,6 +233,7 @@ import org.apache.doris.plugin.PluginMgr;
|
||||
import org.apache.doris.policy.PolicyMgr;
|
||||
import org.apache.doris.qe.AuditEventProcessor;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.FEOpExecutor;
|
||||
import org.apache.doris.qe.GlobalVariable;
|
||||
import org.apache.doris.qe.JournalObservable;
|
||||
import org.apache.doris.qe.QueryCancelWorker;
|
||||
@ -5386,7 +5387,7 @@ public class Env {
|
||||
globalFunctionMgr.replayDropFunction(functionSearchDesc);
|
||||
}
|
||||
|
||||
public void setConfig(AdminSetConfigStmt stmt) throws DdlException {
|
||||
public void setConfig(AdminSetConfigStmt stmt) throws Exception {
|
||||
Map<String, String> configs = stmt.getConfigs();
|
||||
Preconditions.checkState(configs.size() == 1);
|
||||
|
||||
@ -5397,6 +5398,22 @@ public class Env {
|
||||
throw new DdlException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
if (stmt.isApplyToAll()) {
|
||||
for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all */)) {
|
||||
if (!fe.isAlive() || fe.getHost().equals(Env.getCurrentEnv().getSelfNode().getHost())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
TNetworkAddress feAddr = new TNetworkAddress(fe.getHost(), fe.getRpcPort());
|
||||
FEOpExecutor executor = new FEOpExecutor(feAddr, stmt.getLocalSetStmt(), ConnectContext.get(), false);
|
||||
executor.execute();
|
||||
if (executor.getStatusCode() != TStatusCode.OK.getValue()) {
|
||||
throw new DdlException(String.format("failed to apply to fe %s:%s, error message: %s",
|
||||
fe.getHost(), fe.getRpcPort(), executor.getErrMsg()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void replayBackendReplicasInfo(BackendReplicasInfo backendReplicasInfo) {
|
||||
|
||||
216
fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
Normal file
216
fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
Normal file
@ -0,0 +1,216 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.qe;
|
||||
|
||||
import org.apache.doris.analysis.LiteralExpr;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.ClientPool;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.thrift.FrontendService;
|
||||
import org.apache.doris.thrift.TExpr;
|
||||
import org.apache.doris.thrift.TExprNode;
|
||||
import org.apache.doris.thrift.TMasterOpRequest;
|
||||
import org.apache.doris.thrift.TMasterOpResult;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class FEOpExecutor {
|
||||
private static final Logger LOG = LogManager.getLogger(FEOpExecutor.class);
|
||||
|
||||
private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;
|
||||
|
||||
private final OriginStatement originStmt;
|
||||
private final ConnectContext ctx;
|
||||
private TMasterOpResult result;
|
||||
private TNetworkAddress feAddr;
|
||||
|
||||
// the total time of thrift connectTime, readTime and writeTime
|
||||
private int thriftTimeoutMs;
|
||||
|
||||
private boolean shouldNotRetry;
|
||||
|
||||
public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, ConnectContext ctx, boolean isQuery) {
|
||||
this.feAddr = feAddress;
|
||||
this.originStmt = originStmt;
|
||||
this.ctx = ctx;
|
||||
this.thriftTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * RPC_TIMEOUT_COEFFICIENT);
|
||||
// if isQuery=false, we shouldn't retry twice when catch exception because of Idempotency
|
||||
this.shouldNotRetry = !isQuery;
|
||||
}
|
||||
|
||||
public void execute() throws Exception {
|
||||
result = forward(feAddr, buildStmtForwardParams());
|
||||
}
|
||||
|
||||
public void cancel() throws Exception {
|
||||
TUniqueId queryId = ctx.queryId();
|
||||
if (queryId == null) {
|
||||
return;
|
||||
}
|
||||
Preconditions.checkNotNull(feAddr, "query with id %s is not forwarded to fe", queryId);
|
||||
TMasterOpRequest request = new TMasterOpRequest();
|
||||
request.setCancelQeury(true);
|
||||
request.setQueryId(queryId);
|
||||
request.setDb(ctx.getDatabase());
|
||||
request.setUser(ctx.getQualifiedUser());
|
||||
request.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
|
||||
request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
|
||||
// just make the protocol happy
|
||||
request.setSql("");
|
||||
result = forward(feAddr, request);
|
||||
}
|
||||
|
||||
// Send request to specific fe
|
||||
private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest params) throws Exception {
|
||||
ctx.getEnv().checkReadyOrThrow();
|
||||
|
||||
FrontendService.Client client;
|
||||
try {
|
||||
client = ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs);
|
||||
} catch (Exception e) {
|
||||
// may throw NullPointerException. add err msg
|
||||
throw new Exception("Failed to get fe client: " + thriftAddress.toString(), e);
|
||||
}
|
||||
final StringBuilder forwardMsg = new StringBuilder("forward to FE " + thriftAddress.toString());
|
||||
forwardMsg.append(", statement id: ").append(ctx.getStmtId());
|
||||
LOG.info(forwardMsg.toString());
|
||||
|
||||
boolean isReturnToPool = false;
|
||||
try {
|
||||
final TMasterOpResult result = client.forward(params);
|
||||
isReturnToPool = true;
|
||||
return result;
|
||||
} catch (TTransportException e) {
|
||||
// wrap the raw exception.
|
||||
forwardMsg.append(" : failed");
|
||||
Exception exception = new ForwardToFEException(forwardMsg.toString(), e);
|
||||
|
||||
boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs);
|
||||
if (!ok) {
|
||||
throw exception;
|
||||
}
|
||||
if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) {
|
||||
throw exception;
|
||||
} else {
|
||||
LOG.warn(forwardMsg.append(" twice").toString(), e);
|
||||
try {
|
||||
TMasterOpResult result = client.forward(params);
|
||||
isReturnToPool = true;
|
||||
return result;
|
||||
} catch (TException ex) {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (isReturnToPool) {
|
||||
ClientPool.frontendPool.returnObject(thriftAddress, client);
|
||||
} else {
|
||||
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private TMasterOpRequest buildStmtForwardParams() {
|
||||
TMasterOpRequest params = new TMasterOpRequest();
|
||||
// node ident
|
||||
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
|
||||
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
|
||||
params.setSql(originStmt.originStmt);
|
||||
params.setStmtIdx(originStmt.idx);
|
||||
params.setUser(ctx.getQualifiedUser());
|
||||
params.setDefaultCatalog(ctx.getDefaultCatalog());
|
||||
params.setDefaultDatabase(ctx.getDatabase());
|
||||
params.setDb(ctx.getDatabase());
|
||||
params.setUserIp(ctx.getRemoteIP());
|
||||
params.setStmtId(ctx.getStmtId());
|
||||
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
|
||||
params.setCluster(String.valueOf(ctx.getEnv().getClusterId()));
|
||||
|
||||
// query options
|
||||
params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
|
||||
// session variables
|
||||
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
|
||||
params.setUserVariables(getForwardUserVariables(ctx.getUserVars()));
|
||||
if (null != ctx.queryId()) {
|
||||
params.setQueryId(ctx.queryId());
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
public int getStatusCode() {
|
||||
if (result == null || !result.isSetStatusCode()) {
|
||||
return ErrorCode.ERR_UNKNOWN_ERROR.getCode();
|
||||
}
|
||||
return result.getStatusCode();
|
||||
}
|
||||
|
||||
public String getErrMsg() {
|
||||
if (result == null) {
|
||||
return ErrorCode.ERR_UNKNOWN_ERROR.getErrorMsg();
|
||||
}
|
||||
if (!result.isSetErrMessage()) {
|
||||
return "";
|
||||
}
|
||||
return result.getErrMessage();
|
||||
}
|
||||
|
||||
private Map<String, TExprNode> getForwardUserVariables(Map<String, LiteralExpr> userVariables) {
|
||||
Map<String, TExprNode> forwardVariables = Maps.newHashMap();
|
||||
for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) {
|
||||
LiteralExpr literalExpr = entry.getValue();
|
||||
TExpr tExpr = literalExpr.treeToThrift();
|
||||
TExprNode tExprNode = tExpr.nodes.get(0);
|
||||
forwardVariables.put(entry.getKey(), tExprNode);
|
||||
}
|
||||
return forwardVariables;
|
||||
}
|
||||
|
||||
public static class ForwardToFEException extends RuntimeException {
|
||||
|
||||
private static final Map<Integer, String> TYPE_MSG_MAP =
|
||||
ImmutableMap.<Integer, String>builder()
|
||||
.put(TTransportException.UNKNOWN, "Unknown exception")
|
||||
.put(TTransportException.NOT_OPEN, "Connection is not open")
|
||||
.put(TTransportException.ALREADY_OPEN, "Connection has already opened up")
|
||||
.put(TTransportException.TIMED_OUT, "Connection timeout")
|
||||
.put(TTransportException.END_OF_FILE, "EOF")
|
||||
.put(TTransportException.CORRUPTED_DATA, "Corrupted data")
|
||||
.build();
|
||||
|
||||
private final String msg;
|
||||
|
||||
public ForwardToFEException(String msg, TTransportException exception) {
|
||||
this.msg = msg + ", cause: " + TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMessage() {
|
||||
return msg;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user