[cherry-pick](branch-2.1) Pick "[Enhancement](group commit)Optimize be select for group commit #35558" (#37830)

Pick #35558
Author: abmdocrt
Date: 2024-07-24 09:21:07 +08:00
Committed by: GitHub
Parent: cf2120a44a
Commit: 792bd7c74a
9 changed files with 421 additions and 16 deletions

File: SlidingWindowCounter.java (new file, package org.apache.doris.common.util)

@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import java.util.concurrent.atomic.AtomicLongArray;
public class SlidingWindowCounter {
private final int windowSizeInSeconds;
private final int numberOfBuckets;
private final AtomicLongArray buckets;
private final AtomicLongArray bucketTimestamps;
public SlidingWindowCounter(int windowSizeInSeconds) {
this.windowSizeInSeconds = windowSizeInSeconds;
this.numberOfBuckets = windowSizeInSeconds; // Each bucket represents 1 second
this.buckets = new AtomicLongArray(numberOfBuckets);
this.bucketTimestamps = new AtomicLongArray(numberOfBuckets);
}
private int getCurrentBucketIndex() {
long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
return (int) (currentTime % numberOfBuckets);
}
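// Note (editor, not part of the patch): the stale check below is check-then-act
// rather than one atomic operation, so two threads may both reset the same bucket,
// and an add() racing with the reset can be lost. For an approximate pressure
// gauge this is acceptable; exact accounting would need a per-bucket CAS loop.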
private void updateCurrentBucket() {
long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
int currentBucketIndex = getCurrentBucketIndex();
long bucketTimestamp = bucketTimestamps.get(currentBucketIndex);
if (currentTime - bucketTimestamp >= 1) {
buckets.set(currentBucketIndex, 0);
bucketTimestamps.set(currentBucketIndex, currentTime);
}
}
public void add(long value) {
updateCurrentBucket();
int bucketIndex = getCurrentBucketIndex();
buckets.addAndGet(bucketIndex, value);
}
public long get() {
updateCurrentBucket();
long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
long count = 0;
for (int i = 0; i < numberOfBuckets; i++) {
if (currentTime - bucketTimestamps.get(i) < windowSizeInSeconds) {
count += buckets.get(i);
}
}
return count;
}
@Override
public String toString() {
return String.valueOf(get());
}
}
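
To illustrate the semantics, here is a small, hypothetical driver (not part of this commit); timing near second boundaries is approximate:

// Illustrative usage of SlidingWindowCounter with a 3-second window.
public class SlidingWindowCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        SlidingWindowCounter counter = new SlidingWindowCounter(3);
        counter.add(100);                  // lands in the current 1-second bucket
        Thread.sleep(1000);
        counter.add(50);                   // lands in the next bucket
        System.out.println(counter.get()); // 150: both buckets are inside the window
        Thread.sleep(3000);
        System.out.println(counter.get()); // 0: both buckets have aged out
    }
}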

File: LoadAction.java (package org.apache.doris.httpv2.rest)

@@ -19,6 +19,7 @@ package org.apache.doris.httpv2.rest;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -128,11 +129,16 @@ public class LoadAction extends RestBaseController {
String sql = request.getHeader("sql");
LOG.info("streaming load sql={}", sql);
boolean groupCommit = false;
long tableId = -1;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
return new RestBaseResult(msg);
@@ -150,8 +156,7 @@
}
String label = request.getHeader(LABEL_KEY);
-TNetworkAddress redirectAddr;
-redirectAddr = selectRedirectBackend(groupCommit);
+TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit, tableId);
LOG.info("redirect load action to destination={}, label: {}",
redirectAddr.toString(), label);
@@ -274,7 +279,9 @@
return new RestBaseResult(e.getMessage());
}
} else {
-redirectAddr = selectRedirectBackend(groupCommit);
+long tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName)
+.get()).getTable(tableName).get()).getId();
+redirectAddr = selectRedirectBackend(request, groupCommit, tableId);
}
LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}",
@@ -305,7 +312,7 @@
return new RestBaseResult("No transaction operation ('commit' or 'abort') selected.");
}
-TNetworkAddress redirectAddr = selectRedirectBackend(false);
+TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1);
LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}",
redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation);
@@ -323,12 +330,18 @@
return index;
}
-private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException {
+private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId)
+throws LoadException {
long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L);
if (debugBackendId != -1L) {
Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId);
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
return selectLocalRedirectBackend(groupCommit, request, tableId);
}
private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId)
throws LoadException {
Backend backend = null;
BeSelectionPolicy policy = null;
String qualifiedUser = ConnectContext.get().getQualifiedUser();
@@ -348,12 +361,17 @@
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
if (groupCommit) {
-for (Long backendId : backendIds) {
-Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId);
-if (!candidateBe.isDecommissioned()) {
-backend = candidateBe;
-break;
-}
+ConnectContext ctx = new ConnectContext();
+ctx.setEnv(Env.getCurrentEnv());
+ctx.setRemoteIP(request.getRemoteAddr());
+ctx.setThreadLocalInfo();
+try {
+backend = Env.getCurrentEnv().getGroupCommitManager()
+.selectBackendForGroupCommit(tableId, ctx, false);
+} catch (DdlException e) {
+throw new RuntimeException(e);
+}
}
} else {
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
@@ -416,10 +434,10 @@
return new RestBaseResult("No label selected.");
}
-TNetworkAddress redirectAddr = selectRedirectBackend(false);
+TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1);
LOG.info("Redirect load action with auth token to destination={},"
+ "stream: {}, db: {}, tbl: {}, label: {}",
redirectAddr.toString(), isStreamLoad, dbName, tableName, label);
URI urlObj = null;

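For orientation, the code path above is what a group-commit stream load hits. Below is a minimal, hypothetical client call (not part of this commit): host, port, database, table, and credentials are placeholders, it assumes Java 11+, and it stops at the FE's 307 redirect (whose Location header points at the BE chosen by the logic in this patch) rather than completing the load.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GroupCommitStreamLoadSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newBuilder()
                .followRedirects(HttpClient.Redirect.NEVER) // keep the 307 visible
                .build();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://fe_host:8030/api/db1/tbl1/_stream_load"))
                .header("Authorization", "Basic cm9vdDo=")  // "root:" base64-encoded
                .header("group_commit", "async_mode")       // triggers the new BE selection
                .header("column_separator", ",")
                .PUT(HttpRequest.BodyPublishers.ofString("1,doris\n2,group_commit\n"))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // The Location header names the BE picked by selectBackendForGroupCommit.
        System.out.println(response.statusCode() + " -> "
                + response.headers().firstValue("location").orElse("(no redirect)"));
    }
}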
File: GroupCommitManager.java (package org.apache.doris.load)

@@ -18,9 +18,16 @@
package org.apache.doris.load;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.util.SlidingWindowCounter;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterOpExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
@@ -28,11 +35,17 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
public class GroupCommitManager {
@@ -40,6 +53,11 @@ public class GroupCommitManager {
private Set<Long> blockedTableIds = new HashSet<>();
// Table id to BE id map. Only for group commit.
private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>();
// Table id to pressure map. Only for group commit.
private Map<Long, SlidingWindowCounter> tablePressureMap = new ConcurrentHashMap<>();
public boolean isBlock(long tableId) {
return blockedTableIds.contains(tableId);
}
@@ -163,4 +181,146 @@
return size;
}
public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud)
throws LoadException, DdlException {
// If a group commit request is sent to a follower FE, we forward it to the master FE.
// The master FE selects a BE and returns that BE's id to the follower.
if (!Env.getCurrentEnv().isMaster()) {
try {
long backendId = new MasterOpExecutor(context)
.getGroupCommitLoadBeId(tableId);
return Env.getCurrentSystemInfo().getBackend(backendId);
} catch (Exception e) {
throw new LoadException(e.getMessage());
}
} else {
// Master FE will select BE by itself.
return Env.getCurrentSystemInfo()
.getBackend(selectBackendForGroupCommitInternal(tableId));
}
}
public long selectBackendForGroupCommitInternal(long tableId)
throws LoadException, DdlException {
// Understanding Group Commit and Backend Selection Logic
//
// Group commit is a server-side technique used for batching data imports.
// The primary purpose of group commit is to enhance import performance by
// reducing the number of versions created for high-frequency, small-batch imports.
// Without batching, each import operation creates a separate version, similar to a rowset in an LSM Tree,
// which can consume significant compaction resources and degrade system performance.
// By batching data, fewer versions are generated from the same amount of data,
// thus minimizing compaction and improving performance. For detailed usage,
// you can refer to the Group Commit Manual
// (https://doris.incubator.apache.org/docs/data-operate/import/group-commit-manual/).
//
// The specific backend (BE) selection logic for group commits aims to
// direct data belonging to the same table to the same BE for batching.
// This is because group commit batches data imported to the same table
// on the same BE into a single version, which is then flushed periodically.
// For example, if data for the same table is distributed across three BEs,
// it will result in three versions.
// Conversely, if data for four different tables is directed to the same BE,
// it will create four versions. However,
// directing all data for the same table to a single BE will only produce one version.
//
// To optimize performance and avoid overloading a single BE, the strategy for selecting a BE works as follows:
//
// If a BE is already handling imports for table A and is not under significant load,
// the data is sent to this BE.
// If the BE is overloaded or if there is no existing record of a BE handling imports for table A,
// a BE is chosen at random. This BE is then recorded along with the mapping of table A and its load level.
// This approach ensures that group commits can effectively batch data together
// while managing the load on each BE efficiently.
return selectBackendForLocalGroupCommitInternal(tableId);
}
private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException {
LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(),
tablePressureMap.toString());
Long cachedBackendId = getCachedBackend(tableId);
if (cachedBackendId != null) {
return cachedBackendId;
}
List<Backend> backends = new ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends());
if (backends.isEmpty()) {
throw new LoadException("No alive backend");
}
// The cached backend is missing, inactive, or decommissioned; select a random new backend.
Long randomBackendId = getRandomBackend(tableId, backends);
if (randomBackendId != null) {
return randomBackendId;
}
List<String> backendsInfo = backends.stream()
.map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive()
+ ", decommission=" + be.isDecommissioned() + " }")
.collect(Collectors.toList());
throw new LoadException("No suitable backend " + ", backends = " + backendsInfo);
}
@Nullable
private Long getCachedBackend(long tableId) {
OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
if (tableToBeMap.containsKey(tableId)) {
if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) {
Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId));
if (backend.isAlive() && !backend.isDecommissioned()) {
return backend.getId();
} else {
tableToBeMap.remove(tableId);
}
} else {
tableToBeMap.remove(tableId);
}
}
return null;
}
@Nullable
private Long getRandomBackend(long tableId, List<Backend> backends) {
OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
Collections.shuffle(backends);
for (Backend backend : backends) {
if (backend.isAlive() && !backend.isDecommissioned()) {
tableToBeMap.put(tableId, backend.getId());
tablePressureMap.put(tableId,
new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
return backend.getId();
}
}
return null;
}
public void updateLoadData(long tableId, long receiveData) {
if (tableId == -1) {
LOG.warn("invalid table id: {}", tableId);
}
if (!Env.getCurrentEnv().isMaster()) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
// set user to ADMIN_USER, so that we can get the proper resource tag
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setThreadLocalInfo();
try {
new MasterOpExecutor(ctx).updateLoadData(tableId, receiveData);
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
updateLoadDataInternal(tableId, receiveData);
}
}
public void updateLoadDataInternal(long tableId, long receiveData) {
if (tablePressureMap.containsKey(tableId)) {
tablePressureMap.get(tableId).add(receiveData);
LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData,
tablePressureMap.toString());
} else {
LOG.warn("can not find backend id: {}", tableId);
}
}
}
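
To make the cache-or-reselect decision above concrete, here is an editor's sketch (not code from the patch) of the pressure arithmetic, under assumed table settings of group_commit_interval_ms = 10000 and group_commit_data_bytes = 128 MB; neither value comes from this diff:

// Editor's sketch: the pressure check behind getCachedBackend, with assumed values.
public class GroupCommitPressureSketch {
    public static void main(String[] args) {
        // Window matches getRandomBackend: 10000 / 1000 + 1 = 11 seconds.
        SlidingWindowCounter pressure = new SlidingWindowCounter(11);
        long groupCommitDataBytes = 128L << 20; // assumed group_commit_data_bytes (128 MB)

        pressure.add(64L << 20);  // a 64 MB load reported through updateLoadData
        System.out.println(pressure.get() < groupCommitDataBytes); // true: keep the cached BE

        pressure.add(96L << 20);  // another 96 MB inside the same 11 s window
        System.out.println(pressure.get() < groupCommitDataBytes); // false: drop mapping, reselect
    }
}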

File: GroupCommitPlanner.java

@@ -30,6 +30,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
@@ -202,6 +203,15 @@ public class GroupCommitPlanner {
}
}
protected void selectBackends(ConnectContext ctx) throws DdlException {
try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(this.table.getId(), ctx, false);
} catch (LoadException e) {
throw new DdlException("No suitable backend for group commit: " + e.getMessage());
}
}
public Backend getBackend() {
return backend;
}

File: Coordinator.java

@@ -273,6 +273,8 @@ public class Coordinator implements CoordInterface {
private boolean enablePipelineXEngine = false;
private boolean useNereids = false;
private Backend groupCommitBackend;
// Runtime filter merge instance address and ID
public TNetworkAddress runtimeFilterMergeAddr;
public TUniqueId runtimeFilterMergeInstanceId;
@@ -298,6 +300,10 @@
// fragmentid -> backendid
private MarkedCountDownLatch<Integer, Long> fragmentsDoneLatch = null;
public void setGroupCommitBe(Backend backend) {
this.groupCommitBackend = backend;
}
public void setTWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
this.tWorkloadGroups = tWorkloadGroups;
}
@@ -1955,8 +1961,11 @@
if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostport;
-if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || (isAllExternalScan
-&& Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) {
+if (groupCommitBackend != null) {
+execHostport = getGroupCommitBackend(addressToBackendID);
+} else if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || (
+isAllExternalScan
+&& Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) {
// 2 cases:
// case 1: user set resource tag, we need to use the BE with the specified resource tags.
// case 2: All scan nodes are external scan node,
@@ -2148,7 +2157,9 @@
if (params.instanceExecParams.isEmpty()) {
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostport;
-if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()
+if (groupCommitBackend != null) {
+execHostport = getGroupCommitBackend(addressToBackendID);
+} else if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()
&& !addressToBackendID.isEmpty()) {
// In this case, we only use the BE where the replica selected by the tag is located to
// execute this query. Otherwise, except for the scan node, the rest of the execution nodes
@@ -2172,6 +2183,14 @@
}
}
// Used by the Nereids planner to pin group commit inserts to the selected BE.
private TNetworkAddress getGroupCommitBackend(Map<TNetworkAddress, Long> addressToBackendID) {
TNetworkAddress execHostport = new TNetworkAddress(groupCommitBackend.getHost(),
groupCommitBackend.getBePort());
addressToBackendID.put(execHostport, groupCommitBackend.getId());
return execHostport;
}
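// Note (editor, not part of the patch): registering the pinned BE in
// addressToBackendID keeps the coordinator's later per-backend bookkeeping
// consistent, since the other selection paths populate the map as a side effect.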
// Traverse the expected runtimeFilterID in each fragment, and establish the corresponding relationship
// between runtimeFilterID and fragment instance addr and select the merge instance of runtimeFilter
private void assignRuntimeFilterAddr() throws Exception {

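How the planner and coordinator halves meet is implied rather than shown in this diff. Roughly, in pseudo-Java (an editor's sketch; newPlannerForTargetTable() and coordinator are placeholders, not names from the patch):

// Hypothetical wiring: the planner picks the BE, the coordinator pins execution to it.
GroupCommitPlanner planner = newPlannerForTargetTable(); // placeholder for the real construction
planner.selectBackends(ConnectContext.get());            // may forward BE selection to the master FE
Backend groupCommitBe = planner.getBackend();
coordinator.setGroupCommitBe(groupCommitBe);             // getGroupCommitBackend() now takes priority
                                                         // over resource-tag/random selection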
File: MasterOpExecutor.java

@@ -26,6 +26,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TGroupCommitInfo;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TNetworkAddress;
@@ -88,6 +89,17 @@ public class MasterOpExecutor {
waitOnReplaying();
}
public long getGroupCommitLoadBeId(long tableId) throws Exception {
result = forward(buildGetGroupCommitLoadBeIdParams(tableId));
waitOnReplaying();
return result.groupCommitLoadBeId;
}
public void updateLoadData(long tableId, long receiveData) throws Exception {
result = forward(buildUpdateLoadDataParams(tableId, receiveData));
waitOnReplaying();
}
private void waitOnReplaying() throws DdlException {
LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId);
ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs);
@@ -187,6 +199,41 @@ public class MasterOpExecutor {
return params;
}
private TMasterOpRequest buildGetGroupCommitLoadBeIdParams(long tableId) {
final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo();
groupCommitParams.setGetGroupCommitLoadBeId(true);
groupCommitParams.setGroupCommitLoadTableId(tableId);
final TMasterOpRequest params = new TMasterOpRequest();
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setGroupCommitInfo(groupCommitParams);
params.setDb(ctx.getDatabase());
params.setUser(ctx.getQualifiedUser());
// just make the protocol happy
params.setSql("");
return params;
}
private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveData) {
final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo();
groupCommitParams.setUpdateLoadData(true);
groupCommitParams.setTableId(tableId);
groupCommitParams.setReceiveData(receiveData);
final TMasterOpRequest params = new TMasterOpRequest();
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setGroupCommitInfo(groupCommitParams);
params.setDb(ctx.getDatabase());
params.setUser(ctx.getQualifiedUser());
// just make the protocol happy
params.setSql("");
return params;
}
public ByteBuffer getOutputPacket() {
if (result == null) {
return null;

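Schematically, the two forwarding calls added above pair with the master-side handlers added in FrontendServiceImpl below (method names as edited here):

// Follower FE                                  Master FE
// getGroupCommitLoadBeId(tableId)              forward() handler:
//   -> buildGetGroupCommitLoadBeIdParams()       isGetGroupCommitLoadBeId()
//   -> forward(TMasterOpRequest)                 -> selectBackendForGroupCommitInternal()
//   <- result.groupCommitLoadBeId                <- TMasterOpResult.setGroupCommitLoadBeId()
// updateLoadData(tableId, receiveData)          isUpdateLoadData()
//   -> buildUpdateLoadDataParams()               -> GroupCommitManager.updateLoadData()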
File: FrontendServiceImpl.java

@@ -53,6 +53,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
@@ -170,6 +171,7 @@ import org.apache.doris.thrift.TGetTablesParams;
import org.apache.doris.thrift.TGetTablesResult;
import org.apache.doris.thrift.TGetTabletReplicaInfosRequest;
import org.apache.doris.thrift.TGetTabletReplicaInfosResult;
import org.apache.doris.thrift.TGroupCommitInfo;
import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
import org.apache.doris.thrift.TInitExternalCtlMetaResult;
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
@@ -1008,6 +1010,28 @@ public class FrontendServiceImpl implements FrontendService.Iface {
result.setPacket("".getBytes());
return result;
}
if (params.isSetGroupCommitInfo() && params.getGroupCommitInfo().isGetGroupCommitLoadBeId()) {
final TGroupCommitInfo info = params.getGroupCommitInfo();
final TMasterOpResult result = new TMasterOpResult();
try {
result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommitInternal(info.groupCommitLoadTableId));
} catch (LoadException | DdlException e) {
throw new TException(e.getMessage());
}
// just make the protocol happy
result.setPacket("".getBytes());
return result;
}
if (params.isSetGroupCommitInfo() && params.getGroupCommitInfo().isUpdateLoadData()) {
final TGroupCommitInfo info = params.getGroupCommitInfo();
final TMasterOpResult result = new TMasterOpResult();
Env.getCurrentEnv().getGroupCommitManager()
.updateLoadData(info.tableId, info.receiveData);
// just make the protocol happy
result.setPacket("".getBytes());
return result;
}
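// (Editor's note, not part of the patch) These two branches are the master-side
// counterparts of MasterOpExecutor.buildGetGroupCommitLoadBeIdParams and
// buildUpdateLoadDataParams: the first answers the BE-selection RPC, the second
// feeds the master's per-table SlidingWindowCounter.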
// add this log so that we can track this stmt
if (LOG.isDebugEnabled()) {
@@ -1567,6 +1591,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
}
}
if (request.groupCommit) {
try {
Env.getCurrentEnv().getGroupCommitManager().updateLoadData(request.table_id, request.receiveBytes);
} catch (Exception e) {
LOG.warn("Failed to update group commit load data, {}", e.getMessage());
}
}
// get database
Env env = Env.getCurrentEnv();