diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index c30b936769..0f9fd47948 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -1111,14 +1111,4 @@ void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result, } } -void BackendService::get_be_resource(TGetBeResourceResult& result, - const TGetBeResourceRequest& request) { - int64_t mem_usage = PerfCounters::get_vm_rss(); - int64_t mem_limit = MemInfo::mem_limit(); - TGlobalResourceUsage global_resource_usage; - global_resource_usage.__set_mem_limit(mem_limit); - global_resource_usage.__set_mem_usage(mem_usage); - result.__set_global_resource_usage(global_resource_usage); -} - } // namespace doris diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index bbcf103167..9d53ec4bc4 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -140,9 +140,6 @@ public: void query_ingest_binlog(TQueryIngestBinlogResult& result, const TQueryIngestBinlogRequest& request) override; - void get_be_resource(TGetBeResourceResult& result, - const TGetBeResourceRequest& request) override; - private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); ExecEnv* _exec_env = nullptr; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index ad8769566c..0801f30fb2 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -2328,4 +2328,25 @@ void PInternalServiceImpl::get_wal_queue_size(google::protobuf::RpcController* c } } +void PInternalServiceImpl::get_be_resource(google::protobuf::RpcController* controller, + const PGetBeResourceRequest* request, + PGetBeResourceResponse* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([response, done]() { + brpc::ClosureGuard closure_guard(done); + int64_t mem_limit = MemInfo::mem_limit(); + int64_t mem_usage = PerfCounters::get_vm_rss(); + + PGlobalResourceUsage* global_resource_usage = response->mutable_global_be_resource_usage(); + global_resource_usage->set_mem_limit(mem_limit); + global_resource_usage->set_mem_usage(mem_usage); + + Status st = Status::OK(); + response->mutable_status()->set_status_code(st.code()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + } +} + } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index abd6d6d1ca..4bf09255ff 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -269,6 +269,10 @@ private: PFetchColIdsResponse* response, google::protobuf::Closure* done); + void get_be_resource(google::protobuf::RpcController* controller, + const PGetBeResourceRequest* request, PGetBeResourceResponse* response, + google::protobuf::Closure* done) override; + private: ExecEnv* _exec_env = nullptr; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java index 480afcde5b..ad4e9f94c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java @@ -17,24 +17,26 @@ package org.apache.doris.resource; -import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; +import org.apache.doris.common.Status; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.proto.InternalService; import org.apache.doris.resource.workloadgroup.QueueToken; +import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.BackendService; -import org.apache.doris.thrift.TGetBeResourceRequest; -import org.apache.doris.thrift.TGetBeResourceResult; -import org.apache.doris.thrift.TGlobalResourceUsage; -import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class AdmissionControl extends MasterDaemon { @@ -71,54 +73,44 @@ public class AdmissionControl extends MasterDaemon { Collection backends = clusterInfoService.getIdToBackend().values(); this.currentMemoryLimit = Config.query_queue_by_be_used_memory; boolean tmpIsAllBeMemoryEnough = true; + List> futureList = new ArrayList(); for (Backend be : backends) { if (!be.isAlive()) { continue; } - TNetworkAddress address = null; - BackendService.Client client = null; - TGetBeResourceResult result = null; - boolean rpcOk = true; - try { - address = new TNetworkAddress(be.getHost(), be.getBePort()); - client = ClientPool.backendPool.borrowObject(address, 5000); - result = client.getBeResource(new TGetBeResourceRequest()); - } catch (Throwable t) { - rpcOk = false; - LOG.warn("get be {} resource failed, ", be.getHost(), t); - } finally { - try { - if (rpcOk) { - ClientPool.backendPool.returnObject(address, client); - } else { - ClientPool.backendPool.invalidateObject(address, client); - } - } catch (Throwable e) { - LOG.warn("return rpc client failed. related backend[{}]", be.getHost(), - e); - } - } - if (result != null && result.isSetGlobalResourceUsage()) { - TGlobalResourceUsage globalResourceUsage = result.getGlobalResourceUsage(); - if (globalResourceUsage != null && globalResourceUsage.isSetMemLimit() - && globalResourceUsage.isSetMemUsage()) { - long memUsageL = globalResourceUsage.getMemUsage(); - long memLimitL = globalResourceUsage.getMemLimit(); - double memUsage = Double.valueOf(String.valueOf(memUsageL)); - double memLimit = Double.valueOf(String.valueOf(memLimitL)); - double memUsagePercent = memUsage / memLimit; + final InternalService.PGetBeResourceRequest request = InternalService.PGetBeResourceRequest.newBuilder() + .build(); + Future response = BackendServiceProxy.getInstance() + .getBeResourceAsync(be.getBrpcAddress(), 5, request); + futureList.add(response); + } - if (memUsagePercent > this.currentMemoryLimit) { - tmpIsAllBeMemoryEnough = false; + for (Future future : futureList) { + if (future == null) { + continue; + } + try { + InternalService.PGetBeResourceResponse response = future.get(5, TimeUnit.SECONDS); + if (response.hasStatus()) { + Status status = new Status(response.getStatus()); + if (status.getErrorCode() == TStatusCode.OK) { + InternalService.PGlobalResourceUsage globalUsage = response.getGlobalBeResourceUsage(); + long memUsageL = globalUsage.getMemUsage(); + long memLimitL = globalUsage.getMemLimit(); + double memUsage = Double.valueOf(String.valueOf(memUsageL)); + double memLimit = Double.valueOf(String.valueOf(memLimitL)); + double memUsagePercent = memUsage / memLimit; + if (memUsagePercent > this.currentMemoryLimit) { + tmpIsAllBeMemoryEnough = false; + break; + } } - LOG.debug( - "be ip:{}, mem limit:{}, mem usage:{}, mem usage percent:{}, " - + "query queue mem:{}, query wait size:{}", - be.getHost(), memLimitL, memUsageL, memUsagePercent, this.currentMemoryLimit, - this.queryWaitQueue.size()); } + } catch (Throwable t) { + LOG.warn("wait get be resource response failed, ", t); } } + this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 924955e662..9416687b58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -184,6 +184,11 @@ public class BackendServiceClient { } + public Future getBeResource(InternalService.PGetBeResourceRequest request, + int timeoutSec) { + return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS).getBeResource(request); + } + public void shutdown() { ConnectivityState state = channel.getState(false); LOG.warn("shut down backend service client: {}, channel state: {}", address, state); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 0fce50c327..830e7da1a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -556,5 +556,16 @@ public class BackendServiceProxy { } } + public Future getBeResourceAsync(TNetworkAddress address, int timeoutSec, + InternalService.PGetBeResourceRequest request) { + try { + final BackendServiceClient client = getProxy(address); + return client.getBeResource(request, timeoutSec); + } catch (Throwable e) { + LOG.warn("get be resource failed, address={}:{}", + address.getHostname(), address.getPort(), e); + } + return null; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index af921cd182..d03d359568 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -29,8 +29,6 @@ import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentResult; import org.apache.doris.thrift.TExportStatusResult; import org.apache.doris.thrift.TExportTaskRequest; -import org.apache.doris.thrift.TGetBeResourceRequest; -import org.apache.doris.thrift.TGetBeResourceResult; import org.apache.doris.thrift.TIngestBinlogRequest; import org.apache.doris.thrift.TIngestBinlogResult; import org.apache.doris.thrift.TNetworkAddress; @@ -150,11 +148,6 @@ public class GenericPoolTest { return null; } - @Override - public TGetBeResourceResult getBeResource(TGetBeResourceRequest request) throws TException { - return null; - } - @Override public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException { // TODO Auto-generated method stub diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 0537e7fb7b..93ce8b6766 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -48,8 +48,6 @@ import org.apache.doris.thrift.TExportState; import org.apache.doris.thrift.TExportStatusResult; import org.apache.doris.thrift.TExportTaskRequest; import org.apache.doris.thrift.TFinishTaskRequest; -import org.apache.doris.thrift.TGetBeResourceRequest; -import org.apache.doris.thrift.TGetBeResourceResult; import org.apache.doris.thrift.THeartbeatResult; import org.apache.doris.thrift.TIngestBinlogRequest; import org.apache.doris.thrift.TIngestBinlogResult; @@ -387,10 +385,6 @@ public class MockedBackendFactory { return new TPublishTopicResult(new TStatus(TStatusCode.OK)); } - @Override - public TGetBeResourceResult getBeResource(TGetBeResourceRequest request) throws TException { - return null; - } @Override public TStatus submitExportTask(TExportTaskRequest request) throws TException { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 1d68d60aa6..c2ec320554 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -899,6 +899,19 @@ message PFetchRemoteSchemaResponse { optional TabletSchemaPB merged_schema = 2; } +message PGetBeResourceRequest { +} + +message PGlobalResourceUsage { + optional int64 mem_limit = 1; + optional int64 mem_usage = 2; +} + +message PGetBeResourceResponse { + optional PStatus status = 1; + optional PGlobalResourceUsage global_be_resource_usage = 2; +} + service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult); @@ -947,5 +960,6 @@ service PBackendService { rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns (PFetchArrowFlightSchemaResult); rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns (PFetchRemoteSchemaResponse); rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns (PJdbcTestConnectionResult); + rpc get_be_resource(PGetBeResourceRequest) returns (PGetBeResourceResponse); }; diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 0255d0d61a..daf49a2520 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -243,18 +243,6 @@ struct TPublishTopicResult { } -struct TGetBeResourceRequest { -} - -struct TGlobalResourceUsage { - 1: optional i64 mem_limit - 2: optional i64 mem_usage -} - -struct TGetBeResourceResult { - 1: optional TGlobalResourceUsage global_resource_usage -} - service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. // Returns as soon as all incoming data streams have been set up. @@ -312,5 +300,4 @@ service BackendService { TPublishTopicResult publish_topic_info(1:TPublishTopicRequest topic_request); - TGetBeResourceResult get_be_resource(1: TGetBeResourceRequest request); }