From 4ff6eb55d04b5923f04e5e9f03e1acdd0a561bd1 Mon Sep 17 00:00:00 2001 From: wunan1210 Date: Sun, 22 Aug 2021 22:03:32 +0800 Subject: [PATCH] [FlinkConnector] Make flink datastream source parameterized (#6473) make flink datastream source parameterized as List instead of Object. --- .../flink/datastream/DorisSourceFunction.java | 16 ++++++++-------- .../SimpleListDeserializationSchema.java | 8 +++++--- .../flink/datastream/ScalaValueReader.scala | 2 +- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java index 82ab224cf2..08ec5b038f 100644 --- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java +++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java @@ -36,17 +36,17 @@ import java.util.List; * DorisSource **/ -public class DorisSourceFunction extends RichSourceFunction implements ResultTypeQueryable { +public class DorisSourceFunction extends RichSourceFunction> implements ResultTypeQueryable> { private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class); - private DorisDeserializationSchema deserializer; - private DorisOptions options; - private DorisReadOptions readOptions; + private final DorisDeserializationSchema> deserializer; + private final DorisOptions options; + private final DorisReadOptions readOptions; private List dorisPartitions; private ScalaValueReader scalaValueReader; - public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) { + public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema> deserializer) { this.deserializer = deserializer; this.options = streamOptions.getOptions(); this.readOptions = streamOptions.getReadOptions(); @@ -59,11 +59,11 @@ public class DorisSourceFunction extends RichSourceFunction implements Res } @Override - public void run(SourceContext sourceContext) throws Exception { + public void run(SourceContext> sourceContext) { for (PartitionDefinition partitions : dorisPartitions) { scalaValueReader = new ScalaValueReader(partitions, options, readOptions); while (scalaValueReader.hasNext()) { - Object next = scalaValueReader.next(); + List next = scalaValueReader.next(); sourceContext.collect(next); } } @@ -76,7 +76,7 @@ public class DorisSourceFunction extends RichSourceFunction implements Res @Override - public TypeInformation getProducedType() { + public TypeInformation> getProducedType() { return this.deserializer.getProducedType(); } } diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java index 7fcf2f693f..d9ec6e5eae 100644 --- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java +++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java @@ -17,15 +17,17 @@ package org.apache.doris.flink.deserialization; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import java.util.List; -public class SimpleListDeserializationSchema implements DorisDeserializationSchema { +public class SimpleListDeserializationSchema implements DorisDeserializationSchema> { @Override - public TypeInformation getProducedType() { - return TypeInformation.of(List.class); + public TypeInformation> getProducedType() { + return TypeInformation.of(new TypeHint>() { + }); } } diff --git a/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala b/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala index bdf948744a..e69a86fe63 100644 --- a/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala +++ b/extension/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala @@ -206,7 +206,7 @@ class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, re * get next value. * @return next value */ - def next: AnyRef = { + def next: java.util.List[_] = { if (!hasNext) { logger.error(SHOULD_NOT_HAPPEN_MESSAGE) throw new ShouldNeverHappenException