[fix](ctx) manager the lifecycle of connection context (#29346)

In FrontendService, we may create some connection context and set it as a thread local varaible.
These context should be removed from thread local after call.
Otherwise, it may be reused by other thread incorrectly.
This commit is contained in:
Mingyu Chen
2024-01-01 23:32:28 +08:00
committed by GitHub
parent 738abac9ed
commit 4cbbd25d8c
3 changed files with 32 additions and 22 deletions

View File

@ -30,6 +30,7 @@ import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@ -122,9 +123,13 @@ public class FederationBackendPolicy {
Set<Tag> tags = Sets.newHashSet();
if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
throw new UserException("No valid resource tag for user: " + qualifiedUser);
// Some request from stream load(eg, mysql load) may not set user info in ConnectContext
// just ignore it.
if (!Strings.isNullOrEmpty(qualifiedUser)) {
tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
throw new UserException("No valid resource tag for user: " + qualifiedUser);
}
}
} else {
LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer");

View File

@ -1786,9 +1786,19 @@ public class FrontendServiceImpl implements FrontendService.Iface {
TStreamLoadPutResult result = new TStreamLoadPutResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
// create connect context
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setQueryId(request.getLoadId());
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(request.getUser(), "%"));
ctx.setQualifiedUser(request.getUser());
ctx.setBackendId(request.getBackendId());
ctx.setThreadLocalInfo();
try {
if (!Strings.isNullOrEmpty(request.getLoadSql())) {
httpStreamPutImpl(request, result);
httpStreamPutImpl(request, result, ctx);
return result;
} else {
if (Config.enable_pipeline_load) {
@ -1806,6 +1816,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
return result;
} finally {
ConnectContext.remove();
}
return result;
}
@ -1917,12 +1929,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result)
private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result, ConnectContext ctx)
throws UserException {
LOG.info("receive http stream put request");
if (LOG.isDebugEnabled()) {
LOG.debug("receive http stream put request: {}", request);
}
String originStmt = request.getLoadSql();
ConnectContext ctx = new ConnectContext();
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
} else if (Strings.isNullOrEmpty(request.getToken())) {
@ -1930,12 +1942,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
}
ctx.setEnv(Env.getCurrentEnv());
ctx.setQueryId(request.getLoadId());
ctx.setCurrentUserIdentity(UserIdentity.ROOT);
ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
ctx.setBackendId(request.getBackendId());
ctx.setThreadLocalInfo();
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
try {
@ -2859,16 +2865,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
restoreStmt.setIsBeingSynced();
LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt);
try {
ConnectContext ctx = ConnectContext.get();
if (ctx == null) {
ctx = new ConnectContext();
ctx.setThreadLocalInfo();
}
ConnectContext ctx = new ConnectContext();
ctx.setQualifiedUser(request.getUser());
String fullUserName = ClusterNamespace.getNameFromFullName(request.getUser());
UserIdentity currentUserIdentity = new UserIdentity(fullUserName, "%");
ctx.setCurrentUserIdentity(currentUserIdentity);
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(fullUserName, "%"));
ctx.setThreadLocalInfo();
Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
restoreStmt.analyze(analyzer);
DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt);
@ -2880,6 +2881,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
} finally {
ConnectContext.remove();
}
return result;