[remote-udaf](sample) add some java demo (#11752)

This commit is contained in:
chenlinzhong
2022-08-16 14:37:34 +08:00
committed by GitHub
parent cf45151a66
commit 3e10be4ba7
6 changed files with 716 additions and 0 deletions

View File

@ -0,0 +1,166 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Remote udaf Function Service In Java Demo
## Compile
`mvn package`
# Run
`java -jar target/remote-udaf-java-demo-jar-with-dependencies.jar 9000`
`9000` is the port that the server will listen on
# Demo
```
//create one table such as table2
CREATE TABLE `table2` (
`event_day` date NULL,
`siteid` int(11) NULL DEFAULT "10",
`citycode` smallint(6) NULL,
`visitinfo` varchar(1024) NULL DEFAULT "",
`pv` varchar(1024) REPLACE NULL DEFAULT "0"
) ENGINE=OLAP
AGGREGATE KEY(`event_day`, `siteid`, `citycode`, `visitinfo`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`siteid`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
//import some data
MySQL [test_db]> select * from table2;
+------------+--------+----------+------------------------------------+------+
| event_day | siteid | citycode | visitinfo | pv |
+------------+--------+----------+------------------------------------+------+
| 2017-07-03 | 8 | 12 | {"ip":"192.168.0.5","source":"pc"} | 81 |
| 2017-07-03 | 37 | 12 | {"ip":"192.168.0.3","source":"pc"} | 81 |
| 2017-07-03 | 67 | 16 | {"ip":"192.168.0.2","source":"pc"} | 79 |
| 2017-07-03 | 101 | 11 | {"ip":"192.168.0.5","source":"pc"} | 65 |
| 2017-07-03 | 32 | 15 | {"ip":"192.168.0.1","source":"pc"} | 188 |
| 2017-07-03 | 103 | 12 | {"ip":"192.168.0.5","source":"pc"} | 123 |
| 2017-07-03 | 104 | 16 | {"ip":"192.168.0.5","source":"pc"} | 79 |
| 2017-07-03 | 3 | 12 | {"ip":"192.168.0.3","source":"pc"} | 123 |
| 2017-07-03 | 3 | 15 | {"ip":"192.168.0.2","source":"pc"} | 188 |
| 2017-07-03 | 13 | 11 | {"ip":"192.168.0.1","source":"pc"} | 65 |
| 2017-07-03 | 53 | 12 | {"ip":"192.168.0.2","source":"pc"} | 123 |
| 2017-07-03 | 1 | 11 | {"ip":"192.168.0.1","source":"pc"} | 65 |
| 2017-07-03 | 7 | 16 | {"ip":"192.168.0.4","source":"pc"} | 79 |
| 2017-07-03 | 102 | 15 | {"ip":"192.168.0.5","source":"pc"} | 188 |
| 2017-07-03 | 105 | 12 | {"ip":"192.168.0.5","source":"pc"} | 81 |
+------------+--------+----------+------------------------------------+------+
```
### 1. find most visit top 3 ip
```
MySQL [test_db]> CREATE AGGREGATE FUNCTION rpc_count_visit_info(varchar(1024)) RETURNS varchar(1024) PROPERTIES (
"TYPE"="RPC",
"OBJECT_FILE"="127.0.0.1:9000",
"update_fn"="rpc_count_visit_info_update",
"merge_fn"="rpc_count_visit_info_merge",
"finalize_fn"="rpc_count_visit_info_finalize"
);
MySQL [test_db]> select rpc_count_visit_info(visitinfo) from table2;
+--------------------------------------------+
| rpc_count_visit_info(`visitinfo`) |
+--------------------------------------------+
| 192.168.0.5:6 192.168.0.2:3 192.168.0.1:3 |
+--------------------------------------------+
1 row in set (0.036 sec)
MySQL [test_db]> select citycode, rpc_count_visit_info(visitinfo) from table2 group by citycode;
+----------+--------------------------------------------+
| citycode | rpc_count_visit_info(`visitinfo`) |
+----------+--------------------------------------------+
| 15 | 192.168.0.2:1 192.168.0.1:1 192.168.0.5:1 |
| 11 | 192.168.0.1:2 192.168.0.5:1 |
| 12 | 192.168.0.5:3 192.168.0.3:2 192.168.0.2:1 |
| 16 | 192.168.0.2:1 192.168.0.4:1 192.168.0.5:1 |
+----------+--------------------------------------------+
4 rows in set (0.050 sec)
```
### 2. sum pv
```
CREATE AGGREGATE FUNCTION rpc_sum(bigint) RETURNS bigint PROPERTIES (
"TYPE"="RPC",
"OBJECT_FILE"="127.0.0.1:9000",
"update_fn"="rpc_sum_update",
"merge_fn"="rpc_sum_merge",
"finalize_fn"="rpc_sum_finalize"
);
MySQL [test_db]> select citycode, rpc_sum(pv) from table2 group by citycode;
+----------+---------------+
| citycode | rpc_sum(`pv`) |
+----------+---------------+
| 15 | 564 |
| 11 | 195 |
| 12 | 612 |
| 16 | 237 |
+----------+---------------+
4 rows in set (0.067 sec)
MySQL [test_db]> select rpc_sum(pv) from table2;
+---------------+
| rpc_sum(`pv`) |
+---------------+
| 1608 |
+---------------+
1 row in set (0.030 sec)
```
### 3. avg pv
```
CREATE AGGREGATE FUNCTION rpc_avg(int) RETURNS double PROPERTIES (
"TYPE"="RPC",
"OBJECT_FILE"="127.0.0.1:9000",
"update_fn"="rpc_avg_update",
"merge_fn"="rpc_avg_merge",
"finalize_fn"="rpc_avg_finalize"
);
MySQL [test_db]> select citycode, rpc_avg(pv) from table2 group by citycode;
+----------+---------------+
| citycode | rpc_avg(`pv`) |
+----------+---------------+
| 15 | 188 |
| 11 | 65 |
| 12 | 102 |
| 16 | 79 |
+----------+---------------+
4 rows in set (0.039 sec)
MySQL [test_db]> select rpc_avg(pv) from table2;
+---------------+
| rpc_avg(`pv`) |
+---------------+
| 107.2 |
+---------------+
1 row in set (0.028 sec)
```

View File

@ -0,0 +1,172 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.doris</groupId>
<artifactId>remote-udaf-java-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Remote udaf Demo in Java</name>
<url>https://doris.apache.org/</url>
<properties>
<protobuf.version>3.14.0</protobuf.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<grpc.version>1.30.0</grpc.version>
<java.version>1.8</java.version>
<doris.thirdparty>${basedir}/../../../thirdparty</doris.thirdparty>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.grpc/grpc-protobuf -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
</dependencies>
<build>
<finalName>remote-udaf-java-demo</finalName>
<plugins>
<!-- protobuf -->
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocCommand>${doris.thirdparty}/installed/bin/protoc</protocCommand>
<!-->You can use following protocArtifact instead of protocCommand, so that you don't need to install protobuf tools<-->
<!--protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact-->
<protocVersion>${protobuf.version}</protocVersion>
<inputDirectories>
<include>../remote-udaf-java-demo/src/main/proto</include>
</inputDirectories>
<outputTargets>
<outputTarget>
<type>java</type>
</outputTarget>
<outputTarget>
<type>grpc-java</type>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}</pluginArtifact>
</outputTarget>
</outputTargets>
</configuration>
</execution>
</executions>
</plugin>
<!-- add gensrc java build src dir -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<!-- add arbitrary num of src dirs here -->
<source>${basedir}/target/generated-sources/build/</source>
<source>${basedir}/target/generated-sources/</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.doris.udaf.FunctionServiceDemo</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.apache.doris.udaf.FunctionServiceDemo</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<!-- for custom internal repository -->
<profile>
<id>custom-env</id>
<activation>
<property>
<name>env.CUSTOM_MAVEN_REPO</name>
</property>
</activation>
<repositories>
<repository>
<id>custom-nexus</id>
<url>${env.CUSTOM_MAVEN_REPO}</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>custom-nexus</id>
<url>${env.CUSTOM_MAVEN_REPO}</url>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.udaf;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
public class FunctionServiceDemo {
private static final Logger logger = Logger.getLogger(FunctionServiceDemo.class.getName());
private Server server;
private void start(int port) throws IOException {
server = ServerBuilder.forPort(port)
.addService(new FunctionServiceImpl())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
FunctionServiceDemo.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
int port = 9000;
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
System.err.println("port " + args[0] + " must be an integer.");
System.exit(1);
}
}
if (port <= 0) {
System.err.println("port " + args[0] + " must be positive.");
System.exit(1);
}
final FunctionServiceDemo server = new FunctionServiceDemo();
server.start(port);
server.blockUntilShutdown();
}
}

