From 720bee7c1eea5c1d7f2ac98c1cec1b2c5c1c4613 Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Sat, 6 Jan 2024 17:20:48 +0800 Subject: [PATCH] [improve](stream-load) choose stream load coordinator by round robin (#28915) --- .../java/org/apache/doris/httpv2/rest/LoadAction.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index f4a0953949..787f77f1da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -62,6 +62,8 @@ public class LoadAction extends RestBaseController { private ExecuteEnv execEnv = ExecuteEnv.getInstance(); + private int lastSelectedBackendIndex = 0; + @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT) public Object load(HttpServletRequest request, HttpServletResponse response, @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { @@ -315,6 +317,12 @@ public class LoadAction extends RestBaseController { } } + private final synchronized int getLastSelectedBackendIndexAndUpdate() { + int index = lastSelectedBackendIndex; + lastSelectedBackendIndex = (index >= Integer.MAX_VALUE - 1) ? 0 : index + 1; + return index; + } + private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException { Backend backend = null; BeSelectionPolicy policy = null; @@ -322,7 +330,9 @@ public class LoadAction extends RestBaseController { Set userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); policy = new BeSelectionPolicy.Builder() .addTags(userTags) + .setEnableRoundRobin(true) .needLoadAvailable().build(); + policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate(); List backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); if (backendIds.isEmpty()) { throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);