[improve](stream-load) choose stream load coordinator by round robin (#28915)

This commit is contained in:
HHoflittlefish777
2024-01-06 17:20:48 +08:00
committed by GitHub
parent bdc69a4175
commit 720bee7c1e

View File

@ -62,6 +62,8 @@ public class LoadAction extends RestBaseController {
private ExecuteEnv execEnv = ExecuteEnv.getInstance();
private int lastSelectedBackendIndex = 0;
@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT)
public Object load(HttpServletRequest request, HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
@ -315,6 +317,12 @@ public class LoadAction extends RestBaseController {
}
}
private final synchronized int getLastSelectedBackendIndexAndUpdate() {
int index = lastSelectedBackendIndex;
lastSelectedBackendIndex = (index >= Integer.MAX_VALUE - 1) ? 0 : index + 1;
return index;
}
private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException {
Backend backend = null;
BeSelectionPolicy policy = null;
@ -322,7 +330,9 @@ public class LoadAction extends RestBaseController {
Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
policy = new BeSelectionPolicy.Builder()
.addTags(userTags)
.setEnableRoundRobin(true)
.needLoadAvailable().build();
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
if (backendIds.isEmpty()) {
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);