select coordinator node from user's tag when exec streaming load (#27106)

This commit is contained in:
wangbo
2023-11-16 19:55:50 +08:00
committed by GitHub
parent 0ac3984d4b
commit 492a22dced

View File

@ -27,6 +27,7 @@ import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
@ -46,6 +47,7 @@ import org.springframework.web.servlet.view.RedirectView;
import java.net.URI;
import java.util.List;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -260,7 +262,11 @@ public class LoadAction extends RestBaseController {
}
private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException {
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
String qualifiedUser = ConnectContext.get().getQualifiedUser();
Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
.addTags(userTags)
.needLoadAvailable().build();
List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
if (backendIds.isEmpty()) {
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);