From 1ebd156b99d1e1eaaedbf8ca0e9d3708438e4b11 Mon Sep 17 00:00:00 2001 From: HaiBo Li Date: Fri, 31 Jul 2020 13:23:24 +0800 Subject: [PATCH] [Feature]Add fetch/update/clear proto of fe&be for cache (#4190) --- .../apache/doris/rpc/BackendServiceProxy.java | 42 +++++++++++++ .../org/apache/doris/rpc/PBackendService.java | 14 +++++ gensrc/proto/internal_service.proto | 62 +++++++++++++++++++ gensrc/proto/palo_internal_service.proto | 3 + 4 files changed, 121 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index d9a90dedea..7a01b7ae3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -28,6 +28,11 @@ import org.apache.doris.proto.PProxyRequest; import org.apache.doris.proto.PProxyResult; import org.apache.doris.proto.PTriggerProfileReportResult; import org.apache.doris.proto.PUniqueId; +import org.apache.doris.proto.PUpdateCacheRequest; +import org.apache.doris.proto.PCacheResponse; +import org.apache.doris.proto.PFetchCacheRequest; +import org.apache.doris.proto.PFetchCacheResult; +import org.apache.doris.proto.PClearCacheRequest; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; @@ -164,6 +169,43 @@ public class BackendServiceProxy { } } + public Future updateCache( + TNetworkAddress address, PUpdateCacheRequest request) throws RpcException{ + try { + PBackendService service = getProxy(address); + return service.updateCache(request); + } catch (Throwable e) { + LOG.warn("update cache catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + + public Future fetchCache( + TNetworkAddress address, PFetchCacheRequest request) throws RpcException { + try { + PBackendService service = getProxy(address); + return service.fetchCache(request); + } catch (Throwable e) { + LOG.warn("fetch cache catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + + public Future clearCache( + TNetworkAddress address, PClearCacheRequest request) throws RpcException { + try { + PBackendService service = getProxy(address); + return service.clearCache(request); + } catch (Throwable e) { + LOG.warn("clear cache catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + + public Future triggerProfileReportAsync( TNetworkAddress address, PTriggerProfileReportRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java index 38bc2e7012..3e680093f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java @@ -24,6 +24,11 @@ import org.apache.doris.proto.PFetchDataResult; import org.apache.doris.proto.PProxyRequest; import org.apache.doris.proto.PProxyResult; import org.apache.doris.proto.PTriggerProfileReportResult; +import org.apache.doris.proto.PUpdateCacheRequest; +import org.apache.doris.proto.PClearCacheRequest; +import org.apache.doris.proto.PCacheResponse; +import org.apache.doris.proto.PFetchCacheRequest; +import org.apache.doris.proto.PFetchCacheResult; import com.baidu.jprotobuf.pbrpc.ProtobufRPC; @@ -43,6 +48,15 @@ public interface PBackendService { attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 86400000) Future fetchDataAsync(PFetchDataRequest request); + @ProtobufRPC(serviceName = "PBackendService", methodName = "update_cache", onceTalkTimeout = 10000) + Future updateCache(PUpdateCacheRequest request); + + @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_cache", onceTalkTimeout = 10000) + Future fetchCache(PFetchCacheRequest request); + + @ProtobufRPC(serviceName = "PBackendService", methodName = "clear_cache", onceTalkTimeout = 10000) + Future clearCache(PClearCacheRequest request); + @ProtobufRPC(serviceName = "PBackendService", methodName = "trigger_profile_report", attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000) Future triggerProfileReport(PTriggerProfileReportRequest request); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 80f3be9d32..cc0d08bcae 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -147,6 +147,65 @@ message PFetchDataResult { optional PQueryStatistics query_statistics = 4; }; +//Add message definition to fetch and update cache +enum PCacheStatus { + DEFAULT = 0; + CACHE_OK = 1; + PARAM_ERROR = 2; + SIZE_OVER_LIMIT = 3; + NO_SQL_KEY = 4; + NO_PARTITION_KEY = 5; + INVALID_KEY_RANGE = 6; + DATA_OVERDUE = 7; + EMPTY_DATA = 8; +}; + +message PCacheParam { + required int64 partition_key = 1; + optional int64 last_version = 2; + optional int64 last_version_time = 3; +}; + +message PCacheValue { + required PCacheParam param = 1; + required int32 data_size = 2; + repeated bytes row = 3; +}; + +//for update&clear return +message PCacheResponse { + required PCacheStatus status = 1; +}; + +message PUpdateCacheRequest{ + required PUniqueId sql_key = 1; + repeated PCacheValue value = 2; +}; + +message PFetchCacheRequest { + required PUniqueId sql_key = 1; + repeated PCacheParam param = 2; +}; + +message PFetchCacheResult { + required PCacheStatus status = 1; + repeated PCacheValue value = 2; +}; + +enum PClearType { + CLEAR_ALL = 0; + PRUNE_CACHE = 1; + CLEAR_BEFORE_TIME = 2; + CLEAR_SQL_KEY = 3; +}; + +message PClearCacheRequest { + required PClearType clear_type = 1; + optional int64 before_time = 2; + optional PUniqueId sql_key = 3; +}; +//End cache proto definition + message PTriggerProfileReportRequest { repeated PUniqueId instance_ids = 1; }; @@ -195,5 +254,8 @@ service PBackendService { rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult); rpc trigger_profile_report(PTriggerProfileReportRequest) returns (PTriggerProfileReportResult); rpc get_info(PProxyRequest) returns (PProxyResult); + rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse); + rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult); + rpc clear_cache(PClearCacheRequest) returns (PCacheResponse); }; diff --git a/gensrc/proto/palo_internal_service.proto b/gensrc/proto/palo_internal_service.proto index 3adc1ec34d..31adb767d2 100644 --- a/gensrc/proto/palo_internal_service.proto +++ b/gensrc/proto/palo_internal_service.proto @@ -36,4 +36,7 @@ service PInternalService { rpc tablet_writer_cancel(doris.PTabletWriterCancelRequest) returns (doris.PTabletWriterCancelResult); rpc trigger_profile_report(doris.PTriggerProfileReportRequest) returns (doris.PTriggerProfileReportResult); rpc get_info(doris.PProxyRequest) returns (doris.PProxyResult); + rpc update_cache(doris.PUpdateCacheRequest) returns (doris.PCacheResponse); + rpc fetch_cache(doris.PFetchCacheRequest) returns (doris.PFetchCacheResult); + rpc clear_cache(doris.PClearCacheRequest) returns (doris.PCacheResponse); };