View File

@ -0,0 +1,287 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.udaf;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.doris.proto.FunctionService;
import org.apache.doris.proto.PFunctionServiceGrpc;
import org.apache.doris.proto.Types;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.Iterator;
import java.util.stream.Collectors;
import java.util.List;
import java.util.*;
public class FunctionServiceImpl extends PFunctionServiceGrpc.PFunctionServiceImplBase {
private static final Logger logger = Logger.getLogger(FunctionServiceImpl.class.getName());
public static <T> void ok(StreamObserver<T> observer, T data) {
observer.onNext(data);
observer.onCompleted();
}
@Override
public void fnCall(FunctionService.PFunctionCallRequest request,
StreamObserver<FunctionService.PFunctionCallResponse> responseObserver) {
// symbol is functionName
String functionName = request.getFunctionName();
logger.info("fnCall request=" + request);
FunctionService.PFunctionCallResponse.Builder res = FunctionService.PFunctionCallResponse.newBuilder();
if ("rpc_sum_update".equals(functionName)) {
Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0);
Types.PValues.Builder pValues = Types.PValues.newBuilder();
Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.INT64);
long total = 0;
int size = request.getArgs(0).getInt64ValueCount();
for(int i=0;i<size;i++){
total += request.getArgs(0).getInt64Value(i);
}
if(request.getContext().getFunctionContext().getArgsDataCount()>0){
total += request.getContext().getFunctionContext().getArgsData(0).getInt64Value(0);
}
pValues.setHasNull(false);
pValues.addInt64Value(total);
pValues.setType(returnType.build());
res.setStatus(statusCode.build());
res.addResult(pValues.build());
}
if ("rpc_sum_merge".equals(functionName)) {
Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0);
Types.PValues.Builder pValues = Types.PValues.newBuilder();
Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.INT64);
int size = request.getArgsCount();
long total = 0;
for(int i=0;i<size;i++){
total += request.getArgs(i).getInt64Value(0);
}
pValues.setHasNull(false);
pValues.addInt64Value(total);
pValues.setType(returnType.build());
res.setStatus(statusCode.build());
res.addResult(pValues.build());
}
if ("rpc_sum_finalize".equals(functionName)) {
Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0);
Types.PValues.Builder pValues = Types.PValues.newBuilder();
Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.INT64);
int size = request.getArgsCount();
long total = request.getContext().getFunctionContext().getArgsData(0).getInt64Value(0);
pValues.setHasNull(false);
pValues.addInt64Value(total);
pValues.setType(returnType.build());
res.setStatus(statusCode.build());
res.addResult(pValues.build());
}
if ("rpc_avg_update".equals(functionName)) {
Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0);
Types.PValues.Builder pValues = Types.PValues.newBuilder();
Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.DOUBLE);
int size = request.getArgs(0).getInt32ValueCount();
double total = 0;
for(int i=0;i<size;i++){
total += request.getArgs(0).getInt32Value(i);
}
if(request.getContext().getFunctionContext().getArgsDataCount()>0){
total += request.getContext().getFunctionContext().getArgsData(0).getDoubleValue(0);
size += request.getContext().getFunctionContext().getArgsData(0).getInt32Value(0);
}
pValues.setHasNull(false);
pValues.addDoubleValue(total);
pValues.addInt32Value(size);
pValues.setType(returnType.build());
res.setStatus(statusCode.build());
res.addResult(pValues.build());
}
if ("rpc_avg_merge".equals(functionName)) {
Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0);
Types.PValues.Builder pValues = Types.PValues.newBuilder();
Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.DOUBLE);
int size = 0;
double total = 0;
int args_len = request.getArgsCount();
for(int i=0;i<args_len;i++){
total += request.getArgs(i).getDoubleValue(0);
size += request.getArgs(i).getInt32Value(0);
}
pValues.setHasNull(false);
pValues.addDoubleValue(total);
pValues.addInt32Value(size);
pValues.setType(returnType.build());
res.setStatus(statusCode.build());
res.addResult(pValues.build());
}
if ("rpc_avg_finalize".equals(functionName)) {
Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0);
Types.PValues.Builder pValues = Types.PValues.newBuilder();
Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.DOUBLE);
double total = request.getContext().getFunctionContext().getArgsData(0).getDoubleValue(0);
int size = request.getContext().getFunctionContext().getArgsData(0).getInt32Value(0);
pValues.setHasNull(false);
pValues.addDoubleValue(total/size);
pValues.setType(returnType.build());
res.setStatus(statusCode.build());
res.addResult(pValues.build());
}
if ("rpc_count_visit_info_update".equals(functionName)) {
Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0);
Types.PValues.Builder pValues = Types.PValues.newBuilder();
Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.STRING);
long total = 0;
int size = request.getArgs(0).getStringValueCount();
JSONObject GlobalIpMap = new JSONObject();
if(request.getContext().getFunctionContext().getArgsDataCount()>0
&& request.getContext().getFunctionContext().getArgsData(0).getStringValueCount()>0 ){
String s= request.getContext().getFunctionContext().getArgsData(0).getStringValue(0);
GlobalIpMap = JSONObject.parseObject(s);
}
for(int i=0;i<size;i++){
String str = request.getArgs(0).getStringValue(i);
JSONObject object = JSONObject.parseObject(str);
String ip = object.getString("ip");
if(GlobalIpMap.containsKey(ip)){
int num = GlobalIpMap.getIntValue(ip);
num ++;
GlobalIpMap.put(ip,num);
}else{
GlobalIpMap.put(ip,1);
}
}
String objStr = JSON.toJSONString(GlobalIpMap);
pValues.setHasNull(false);
pValues.addStringValue(objStr);
pValues.setType(returnType.build());
res.setStatus(statusCode.build());
res.addResult(pValues.build());
}
if ("rpc_count_visit_info_merge".equals(functionName)) {
Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0);
Types.PValues.Builder pValues = Types.PValues.newBuilder();
Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.STRING);
String s1= request.getArgs(0).getStringValue(0);
JSONObject GlobalIpMap1 = JSONObject.parseObject(s1);
String s2= request.getArgs(1).getStringValue(0);
JSONObject GlobalIpMap2 = JSONObject.parseObject(s2);
Iterator iter = GlobalIpMap2.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry entry = (Map.Entry) iter.next();
String ip = (entry.getKey().toString());
int num2 = Integer.parseInt(entry.getValue().toString());
if(GlobalIpMap1.containsKey(ip)){
int num1 = GlobalIpMap1.getIntValue(ip);
int total = num1+num2;
GlobalIpMap1.put(ip,total);
}else{
GlobalIpMap1.put(ip,num2);
}
}
String objStr = JSON.toJSONString(GlobalIpMap1);
pValues.setHasNull(false);
pValues.addStringValue(objStr);
pValues.setType(returnType.build());
res.setStatus(statusCode.build());
res.addResult(pValues.build());
}
if ("rpc_count_visit_info_finalize".equals(functionName)) {
Types.PStatus.Builder statusCode = Types.PStatus.newBuilder().setStatusCode(0);
Types.PValues.Builder pValues = Types.PValues.newBuilder();
Types.PGenericType.Builder returnType = Types.PGenericType.newBuilder().setId(Types.PGenericType.TypeId.STRING);
String s= request.getContext().getFunctionContext().getArgsData(0).getStringValue(0);
JSONObject finalMap = JSONObject.parseObject(s);
Map<String,Integer> mapRepeat = new HashMap<String,Integer>();
Iterator iter = finalMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry entry = (Map.Entry) iter.next();
String key = entry.getKey().toString();
int value = Integer.parseInt(entry.getValue().toString());
mapRepeat.put(key, value);
}
List<Map.Entry<String, Integer>> list1 = new ArrayList<Map.Entry<String, Integer>>();
list1.addAll(mapRepeat.entrySet());
Collections.sort(list1, new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o2.getValue() - o1.getValue();
}
});
int topN=3;
String sortedOutputStr="";
for (Map.Entry<String, Integer> entry : list1) {
topN --;
if(topN<0){
break;
}
sortedOutputStr += entry.getKey()+":"+entry.getValue()+" ";
}
pValues.setHasNull(false);
pValues.addStringValue(sortedOutputStr);
pValues.setType(returnType.build());
res.setStatus(statusCode.build());
res.addResult(pValues.build());
}
logger.info("fnCall res=" + res);
ok(responseObserver, res.build());
}
@Override
public void checkFn(FunctionService.PCheckFunctionRequest request,
StreamObserver<FunctionService.PCheckFunctionResponse> responseObserver) {
// symbol is functionName
logger.info("checkFn request=" + request);
int status = 0;
if ("add_int".equals(request.getFunction().getFunctionName())) {
// check inputs count
if (request.getFunction().getInputsCount() != 2) {
status = -1;
}
}
FunctionService.PCheckFunctionResponse res =
FunctionService.PCheckFunctionResponse.newBuilder()
.setStatus(Types.PStatus.newBuilder().setStatusCode(status).build()).build();
logger.info("checkFn res=" + res);
ok(responseObserver, res);
}
@Override
public void handShake(Types.PHandShakeRequest request, StreamObserver<Types.PHandShakeResponse> responseObserver) {
logger.info("handShake request=" + request);
ok(responseObserver,
Types.PHandShakeResponse.newBuilder().setStatus(Types.PStatus.newBuilder().setStatusCode(0).build())
.setHello(request.getHello()).build());
}
}

View File

@ -0,0 +1 @@
../../../../../../gensrc/proto/function_service.proto

View File

@ -0,0 +1 @@
../../../../../../gensrc/proto/types.proto