[FlinkConnector] Make flink datastream source parameterized (#6473)

make flink datastream source parameterized as List<?> instead of Object.
This commit is contained in:
wunan1210
2021-08-22 22:03:32 +08:00
committed by GitHub
parent c71f58fef9
commit 4ff6eb55d0
3 changed files with 14 additions and 12 deletions

View File

@ -36,17 +36,17 @@ import java.util.List;
* DorisSource
**/
public class DorisSourceFunction<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
public class DorisSourceFunction extends RichSourceFunction<List<?>> implements ResultTypeQueryable<List<?>> {
private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
private DorisDeserializationSchema deserializer;
private DorisOptions options;
private DorisReadOptions readOptions;
private final DorisDeserializationSchema<List<?>> deserializer;
private final DorisOptions options;
private final DorisReadOptions readOptions;
private List<PartitionDefinition> dorisPartitions;
private ScalaValueReader scalaValueReader;
public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) {
this.deserializer = deserializer;
this.options = streamOptions.getOptions();
this.readOptions = streamOptions.getReadOptions();
@ -59,11 +59,11 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res
}
@Override
public void run(SourceContext sourceContext) throws Exception {
public void run(SourceContext<List<?>> sourceContext) {
for (PartitionDefinition partitions : dorisPartitions) {
scalaValueReader = new ScalaValueReader(partitions, options, readOptions);
while (scalaValueReader.hasNext()) {
Object next = scalaValueReader.next();
List<?> next = scalaValueReader.next();
sourceContext.collect(next);
}
}
@ -76,7 +76,7 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res
@Override
public TypeInformation<T> getProducedType() {
public TypeInformation<List<?>> getProducedType() {
return this.deserializer.getProducedType();
}
}

View File

@ -17,15 +17,17 @@
package org.apache.doris.flink.deserialization;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.util.List;
public class SimpleListDeserializationSchema implements DorisDeserializationSchema {
public class SimpleListDeserializationSchema implements DorisDeserializationSchema<List<?>> {
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(List.class);
public TypeInformation<List<?>> getProducedType() {
return TypeInformation.of(new TypeHint<List<?>>() {
});
}
}

View File

@ -206,7 +206,7 @@ class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, re
* get next value.
* @return next value
*/
def next: AnyRef = {
def next: java.util.List[_] = {
if (!hasNext) {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
throw new ShouldNeverHappenException