[Feature](auditloader) Plugin auditloader use auth token to avoid using cleartext passwords in config (#26278)

Doris FE will check if stream load http request has auth token after checking password failed;
Plugin audit-log loader can use auth token if plugin config set use_auth_token to true

Co-authored-by: Mingyu Chen <morningman.cmy@gmail.com>
This commit is contained in:
zhiqiang
2023-11-07 05:14:57 -06:00
committed by GitHub
parent 38a14c3325
commit ad1f635070
4 changed files with 138 additions and 8 deletions

View File

@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
@ -43,6 +44,7 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;
import java.net.URI;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -81,7 +83,20 @@ public class LoadAction extends RestBaseController {
return redirectToHttps(request);
}
executeCheckPassword(request, response);
try {
executeCheckPassword(request, response);
} catch (UnauthorizedException unauthorizedException) {
if (LOG.isDebugEnabled()) {
LOG.debug("Check password failed, going to check auth token, request: {}", request.toString());
}
if (!checkClusterToken(request)) {
throw unauthorizedException;
} else {
return executeWithClusterToken(request, db, table, true);
}
}
return executeWithoutPassword(request, response, db, table, true);
}
@ -257,4 +272,99 @@ public class LoadAction extends RestBaseController {
}
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
// NOTE: This function can only be used for AuditlogPlugin stream load for now.
// AuditlogPlugin should be re-disigned carefully, and blow method focuses on
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private boolean checkClusterToken(HttpServletRequest request) {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking cluser token, request {}", request.toString());
}
String authToken = request.getHeader("token");
if (Strings.isNullOrEmpty(authToken)) {
return false;
}
return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken);
}
// NOTE: This function can only be used for AuditlogPlugin stream load for now.
// AuditlogPlugin should be re-disigned carefully, and blow method focuses on
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private Object executeWithClusterToken(HttpServletRequest request, String db,
String table, boolean isStreamLoad) {
try {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
ctx.setRemoteIP(request.getRemoteAddr());
String dbName = db;
String tableName = table;
// A 'Load' request must have 100-continue header
if (request.getHeader(HttpHeaderNames.EXPECT.toString()) == null) {
return new RestBaseResult("There is no 100-continue header");
}
final String clusterName = ConnectContext.get().getClusterName();
if (Strings.isNullOrEmpty(clusterName)) {
return new RestBaseResult("No cluster selected.");
}
if (Strings.isNullOrEmpty(dbName)) {
return new RestBaseResult("No database selected.");
}
if (Strings.isNullOrEmpty(tableName)) {
return new RestBaseResult("No table selected.");
}
String label = request.getParameter(LABEL_KEY);
if (isStreamLoad) {
label = request.getHeader(LABEL_KEY);
}
if (!isStreamLoad && Strings.isNullOrEmpty(label)) {
// for stream load, the label can be generated by system automatically
return new RestBaseResult("No label selected.");
}
TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);
LOG.info("Redirect load action with auth token to destination={},"
+ "stream: {}, db: {}, tbl: {}, label: {}",
redirectAddr.toString(), isStreamLoad, dbName, tableName, label);
URI urlObj = null;
URI resultUriObj = null;
String urlStr = request.getRequestURI();
String userInfo = null;
try {
urlObj = new URI(urlStr);
resultUriObj = new URI("http", userInfo, redirectAddr.getHostname(),
redirectAddr.getPort(), urlObj.getPath(), "", null);
} catch (Exception e) {
throw new RuntimeException(e);
}
String redirectUrl = resultUriObj.toASCIIString();
if (!Strings.isNullOrEmpty(request.getQueryString())) {
redirectUrl += request.getQueryString();
}
LOG.info("Redirect url: {}", redirectUrl);
RedirectView redirectView = new RedirectView(redirectUrl);
redirectView.setContentType("text/html;charset=utf-8");
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
return redirectView;
} catch (Exception e) {
LOG.warn("Failed to execute stream load with cluster token, {}", e);
return new RestBaseResult(e.getMessage());
}
}
}