[branch-2.1][Refactor]use async to get be resource (#38389) (#39826)

pick #38389
This commit is contained in:
wangbo
2024-08-23 17:16:19 +08:00
committed by GitHub
parent e03b887a97
commit 7a7292ad5a
11 changed files with 92 additions and 84 deletions

View File

@ -17,24 +17,26 @@
package org.apache.doris.resource;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.proto.InternalService;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TGetBeResourceRequest;
import org.apache.doris.thrift.TGetBeResourceResult;
import org.apache.doris.thrift.TGlobalResourceUsage;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class AdmissionControl extends MasterDaemon {
@ -71,54 +73,44 @@ public class AdmissionControl extends MasterDaemon {
Collection<Backend> backends = clusterInfoService.getIdToBackend().values();
this.currentMemoryLimit = Config.query_queue_by_be_used_memory;
boolean tmpIsAllBeMemoryEnough = true;
List<Future<InternalService.PGetBeResourceResponse>> futureList = new ArrayList();
for (Backend be : backends) {
if (!be.isAlive()) {
continue;
}
TNetworkAddress address = null;
BackendService.Client client = null;
TGetBeResourceResult result = null;
boolean rpcOk = true;
try {
address = new TNetworkAddress(be.getHost(), be.getBePort());
client = ClientPool.backendPool.borrowObject(address, 5000);
result = client.getBeResource(new TGetBeResourceRequest());
} catch (Throwable t) {
rpcOk = false;
LOG.warn("get be {} resource failed, ", be.getHost(), t);
} finally {
try {
if (rpcOk) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
} catch (Throwable e) {
LOG.warn("return rpc client failed. related backend[{}]", be.getHost(),
e);
}
}
if (result != null && result.isSetGlobalResourceUsage()) {
TGlobalResourceUsage globalResourceUsage = result.getGlobalResourceUsage();
if (globalResourceUsage != null && globalResourceUsage.isSetMemLimit()
&& globalResourceUsage.isSetMemUsage()) {
long memUsageL = globalResourceUsage.getMemUsage();
long memLimitL = globalResourceUsage.getMemLimit();
double memUsage = Double.valueOf(String.valueOf(memUsageL));
double memLimit = Double.valueOf(String.valueOf(memLimitL));
double memUsagePercent = memUsage / memLimit;
final InternalService.PGetBeResourceRequest request = InternalService.PGetBeResourceRequest.newBuilder()
.build();
Future<InternalService.PGetBeResourceResponse> response = BackendServiceProxy.getInstance()
.getBeResourceAsync(be.getBrpcAddress(), 5, request);
futureList.add(response);
}
if (memUsagePercent > this.currentMemoryLimit) {
tmpIsAllBeMemoryEnough = false;
for (Future<InternalService.PGetBeResourceResponse> future : futureList) {
if (future == null) {
continue;
}
try {
InternalService.PGetBeResourceResponse response = future.get(5, TimeUnit.SECONDS);
if (response.hasStatus()) {
Status status = new Status(response.getStatus());
if (status.getErrorCode() == TStatusCode.OK) {
InternalService.PGlobalResourceUsage globalUsage = response.getGlobalBeResourceUsage();
long memUsageL = globalUsage.getMemUsage();
long memLimitL = globalUsage.getMemLimit();
double memUsage = Double.valueOf(String.valueOf(memUsageL));
double memLimit = Double.valueOf(String.valueOf(memLimitL));
double memUsagePercent = memUsage / memLimit;
if (memUsagePercent > this.currentMemoryLimit) {
tmpIsAllBeMemoryEnough = false;
break;
}
}
LOG.debug(
"be ip:{}, mem limit:{}, mem usage:{}, mem usage percent:{}, "
+ "query queue mem:{}, query wait size:{}",
be.getHost(), memLimitL, memUsageL, memUsagePercent, this.currentMemoryLimit,
this.queryWaitQueue.size());
}
} catch (Throwable t) {
LOG.warn("wait get be resource response failed, ", t);
}
}
this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough;
}

View File

@ -184,6 +184,11 @@ public class BackendServiceClient {
}
public Future<InternalService.PGetBeResourceResponse> getBeResource(InternalService.PGetBeResourceRequest request,
int timeoutSec) {
return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS).getBeResource(request);
}
public void shutdown() {
ConnectivityState state = channel.getState(false);
LOG.warn("shut down backend service client: {}, channel state: {}", address, state);

View File

@ -556,5 +556,16 @@ public class BackendServiceProxy {
}
}
public Future<InternalService.PGetBeResourceResponse> getBeResourceAsync(TNetworkAddress address, int timeoutSec,
InternalService.PGetBeResourceRequest request) {
try {
final BackendServiceClient client = getProxy(address);
return client.getBeResource(request, timeoutSec);
} catch (Throwable e) {
LOG.warn("get be resource failed, address={}:{}",
address.getHostname(), address.getPort(), e);
}
return null;
}
}