[Feature]Add fetch/update/clear proto of fe&be for cache (#4190)

This commit is contained in:
HaiBo Li
2020-07-31 13:23:24 +08:00
committed by GitHub
parent e6059341e8
commit 1ebd156b99
4 changed files with 121 additions and 0 deletions

View File

@ -28,6 +28,11 @@ import org.apache.doris.proto.PProxyRequest;
import org.apache.doris.proto.PProxyResult;
import org.apache.doris.proto.PTriggerProfileReportResult;
import org.apache.doris.proto.PUniqueId;
import org.apache.doris.proto.PUpdateCacheRequest;
import org.apache.doris.proto.PCacheResponse;
import org.apache.doris.proto.PFetchCacheRequest;
import org.apache.doris.proto.PFetchCacheResult;
import org.apache.doris.proto.PClearCacheRequest;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
@ -164,6 +169,43 @@ public class BackendServiceProxy {
}
}
public Future<PCacheResponse> updateCache(
TNetworkAddress address, PUpdateCacheRequest request) throws RpcException{
try {
PBackendService service = getProxy(address);
return service.updateCache(request);
} catch (Throwable e) {
LOG.warn("update cache catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
throw new RpcException(address.hostname, e.getMessage());
}
}
public Future<PFetchCacheResult> fetchCache(
TNetworkAddress address, PFetchCacheRequest request) throws RpcException {
try {
PBackendService service = getProxy(address);
return service.fetchCache(request);
} catch (Throwable e) {
LOG.warn("fetch cache catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
throw new RpcException(address.hostname, e.getMessage());
}
}
public Future<PCacheResponse> clearCache(
TNetworkAddress address, PClearCacheRequest request) throws RpcException {
try {
PBackendService service = getProxy(address);
return service.clearCache(request);
} catch (Throwable e) {
LOG.warn("clear cache catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
throw new RpcException(address.hostname, e.getMessage());
}
}
public Future<PTriggerProfileReportResult> triggerProfileReportAsync(
TNetworkAddress address, PTriggerProfileReportRequest request) throws RpcException {
try {

View File

@ -24,6 +24,11 @@ import org.apache.doris.proto.PFetchDataResult;
import org.apache.doris.proto.PProxyRequest;
import org.apache.doris.proto.PProxyResult;
import org.apache.doris.proto.PTriggerProfileReportResult;
import org.apache.doris.proto.PUpdateCacheRequest;
import org.apache.doris.proto.PClearCacheRequest;
import org.apache.doris.proto.PCacheResponse;
import org.apache.doris.proto.PFetchCacheRequest;
import org.apache.doris.proto.PFetchCacheResult;
import com.baidu.jprotobuf.pbrpc.ProtobufRPC;
@ -43,6 +48,15 @@ public interface PBackendService {
attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 86400000)
Future<PFetchDataResult> fetchDataAsync(PFetchDataRequest request);
@ProtobufRPC(serviceName = "PBackendService", methodName = "update_cache", onceTalkTimeout = 10000)
Future<PCacheResponse> updateCache(PUpdateCacheRequest request);
@ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_cache", onceTalkTimeout = 10000)
Future<PFetchCacheResult> fetchCache(PFetchCacheRequest request);
@ProtobufRPC(serviceName = "PBackendService", methodName = "clear_cache", onceTalkTimeout = 10000)
Future<PCacheResponse> clearCache(PClearCacheRequest request);
@ProtobufRPC(serviceName = "PBackendService", methodName = "trigger_profile_report",
attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000)
Future<PTriggerProfileReportResult> triggerProfileReport(PTriggerProfileReportRequest request);

View File

@ -147,6 +147,65 @@ message PFetchDataResult {
optional PQueryStatistics query_statistics = 4;
};
//Add message definition to fetch and update cache
enum PCacheStatus {
DEFAULT = 0;
CACHE_OK = 1;
PARAM_ERROR = 2;
SIZE_OVER_LIMIT = 3;
NO_SQL_KEY = 4;
NO_PARTITION_KEY = 5;
INVALID_KEY_RANGE = 6;
DATA_OVERDUE = 7;
EMPTY_DATA = 8;
};
message PCacheParam {
required int64 partition_key = 1;
optional int64 last_version = 2;
optional int64 last_version_time = 3;
};
message PCacheValue {
required PCacheParam param = 1;
required int32 data_size = 2;
repeated bytes row = 3;
};
//for update&clear return
message PCacheResponse {
required PCacheStatus status = 1;
};
message PUpdateCacheRequest{
required PUniqueId sql_key = 1;
repeated PCacheValue value = 2;
};
message PFetchCacheRequest {
required PUniqueId sql_key = 1;
repeated PCacheParam param = 2;
};
message PFetchCacheResult {
required PCacheStatus status = 1;
repeated PCacheValue value = 2;
};
enum PClearType {
CLEAR_ALL = 0;
PRUNE_CACHE = 1;
CLEAR_BEFORE_TIME = 2;
CLEAR_SQL_KEY = 3;
};
message PClearCacheRequest {
required PClearType clear_type = 1;
optional int64 before_time = 2;
optional PUniqueId sql_key = 3;
};
//End cache proto definition
message PTriggerProfileReportRequest {
repeated PUniqueId instance_ids = 1;
};
@ -195,5 +254,8 @@ service PBackendService {
rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult);
rpc trigger_profile_report(PTriggerProfileReportRequest) returns (PTriggerProfileReportResult);
rpc get_info(PProxyRequest) returns (PProxyResult);
rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult);
rpc clear_cache(PClearCacheRequest) returns (PCacheResponse);
};

View File

@ -36,4 +36,7 @@ service PInternalService {
rpc tablet_writer_cancel(doris.PTabletWriterCancelRequest) returns (doris.PTabletWriterCancelResult);
rpc trigger_profile_report(doris.PTriggerProfileReportRequest) returns (doris.PTriggerProfileReportResult);
rpc get_info(doris.PProxyRequest) returns (doris.PProxyResult);
rpc update_cache(doris.PUpdateCacheRequest) returns (doris.PCacheResponse);
rpc fetch_cache(doris.PFetchCacheRequest) returns (doris.PFetchCacheResult);
rpc clear_cache(doris.PClearCacheRequest) returns (doris.PCacheResponse);
};