From 3e10be4ba77e157dca4e213709db7f422228ed31 Mon Sep 17 00:00:00 2001 From: chenlinzhong <490103404@qq.com> Date: Tue, 16 Aug 2022 14:37:34 +0800 Subject: [PATCH] [remote-udaf](sample) add some java demo (#11752) --- .../remote-udaf-java-demo/README.md | 166 ++++++++++ .../doris-demo/remote-udaf-java-demo/pom.xml | 172 +++++++++++ .../doris/udaf/FunctionServiceDemo.java | 89 ++++++ .../doris/udaf/FunctionServiceImpl.java | 287 ++++++++++++++++++ .../src/main/proto/function_service.proto | 1 + .../src/main/proto/types.proto | 1 + 6 files changed, 716 insertions(+) create mode 100644 samples/doris-demo/remote-udaf-java-demo/README.md create mode 100644 samples/doris-demo/remote-udaf-java-demo/pom.xml create mode 100644 samples/doris-demo/remote-udaf-java-demo/src/main/java/org/apache/doris/udaf/FunctionServiceDemo.java create mode 100644 samples/doris-demo/remote-udaf-java-demo/src/main/java/org/apache/doris/udaf/FunctionServiceImpl.java create mode 120000 samples/doris-demo/remote-udaf-java-demo/src/main/proto/function_service.proto create mode 120000 samples/doris-demo/remote-udaf-java-demo/src/main/proto/types.proto diff --git a/samples/doris-demo/remote-udaf-java-demo/README.md b/samples/doris-demo/remote-udaf-java-demo/README.md new file mode 100644 index 0000000000..7974ea0433 --- /dev/null +++ b/samples/doris-demo/remote-udaf-java-demo/README.md @@ -0,0 +1,166 @@ + + +# Remote udaf Function Service In Java Demo + +## Compile +`mvn package` + +# Run + +`java -jar target/remote-udaf-java-demo-jar-with-dependencies.jar 9000` +`9000` is the port that the server will listen on + +# Demo + + +``` +//create one table such as table2 +CREATE TABLE `table2` ( + `event_day` date NULL, + `siteid` int(11) NULL DEFAULT "10", + `citycode` smallint(6) NULL, + `visitinfo` varchar(1024) NULL DEFAULT "", + `pv` varchar(1024) REPLACE NULL DEFAULT "0" +) ENGINE=OLAP +AGGREGATE KEY(`event_day`, `siteid`, `citycode`, `visitinfo`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`siteid`) BUCKETS 10 +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1", +"in_memory" = "false", +"storage_format" = "V2" +) + +//import some data +MySQL [test_db]> select * from table2; ++------------+--------+----------+------------------------------------+------+ +| event_day | siteid | citycode | visitinfo | pv | ++------------+--------+----------+------------------------------------+------+ +| 2017-07-03 | 8 | 12 | {"ip":"192.168.0.5","source":"pc"} | 81 | +| 2017-07-03 | 37 | 12 | {"ip":"192.168.0.3","source":"pc"} | 81 | +| 2017-07-03 | 67 | 16 | {"ip":"192.168.0.2","source":"pc"} | 79 | +| 2017-07-03 | 101 | 11 | {"ip":"192.168.0.5","source":"pc"} | 65 | +| 2017-07-03 | 32 | 15 | {"ip":"192.168.0.1","source":"pc"} | 188 | +| 2017-07-03 | 103 | 12 | {"ip":"192.168.0.5","source":"pc"} | 123 | +| 2017-07-03 | 104 | 16 | {"ip":"192.168.0.5","source":"pc"} | 79 | +| 2017-07-03 | 3 | 12 | {"ip":"192.168.0.3","source":"pc"} | 123 | +| 2017-07-03 | 3 | 15 | {"ip":"192.168.0.2","source":"pc"} | 188 | +| 2017-07-03 | 13 | 11 | {"ip":"192.168.0.1","source":"pc"} | 65 | +| 2017-07-03 | 53 | 12 | {"ip":"192.168.0.2","source":"pc"} | 123 | +| 2017-07-03 | 1 | 11 | {"ip":"192.168.0.1","source":"pc"} | 65 | +| 2017-07-03 | 7 | 16 | {"ip":"192.168.0.4","source":"pc"} | 79 | +| 2017-07-03 | 102 | 15 | {"ip":"192.168.0.5","source":"pc"} | 188 | +| 2017-07-03 | 105 | 12 | {"ip":"192.168.0.5","source":"pc"} | 81 | ++------------+--------+----------+------------------------------------+------+ + +``` + +### 1. find most visit top 3 ip +``` + +MySQL [test_db]> CREATE AGGREGATE FUNCTION rpc_count_visit_info(varchar(1024)) RETURNS varchar(1024) PROPERTIES ( + "TYPE"="RPC", + "OBJECT_FILE"="127.0.0.1:9000", + "update_fn"="rpc_count_visit_info_update", + "merge_fn"="rpc_count_visit_info_merge", + "finalize_fn"="rpc_count_visit_info_finalize" +); + +MySQL [test_db]> select rpc_count_visit_info(visitinfo) from table2; ++--------------------------------------------+ +| rpc_count_visit_info(`visitinfo`) | ++--------------------------------------------+ +| 192.168.0.5:6 192.168.0.2:3 192.168.0.1:3 | ++--------------------------------------------+ +1 row in set (0.036 sec) + +MySQL [test_db]> select citycode, rpc_count_visit_info(visitinfo) from table2 group by citycode; ++----------+--------------------------------------------+ +| citycode | rpc_count_visit_info(`visitinfo`) | ++----------+--------------------------------------------+ +| 15 | 192.168.0.2:1 192.168.0.1:1 192.168.0.5:1 | +| 11 | 192.168.0.1:2 192.168.0.5:1 | +| 12 | 192.168.0.5:3 192.168.0.3:2 192.168.0.2:1 | +| 16 | 192.168.0.2:1 192.168.0.4:1 192.168.0.5:1 | ++----------+--------------------------------------------+ +4 rows in set (0.050 sec) + +``` +### 2. sum pv +``` +CREATE AGGREGATE FUNCTION rpc_sum(bigint) RETURNS bigint PROPERTIES ( + "TYPE"="RPC", + "OBJECT_FILE"="127.0.0.1:9000", + "update_fn"="rpc_sum_update", + "merge_fn"="rpc_sum_merge", + "finalize_fn"="rpc_sum_finalize" +); + +MySQL [test_db]> select citycode, rpc_sum(pv) from table2 group by citycode; ++----------+---------------+ +| citycode | rpc_sum(`pv`) | ++----------+---------------+ +| 15 | 564 | +| 11 | 195 | +| 12 | 612 | +| 16 | 237 | ++----------+---------------+ +4 rows in set (0.067 sec) + +MySQL [test_db]> select rpc_sum(pv) from table2; ++---------------+ +| rpc_sum(`pv`) | ++---------------+ +| 1608 | ++---------------+ +1 row in set (0.030 sec) +``` + +### 3. avg pv + +``` +CREATE AGGREGATE FUNCTION rpc_avg(int) RETURNS double PROPERTIES ( + "TYPE"="RPC", + "OBJECT_FILE"="127.0.0.1:9000", + "update_fn"="rpc_avg_update", + "merge_fn"="rpc_avg_merge", + "finalize_fn"="rpc_avg_finalize" +); + +MySQL [test_db]> select citycode, rpc_avg(pv) from table2 group by citycode; ++----------+---------------+ +| citycode | rpc_avg(`pv`) | ++----------+---------------+ +| 15 | 188 | +| 11 | 65 | +| 12 | 102 | +| 16 | 79 | ++----------+---------------+ +4 rows in set (0.039 sec) + +MySQL [test_db]> select rpc_avg(pv) from table2; ++---------------+ +| rpc_avg(`pv`) | ++---------------+ +| 107.2 | ++---------------+ +1 row in set (0.028 sec) + +``` diff --git a/samples/doris-demo/remote-udaf-java-demo/pom.xml b/samples/doris-demo/remote-udaf-java-demo/pom.xml new file mode 100644 index 0000000000..c07b09099e --- /dev/null +++ b/samples/doris-demo/remote-udaf-java-demo/pom.xml @@ -0,0 +1,172 @@ + + + + 4.0.0 + org.apache.doris + remote-udaf-java-demo + 1.0-SNAPSHOT + Remote udaf Demo in Java + https://doris.apache.org/ + + 3.14.0 + UTF-8 + UTF-8 + 1.30.0 + 1.8 + ${basedir}/../../../thirdparty + + + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + + + remote-udaf-java-demo + + + + com.github.os72 + protoc-jar-maven-plugin + 3.11.1 + + + generate-sources + + run + + + ${doris.thirdparty}/installed/bin/protoc + You can use following protocArtifact instead of protocCommand, so that you don't need to install protobuf tools<--> + + ${protobuf.version} + + ../remote-udaf-java-demo/src/main/proto + + + + java + + + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version} + + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.2.0 + + + add-source + generate-sources + + add-source + + + + + ${basedir}/target/generated-sources/build/ + ${basedir}/target/generated-sources/ + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.2.2 + + + + org.apache.doris.udaf.FunctionServiceDemo + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.3.0 + + + jar-with-dependencies + + + + org.apache.doris.udaf.FunctionServiceDemo + + + + + + package + + single + + + + + + + + + + custom-env + + + env.CUSTOM_MAVEN_REPO + + + + + custom-nexus + ${env.CUSTOM_MAVEN_REPO} + + + + + custom-nexus + ${env.CUSTOM_MAVEN_REPO} + + + + + diff --git a/samples/doris-demo/remote-udaf-java-demo/src/main/java/org/apache/doris/udaf/FunctionServiceDemo.java b/samples/doris-demo/remote-udaf-java-demo/src/main/java/org/apache/doris/udaf/FunctionServiceDemo.java new file mode 100644 index 0000000000..9236a7ab49 --- /dev/null +++ b/samples/doris-demo/remote-udaf-java-demo/src/main/java/org/apache/doris/udaf/FunctionServiceDemo.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udaf; + +import io.grpc.Server; +import io.grpc.ServerBuilder; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class FunctionServiceDemo { + private static final Logger logger = Logger.getLogger(FunctionServiceDemo.class.getName()); + + private Server server; + + private void start(int port) throws IOException { + server = ServerBuilder.forPort(port) + .addService(new FunctionServiceImpl()) + .build() + .start(); + logger.info("Server started, listening on " + port); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + FunctionServiceDemo.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** server shut down"); + } + }); + } + + private void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + private void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + /** + * Main launches the server from the command line. + */ + public static void main(String[] args) throws IOException, InterruptedException { + int port = 9000; + if (args.length > 0) { + try { + port = Integer.parseInt(args[0]); + } catch (NumberFormatException e) { + System.err.println("port " + args[0] + " must be an integer."); + System.exit(1); + } + } + if (port <= 0) { + System.err.println("port " + args[0] + " must be positive."); + System.exit(1); + } + final FunctionServiceDemo server = new FunctionServiceDemo(); + server.start(port); + server.blockUntilShutdown(); + } +} diff --git a/samples/doris-demo/remote-udaf-java-demo/src/main/java/org/apache/doris/udaf/FunctionServiceImpl.java b/samples/doris-demo/remote-udaf-java-demo/src/main/java/org/apache/doris/udaf/FunctionServiceImpl.java new file mode 100644 index 0000000000..7ff4fe9b47 --- /dev/null +++ b/samples/doris-demo/remote-udaf-java-demo/src/main/java/org/apache/doris/udaf/FunctionServiceImpl.java @@ -0,0 +1,287 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udaf; + + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; + +import org.apache.doris.proto.FunctionService; +import org.apache.doris.proto.PFunctionServiceGrpc; +import org.apache.doris.proto.Types; + +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import io.grpc.stub.StreamObserver; + +import java.util.Map; +import java.util.Iterator; +import java.util.stream.Collectors; +import java.util.List; +import java.util.*; + + + + +public class FunctionServiceImpl extends PFunctionServiceGrpc.PFunctionServiceImplBase { + private static final Logger logger = Logger.getLogger(FunctionServiceImpl.class.getName()); + + public static void ok(StreamObserver observer, T data) { + observer.onNext(data); + observer.onCompleted(); + } + + @Override + public void fnCall(FunctionService.PFunctionCallRequest request, + StreamObserver responseObserver) { + // symbol is functionName + String functionName = request.getFunctionName(); + logger.info("fnCall request=" + request); + FunctionService.PFunctionCallResponse.Builder res = FunctionService.PFunctionCallResponse.newBuilder(); + if ("rpc_sum_update".equals(functionName)) { + Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0); + Types.PValues.Builder pValues = Types.PValues.newBuilder(); + Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.INT64); + + long total = 0; + int size = request.getArgs(0).getInt64ValueCount(); + for(int i=0;i0){ + total += request.getContext().getFunctionContext().getArgsData(0).getInt64Value(0); + } + pValues.setHasNull(false); + pValues.addInt64Value(total); + pValues.setType(returnType.build()); + res.setStatus(statusCode.build()); + res.addResult(pValues.build()); + } + if ("rpc_sum_merge".equals(functionName)) { + Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0); + Types.PValues.Builder pValues = Types.PValues.newBuilder(); + Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.INT64); + int size = request.getArgsCount(); + long total = 0; + for(int i=0;i0){ + total += request.getContext().getFunctionContext().getArgsData(0).getDoubleValue(0); + size += request.getContext().getFunctionContext().getArgsData(0).getInt32Value(0); + } + pValues.setHasNull(false); + pValues.addDoubleValue(total); + pValues.addInt32Value(size); + pValues.setType(returnType.build()); + res.setStatus(statusCode.build()); + res.addResult(pValues.build()); + } + + if ("rpc_avg_merge".equals(functionName)) { + Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0); + Types.PValues.Builder pValues = Types.PValues.newBuilder(); + Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.DOUBLE); + int size = 0; + double total = 0; + int args_len = request.getArgsCount(); + for(int i=0;i0 + && request.getContext().getFunctionContext().getArgsData(0).getStringValueCount()>0 ){ + String s= request.getContext().getFunctionContext().getArgsData(0).getStringValue(0); + GlobalIpMap = JSONObject.parseObject(s); + } + for(int i=0;i mapRepeat = new HashMap(); + Iterator iter = finalMap.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = (Map.Entry) iter.next(); + String key = entry.getKey().toString(); + int value = Integer.parseInt(entry.getValue().toString()); + mapRepeat.put(key, value); + } + List> list1 = new ArrayList>(); + list1.addAll(mapRepeat.entrySet()); + Collections.sort(list1, new Comparator>() { + @Override + public int compare(Map.Entry o1, Map.Entry o2) { + return o2.getValue() - o1.getValue(); + } + }); + int topN=3; + String sortedOutputStr=""; + for (Map.Entry entry : list1) { + topN --; + if(topN<0){ + break; + } + sortedOutputStr += entry.getKey()+":"+entry.getValue()+" "; + } + pValues.setHasNull(false); + pValues.addStringValue(sortedOutputStr); + pValues.setType(returnType.build()); + res.setStatus(statusCode.build()); + res.addResult(pValues.build()); + } + logger.info("fnCall res=" + res); + ok(responseObserver, res.build()); + } + + @Override + public void checkFn(FunctionService.PCheckFunctionRequest request, + StreamObserver responseObserver) { + // symbol is functionName + logger.info("checkFn request=" + request); + int status = 0; + if ("add_int".equals(request.getFunction().getFunctionName())) { + // check inputs count + if (request.getFunction().getInputsCount() != 2) { + status = -1; + } + } + FunctionService.PCheckFunctionResponse res = + FunctionService.PCheckFunctionResponse.newBuilder() + .setStatus(Types.PStatus.newBuilder().setStatusCode(status).build()).build(); + logger.info("checkFn res=" + res); + ok(responseObserver, res); + } + + @Override + public void handShake(Types.PHandShakeRequest request, StreamObserver responseObserver) { + logger.info("handShake request=" + request); + ok(responseObserver, + Types.PHandShakeResponse.newBuilder().setStatus(Types.PStatus.newBuilder().setStatusCode(0).build()) + .setHello(request.getHello()).build()); + } + +} diff --git a/samples/doris-demo/remote-udaf-java-demo/src/main/proto/function_service.proto b/samples/doris-demo/remote-udaf-java-demo/src/main/proto/function_service.proto new file mode 120000 index 0000000000..0bc5702ba6 --- /dev/null +++ b/samples/doris-demo/remote-udaf-java-demo/src/main/proto/function_service.proto @@ -0,0 +1 @@ +../../../../../../gensrc/proto/function_service.proto \ No newline at end of file diff --git a/samples/doris-demo/remote-udaf-java-demo/src/main/proto/types.proto b/samples/doris-demo/remote-udaf-java-demo/src/main/proto/types.proto new file mode 120000 index 0000000000..85da993bd4 --- /dev/null +++ b/samples/doris-demo/remote-udaf-java-demo/src/main/proto/types.proto @@ -0,0 +1 @@ +../../../../../../gensrc/proto/types.proto \ No newline at end of file