From 6d2924668e31616b694f82b11409f42dbdb63c23 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 12 Mar 2024 22:44:30 +0800 Subject: [PATCH] [fix](audit-loader) fix invalid token check logic (#32095) The check of the token should be forwarded to Master FE. I add a new RPC method `checkToken()` in Frontend for this logic. Otherwise, after enable the audit loader, the log from non-master FE can not be loaded to audit table with `Invalid token` error. --- .../apache/doris/httpv2/rest/LoadAction.java | 7 ++- .../doris/load/loadv2/TokenManager.java | 61 ++++++++++++++++--- .../doris/service/FrontendServiceImpl.java | 28 ++++++--- gensrc/thrift/FrontendService.thrift | 2 + 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 6952bd37b5..6be5654a2e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; +import org.apache.doris.common.UserException; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.httpv2.exception.UnauthorizedException; @@ -362,7 +363,11 @@ public class LoadAction extends RestBaseController { // temporarily addressing the users' needs for audit logs. // So this function is not widely tested under general scenario private boolean checkClusterToken(String token) { - return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); + try { + return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); + } catch (UserException e) { + throw new UnauthorizedException(e.getMessage()); + } } // NOTE: This function can only be used for AuditlogPlugin stream load for now. diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java index 6443e6b232..ca714d66b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java @@ -63,11 +63,6 @@ public class TokenManager { return UUID.randomUUID().toString(); } - // this method only will be called in master node, since stream load only send message to master. - public boolean checkAuthToken(String token) { - return tokenQueue.contains(token); - } - public String acquireToken() throws UserException { if (Env.getCurrentEnv().isMaster() || FeConstants.runningUnitTest) { return tokenQueue.peek(); @@ -81,9 +76,8 @@ public class TokenManager { } } - public String acquireTokenFromMaster() throws TException { + private String acquireTokenFromMaster() throws TException { TNetworkAddress thriftAddress = getMasterAddress(); - FrontendService.Client client = getClient(thriftAddress); if (LOG.isDebugEnabled()) { @@ -108,7 +102,7 @@ public class TokenManager { } else { TMySqlLoadAcquireTokenResult result = client.acquireToken(); if (result.getStatus().getStatusCode() != TStatusCode.OK) { - throw new TException("commit failed."); + throw new TException("acquire token from master failed. " + result.getStatus()); } isReturnToPool = true; return result.getToken(); @@ -122,6 +116,57 @@ public class TokenManager { } } + /** + * Check if the token is valid. + * If this is not Master FE, will send the request to Master FE. + */ + public boolean checkAuthToken(String token) throws UserException { + if (Env.getCurrentEnv().isMaster() || FeConstants.runningUnitTest) { + return tokenQueue.contains(token); + } else { + try { + return checkTokenFromMaster(token); + } catch (TException e) { + LOG.warn("check token error", e); + throw new UserException("Check token from master failed", e); + } + } + } + + private boolean checkTokenFromMaster(String token) throws TException { + TNetworkAddress thriftAddress = getMasterAddress(); + FrontendService.Client client = getClient(thriftAddress); + + if (LOG.isDebugEnabled()) { + LOG.debug("Send check token to Master {}", thriftAddress); + } + + boolean isReturnToPool = false; + try { + boolean result = client.checkToken(token); + isReturnToPool = true; + return result; + } catch (TTransportException e) { + boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); + if (!ok) { + throw e; + } + if (e.getType() == TTransportException.TIMED_OUT) { + throw e; + } else { + boolean result = client.checkToken(token); + isReturnToPool = true; + return result; + } + } finally { + if (isReturnToPool) { + ClientPool.frontendPool.returnObject(thriftAddress, client); + } else { + ClientPool.frontendPool.invalidateObject(thriftAddress, client); + } + } + } + private TNetworkAddress getMasterAddress() throws TException { Env.getCurrentEnv().checkReadyOrThrowTException(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 97b6f52e33..7f2b0db8c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1063,12 +1063,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { } } - private void checkToken(String token) throws AuthenticationException { - if (!Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token)) { - throw new AuthenticationException("Un matched cluster token."); - } - } - private void checkPassword(String user, String passwd, String clientIp) throws AuthenticationException { final String fullUserName = ClusterNamespace.getNameFromFullName(user); @@ -1133,7 +1127,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } else { - checkToken(request.getToken()); + if (!checkToken(request.getToken())) { + throw new AuthenticationException("Invalid token: " + request.getToken()); + } } // check label @@ -1344,7 +1340,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (request.isSetAuthCode()) { // CHECKSTYLE IGNORE THIS LINE } else if (request.isSetToken()) { - checkToken(request.getToken()); + if (!checkToken(request.getToken())) { + throw new AuthenticationException("Invalid token: " + request.getToken()); + } } else { // refactoring it if (CollectionUtils.isNotEmpty(request.getTbls())) { @@ -2406,6 +2404,20 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + @Override + public boolean checkToken(String token) { + String clientAddr = getClientAddrAsString(); + if (LOG.isDebugEnabled()) { + LOG.debug("receive check token request from client: {}", clientAddr); + } + try { + return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + return false; + } + } + @Override public TCheckAuthResult checkAuth(TCheckAuthRequest request) throws TException { String clientAddr = getClientAddrAsString(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 176199325c..17bc432d80 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1435,6 +1435,8 @@ service FrontendService { TMySqlLoadAcquireTokenResult acquireToken() + bool checkToken(1: string token) + TConfirmUnusedRemoteFilesResult confirmUnusedRemoteFiles(1: TConfirmUnusedRemoteFilesRequest request) TCheckAuthResult checkAuth(1: TCheckAuthRequest request)