diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml
index 261d6c4519..2270746989 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-extension.yml
@@ -55,27 +55,6 @@ jobs:
run: |
cd fs_brokers/apache_hdfs_broker/ && /bin/bash build.sh
- - name: Build spark connector v2
- run: |
- thrift --version
- cd extension/spark-doris-connector/ && /bin/bash build.sh 2.3.4 2.11
-
- - name: Build spark connector v3
- run: |
- cd extension/spark-doris-connector/ && /bin/bash build.sh 3.1.2 2.12
-
- - name: Build flink connector 1.11
- run: |
- cd extension/flink-doris-connector/ && /bin/bash build.sh 1.11.6 2.12
-
- - name: Build flink connector 1.12
- run: |
- cd extension/flink-doris-connector/ && /bin/bash build.sh 1.12.7 2.12
-
- - name: Build flink connector 1.13
- run: |
- cd extension/flink-doris-connector/ && /bin/bash build.sh 1.13.5 2.12
-
- name: Build docs
run: |
cd docs && npm install && npm run build
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index c594d0a824..669c2d2f91 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -25,7 +25,7 @@ Your suggestions, comments and comments on Doris can be made directly through Gi
There are many ways to participate in and contribute to Doris projects: code implementation, test writing, process tool improvement, document improvement, and so on. Any contribution will be welcomed and you will be added to the list of contributors. Further, with sufficient contributions, you will have the opportunity to become a Commiter of Apache with Apache mailbox and be included in the list of [Apache Commiters] (http://people.apache.org/committer-index.html).
-Any questions, you can contact us to get timely answers, including Wechat, Gitter (GitHub instant messaging tool), e-mail and so on.
+Any questions, you can contact us to get timely answers, including dev mail list or Slack.
## Initial contact
@@ -33,8 +33,7 @@ For the first time in Doris community, you can:
* Follow [Doris Github](https://github.com/apache/incubator-doris)
* Subscribe to our [mailing list] (./subscribe-mail-list.md);
-* Join Doris Wechat Group (add micro-signal: morningman-cmy, note: join Doris Group) and ask questions at any time.
-* Enter Doris's [Gitter] (./gitter.md) chat room;
+* Join Doris [Slack](https://join.slack.com/t/apachedoriscommunity/shared_invite/zt-11jb8gesh-7IukzSrdea6mqoG0HB4gZg)
Learn the development trends of Doris project in time and give your opinions on the topics you are concerned about.
diff --git a/CONTRIBUTING_CN.md b/CONTRIBUTING_CN.md
index 72f0839df6..7de688f06e 100644
--- a/CONTRIBUTING_CN.md
+++ b/CONTRIBUTING_CN.md
@@ -25,7 +25,7 @@ under the License.
参与 Doris 项目并为其作出贡献的方法有很多:代码实现、测试编写、流程工具改进、文档完善等等。任何贡献我们都会非常欢迎,并将您加入贡献者列表,进一步,有了足够的贡献后,您还可以有机会成为 Apache 的 Commiter,拥有 Apache 邮箱,并被收录到 [Apache Commiter 列表中](http://people.apache.org/committer-index.html)。
-任何问题,您都可以联系我们得到及时解答,联系方式包括微信、Gitter(GitHub提供的即时聊天工具)、邮件等等。
+任何问题,您都可以联系我们得到及时解答,联系方式包括 dev 邮件组,Slack 等。
## 初次接触
@@ -33,8 +33,7 @@ under the License.
* 关注 Doris [Github 代码库](https://github.com/apache/incubator-doris)
* 订阅我们的 [邮件列表](./subscribe-mail-list.md);
-* 加入 Doris 微信群(加微信号:morningman-cmy, 备注:加入Doris群) 随时提问;
-* 进入 Doris 的 [Gitter](./gitter.md) 聊天室;
+* 加入 Doris 的 [Slack](https://join.slack.com/t/apachedoriscommunity/shared_invite/zt-11jb8gesh-7IukzSrdea6mqoG0HB4gZg)
通过以上方式及时了解 Doris 项目的开发动态并为您关注的话题发表意见。
diff --git a/docs/en/extending-doris/flink-doris-connector.md b/docs/en/extending-doris/flink-doris-connector.md
index 1f66e2c6c8..3538f52ccc 100644
--- a/docs/en/extending-doris/flink-doris-connector.md
+++ b/docs/en/extending-doris/flink-doris-connector.md
@@ -28,6 +28,8 @@ under the License.
- The Flink Doris Connector can support operations (read, insert, modify, delete) data stored in Doris through Flink.
+Github: https://github.com/apache/incubator-doris-connectors
+
* `Doris` table can be mapped to `DataStream` or `Table`.
>**Note:**
@@ -377,4 +379,4 @@ WITH (
);
insert into doris_sink select id,name from cdc_mysql_source;
-```
\ No newline at end of file
+```
diff --git a/docs/en/extending-doris/spark-doris-connector.md b/docs/en/extending-doris/spark-doris-connector.md
index 60a8f4259b..c7d71db781 100644
--- a/docs/en/extending-doris/spark-doris-connector.md
+++ b/docs/en/extending-doris/spark-doris-connector.md
@@ -28,6 +28,8 @@ under the License.
Spark Doris Connector can support reading data stored in Doris and writing data to Doris through Spark.
+Github: https://github.com/apache/incubator-doris-connectors
+
- Support reading data from `Doris`.
- Support `Spark DataFrame` batch/stream writing data to `Doris`
- You can map the `Doris` table to` DataFrame` or `RDD`, it is recommended to use` DataFrame`.
@@ -244,4 +246,4 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
| TIME | DataTypes.DoubleType |
| HLL | Unsupported datatype |
-* Note: In Connector, `DATE` and` DATETIME` are mapped to `String`. Due to the processing logic of the Doris underlying storage engine, when the time type is used directly, the time range covered cannot meet the demand. So use `String` type to directly return the corresponding time readable text.
\ No newline at end of file
+* Note: In Connector, `DATE` and` DATETIME` are mapped to `String`. Due to the processing logic of the Doris underlying storage engine, when the time type is used directly, the time range covered cannot meet the demand. So use `String` type to directly return the corresponding time readable text.
diff --git a/docs/zh-CN/extending-doris/flink-doris-connector.md b/docs/zh-CN/extending-doris/flink-doris-connector.md
index e0718e2d69..756c99e730 100644
--- a/docs/zh-CN/extending-doris/flink-doris-connector.md
+++ b/docs/zh-CN/extending-doris/flink-doris-connector.md
@@ -30,6 +30,8 @@ under the License.
Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。
+代码库地址:https://github.com/apache/incubator-doris-connectors
+
* 可以将`Doris`表映射为`DataStream`或者`Table`。
>**注意:**
@@ -381,4 +383,4 @@ WITH (
);
insert into doris_sink select id,name from cdc_mysql_source;
-```
\ No newline at end of file
+```
diff --git a/docs/zh-CN/extending-doris/spark-doris-connector.md b/docs/zh-CN/extending-doris/spark-doris-connector.md
index d6a422b3b3..81e7f9196b 100644
--- a/docs/zh-CN/extending-doris/spark-doris-connector.md
+++ b/docs/zh-CN/extending-doris/spark-doris-connector.md
@@ -28,6 +28,8 @@ under the License.
Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。
+代码库地址:https://github.com/apache/incubator-doris-connectors
+
- 支持从`Doris`中读取数据
- 支持`Spark DataFrame`批量/流式 写入`Doris`
- 可以将`Doris`表映射为`DataFrame`或者`RDD`,推荐使用`DataFrame`。
diff --git a/extension/flink-doris-connector/build.sh b/extension/flink-doris-connector/build.sh
deleted file mode 100644
index 83d26d21d7..0000000000
--- a/extension/flink-doris-connector/build.sh
+++ /dev/null
@@ -1,80 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-##############################################################
-# This script is used to compile Flink-Doris-Connector
-# Usage:
-# sh build.sh
-#
-##############################################################
-
-set -eo pipefail
-
-usage() {
- echo "
- Usage:
- $0 flink_version scala_version
- e.g.:
- $0 1.11.6 2.12
- $0 1.12.7 2.12
- $0 1.13.5 2.12
- "
- exit 1
-}
-
-if [ $# -ne 2 ]; then
- usage
-fi
-
-ROOT=$(dirname "$0")
-ROOT=$(
- cd "$ROOT"
- pwd
-)
-
-export DORIS_HOME=${ROOT}/../../
-
-. "${DORIS_HOME}"/env.sh
-
-# include custom environment variables
-if [[ -f ${DORIS_HOME}/custom_env.sh ]]; then
- . "${DORIS_HOME}"/custom_env.sh
-fi
-
-# check maven
-MVN_CMD=mvn
-if [[ -n ${CUSTOM_MVN} ]]; then
- MVN_CMD=${CUSTOM_MVN}
-fi
-
-if ! ${MVN_CMD} --version; then
- echo "Error: mvn is not found"
- exit 1
-fi
-export MVN_CMD
-rm -rf output/
-${MVN_CMD} clean package -Dscala.version=$2 -Dflink.version=$1
-
-mkdir -p output/
-cp target/doris-flink-*.jar ./output/
-
-echo "*****************************************"
-echo "Successfully build Flink-Doris-Connector"
-echo "*****************************************"
-
-exit 0
diff --git a/extension/flink-doris-connector/pom.xml b/extension/flink-doris-connector/pom.xml
deleted file mode 100644
index 10b750a830..0000000000
--- a/extension/flink-doris-connector/pom.xml
+++ /dev/null
@@ -1,432 +0,0 @@
-
-
-
- 4.0.0
-
- org.apache
- apache
- 23
-
- org.apache.doris
- doris-flink-connector
- ${flink.version}-${scala.version}-1.0.0-SNAPSHOT
- Doris Flink Connector
- https://doris.apache.org/
-
-
- Apache 2.0 License
- https://www.apache.org/licenses/LICENSE-2.0.html
- repo
-
-
-
- scm:git:https://git@github.com/apache/incubator-doris.git
- scm:git:https://git@github.com/apache/incubator-doris.git
- scm:git:https://git@github.com/apache/incubator-doris.git
- HEAD
-
-
- GitHub
- https://github.com/apache/incubator-doris/issues
-
-
-
- Dev Mailing List
- dev@doris.apache.org
- dev-subscribe@doris.apache.org
- dev-unsubscribe@doris.apache.org
-
-
- Commits Mailing List
- commits@doris.apache.org
- commits-subscribe@doris.apache.org
- commits-unsubscribe@doris.apache.org
-
-
-
- ${env.scala.version}
- ${env.flink.version}
- 0.13.0
- 5.0.0
- 3.8.1
- 3.3.0
- 3.2.1
- UTF-8
- ${env.DORIS_THIRDPARTY}
- github
-
-
-
- thirdparty
-
-
- env.DORIS_THIRDPARTY
-
-
-
- ${env.DORIS_THIRDPARTY}
-
-
-
-
- custom-env
-
-
- env.CUSTOM_MAVEN_REPO
-
-
-
-
- custom-nexus
- ${env.CUSTOM_MAVEN_REPO}
-
-
-
-
- custom-nexus
- ${env.CUSTOM_MAVEN_REPO}
-
-
-
-
- flink.version
-
- 1.11.6
-
-
- true
-
-
-
- scala.version
-
- 2.12
-
-
- true
-
-
-
-
- general-env
-
-
- !env.CUSTOM_MAVEN_REPO
-
-
-
-
- central
- central maven repo https
- https://repo.maven.apache.org/maven2
-
-
-
-
-
-
- org.apache.flink
- flink-java
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-streaming-java_${scala.version}
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-clients_${scala.version}
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-table-common
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-table-api-java-bridge_${scala.version}
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-table-planner-blink_${scala.version}
- ${flink.version}
- provided
-
-
- org.apache.thrift
- libthrift
- ${libthrift.version}
-
-
- httpclient
- org.apache.httpcomponents
-
-
- httpcore
- org.apache.httpcomponents
-
-
-
-
- org.apache.httpcomponents
- httpclient
- 4.5.13
-
-
- org.apache.arrow
- arrow-vector
- ${arrow.version}
-
-
- org.apache.arrow
- arrow-memory-netty
- ${arrow.version}
- runtime
-
-
- org.slf4j
- slf4j-api
- 1.7.25
-
-
- org.slf4j
- slf4j-log4j12
- 1.7.25
- test
-
-
- log4j
- log4j
- 1.2.17
-
-
-
- org.hamcrest
- hamcrest-core
- 1.3
- test
-
-
- org.mockito
- mockito-scala_${scala.version}
- 1.4.7
-
-
- hamcrest-core
- org.hamcrest
-
-
- test
-
-
- junit
- junit
- 4.11
-
-
- hamcrest-core
- org.hamcrest
-
-
- test
-
-
-
-
-
- org.apache.thrift.tools
- maven-thrift-plugin
- 0.1.11
-
- ${doris.thirdparty}/installed/bin/thrift
- java:fullcamel
-
-
-
- thrift-sources
- generate-sources
-
- compile
-
-
-
-
-
- net.alchim31.maven
- scala-maven-plugin
- 3.2.1
-
-
- scala-compile-first
- process-resources
-
- compile
-
-
-
- scala-test-compile
- process-test-resources
-
- testCompile
-
-
-
-
-
- -feature
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 3.2.1
-
-
-
- com.google.code.findbugs:*
- org.slf4j:*
-
-
-
-
- org.apache.arrow
- org.apache.doris.shaded.org.apache.arrow
-
-
- io.netty
- org.apache.doris.shaded.io.netty
-
-
- com.fasterxml.jackson
- org.apache.doris.shaded.com.fasterxml.jackson
-
-
- org.apache.commons.codec
- org.apache.doris.shaded.org.apache.commons.codec
-
-
- com.google.flatbuffers
- org.apache.doris.shaded.com.google.flatbuffers
-
-
- org.apache.thrift
- org.apache.doris.shaded.org.apache.thrift
-
-
-
-
-
- package
-
- shade
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- ${maven-compiler-plugin.version}
-
- 8
- 8
-
-
-
- org.apache.maven.plugins
- maven-javadoc-plugin
- ${maven-javadoc-plugin.version}
-
- true
- 8
- false
-
-
-
- attach-javadocs
-
- jar
-
-
-
-
-
- org.apache.maven.plugins
- maven-source-plugin
- ${maven-source-plugin.version}
-
- true
-
-
-
- compile
-
- jar
-
-
-
-
-
-
-
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
deleted file mode 100644
index 9b8d955d69..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
+++ /dev/null
@@ -1,220 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.backend;
-
-import org.apache.doris.flink.cfg.ConfigurationOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.ConnectedFailedException;
-import org.apache.doris.flink.exception.DorisException;
-import org.apache.doris.flink.exception.DorisInternalException;
-import org.apache.doris.flink.serialization.Routing;
-import org.apache.doris.flink.util.ErrorMessages;
-import org.apache.doris.thrift.TDorisExternalService;
-import org.apache.doris.thrift.TScanBatchResult;
-import org.apache.doris.thrift.TScanCloseParams;
-import org.apache.doris.thrift.TScanCloseResult;
-import org.apache.doris.thrift.TScanNextBatchParams;
-import org.apache.doris.thrift.TScanOpenParams;
-import org.apache.doris.thrift.TScanOpenResult;
-import org.apache.doris.thrift.TStatusCode;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Client to request Doris BE
- */
-public class BackendClient {
- private static Logger logger = LoggerFactory.getLogger(BackendClient.class);
-
- private Routing routing;
-
- private TDorisExternalService.Client client;
- private TTransport transport;
-
- private boolean isConnected = false;
- private final int retries;
- private final int socketTimeout;
- private final int connectTimeout;
-
- public BackendClient(Routing routing, DorisReadOptions readOptions) throws ConnectedFailedException {
- this.routing = routing;
- this.connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs();
- this.socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs();
- this.retries = readOptions.getRequestRetries() == null ? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT : readOptions.getRequestRetries();
- logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
- this.connectTimeout, this.socketTimeout, this.retries);
- open();
- }
-
- private void open() throws ConnectedFailedException {
- logger.debug("Open client to Doris BE '{}'.", routing);
- TException ex = null;
- for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
- logger.debug("Attempt {} to connect {}.", attempt, routing);
- TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
- transport = new TSocket(routing.getHost(), routing.getPort(), socketTimeout, connectTimeout);
- TProtocol protocol = factory.getProtocol(transport);
- client = new TDorisExternalService.Client(protocol);
- if (isConnected) {
- logger.info("Success connect to {}.", routing);
- return;
- }
- try {
- logger.trace("Connect status before open transport to {} is '{}'.", routing, isConnected);
- if (!transport.isOpen()) {
- transport.open();
- isConnected = true;
- }
- } catch (TTransportException e) {
- logger.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, routing, e);
- ex = e;
- }
-
- }
- if (!isConnected) {
- logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
- throw new ConnectedFailedException(routing.toString(), ex);
- }
- }
-
- private void close() {
- logger.trace("Connect status before close with '{}' is '{}'.", routing, isConnected);
- isConnected = false;
- if ((transport != null) && transport.isOpen()) {
- transport.close();
- logger.info("Closed a connection to {}.", routing);
- }
- if (null != client) {
- client = null;
- }
- }
-
- /**
- * Open a scanner for reading Doris data.
- *
- * @param openParams thrift struct to required by request
- * @return scan open result
- * @throws ConnectedFailedException throw if cannot connect to Doris BE
- */
- public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedFailedException {
- logger.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams);
- if (!isConnected) {
- open();
- }
- TException ex = null;
- for (int attempt = 0; attempt < retries; ++attempt) {
- logger.debug("Attempt {} to openScanner {}.", attempt, routing);
- try {
- TScanOpenResult result = client.openScanner(openParams);
- if (result == null) {
- logger.warn("Open scanner result from {} is null.", routing);
- continue;
- }
- if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
- logger.warn("The status of open scanner result from {} is '{}', error message is: {}.",
- routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs());
- continue;
- }
- return result;
- } catch (TException e) {
- logger.warn("Open scanner from {} failed.", routing, e);
- ex = e;
- }
- }
- logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
- throw new ConnectedFailedException(routing.toString(), ex);
- }
-
- /**
- * get next row batch from Doris BE
- *
- * @param nextBatchParams thrift struct to required by request
- * @return scan batch result
- * @throws ConnectedFailedException throw if cannot connect to Doris BE
- */
- public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws DorisException {
- logger.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams);
- if (!isConnected) {
- open();
- }
- TException ex = null;
- TScanBatchResult result = null;
- for (int attempt = 0; attempt < retries; ++attempt) {
- logger.debug("Attempt {} to getNext {}.", attempt, routing);
- try {
- result = client.getNext(nextBatchParams);
- if (result == null) {
- logger.warn("GetNext result from {} is null.", routing);
- continue;
- }
- if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
- logger.warn("The status of get next result from {} is '{}', error message is: {}.",
- routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs());
- continue;
- }
- return result;
- } catch (TException e) {
- logger.warn("Get next from {} failed.", routing, e);
- ex = e;
- }
- }
- if (result != null && (TStatusCode.OK != (result.getStatus().getStatusCode()))) {
- logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, result.getStatus().getStatusCode(),
- result.getStatus().getErrorMsgs());
- throw new DorisInternalException(routing.toString(), result.getStatus().getStatusCode(),
- result.getStatus().getErrorMsgs());
- }
- logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
- throw new ConnectedFailedException(routing.toString(), ex);
- }
-
- /**
- * close an scanner.
- *
- * @param closeParams thrift struct to required by request
- */
- public void closeScanner(TScanCloseParams closeParams) {
- logger.debug("CloseScanner to '{}', parameter is '{}'.", routing, closeParams);
- for (int attempt = 0; attempt < retries; ++attempt) {
- logger.debug("Attempt {} to closeScanner {}.", attempt, routing);
- try {
- TScanCloseResult result = client.closeScanner(closeParams);
- if (result == null) {
- logger.warn("CloseScanner result from {} is null.", routing);
- continue;
- }
- if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
- logger.warn("The status of get next result from {} is '{}', error message is: {}.",
- routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs());
- continue;
- }
- break;
- } catch (TException e) {
- logger.warn("Close scanner from {} failed.", routing, e);
- }
- }
- logger.info("CloseScanner to Doris BE '{}' success.", routing);
- close();
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
deleted file mode 100644
index 47d07b8d17..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.cfg;
-
-public interface ConfigurationOptions {
- // doris fe node address
- String DORIS_FENODES = "fenodes";
-
- String DORIS_DEFAULT_CLUSTER = "default_cluster";
-
- String TABLE_IDENTIFIER = "table.identifier";
- String DORIS_TABLE_IDENTIFIER = "doris.table.identifier";
- String DORIS_READ_FIELD = "doris.read.field";
- String DORIS_FILTER_QUERY = "doris.filter.query";
- String DORIS_FILTER_QUERY_IN_MAX_COUNT = "doris.filter.query.in.max.count";
- Integer DORIS_FILTER_QUERY_IN_VALUE_UPPER_LIMIT = 10000;
-
- String DORIS_USER = "username";
- String DORIS_PASSWORD = "password";
-
- String DORIS_REQUEST_AUTH_USER = "doris.request.auth.user";
- String DORIS_REQUEST_AUTH_PASSWORD = "doris.request.auth.password";
- String DORIS_REQUEST_RETRIES = "doris.request.retries";
- String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout.ms";
- String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout.ms";
- String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout.s";
- Integer DORIS_REQUEST_RETRIES_DEFAULT = 3;
- Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
- Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
- Integer DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600;
-
- String DORIS_TABLET_SIZE = "doris.request.tablet.size";
- Integer DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
- Integer DORIS_TABLET_SIZE_MIN = 1;
-
- String DORIS_BATCH_SIZE = "doris.batch.size";
- Integer DORIS_BATCH_SIZE_DEFAULT = 1024;
-
- String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
- Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
-
- String DORIS_VALUE_READER_CLASS = "doris.value.reader.class";
-
- String DORIS_DESERIALIZE_ARROW_ASYNC = "doris.deserialize.arrow.async";
- Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
-
- String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
- Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
-
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
deleted file mode 100644
index 9b2187c904..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.cfg;
-
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-/**
- * Doris connection options.
- */
-public class DorisConnectionOptions implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- protected final String fenodes;
- protected final String username;
- protected final String password;
-
- public DorisConnectionOptions(String fenodes, String username, String password) {
- this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes is empty");
- this.username = username;
- this.password = password;
- }
-
- public String getFenodes() {
- return fenodes;
- }
-
- public String getUsername() {
- return username;
- }
-
- public String getPassword() {
- return password;
- }
-
- /**
- * Builder for {@link DorisConnectionOptions}.
- */
- public static class DorisConnectionOptionsBuilder {
- private String fenodes;
- private String username;
- private String password;
-
- public DorisConnectionOptionsBuilder withFenodes(String fenodes) {
- this.fenodes = fenodes;
- return this;
- }
-
- public DorisConnectionOptionsBuilder withUsername(String username) {
- this.username = username;
- return this;
- }
-
- public DorisConnectionOptionsBuilder withPassword(String password) {
- this.password = password;
- return this;
- }
-
- public DorisConnectionOptions build() {
- return new DorisConnectionOptions(fenodes, username, password);
- }
- }
-
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
deleted file mode 100644
index ad1ab07228..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ /dev/null
@@ -1,128 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.cfg;
-
-
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-import java.util.Properties;
-
-/**
- * JDBC sink batch options.
- */
-public class DorisExecutionOptions implements Serializable {
-
- private static final long serialVersionUID = 1L;
- public static final Integer DEFAULT_BATCH_SIZE = 10000;
- public static final Integer DEFAULT_MAX_RETRY_TIMES = 1;
- private static final Long DEFAULT_INTERVAL_MILLIS = 10000L;
-
- private final Integer batchSize;
- private final Integer maxRetries;
- private final Long batchIntervalMs;
-
- /**
- * Properties for the StreamLoad.
- */
- private final Properties streamLoadProp;
-
- private final Boolean enableDelete;
-
-
- public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp, Boolean enableDelete) {
- Preconditions.checkArgument(maxRetries >= 0);
- this.batchSize = batchSize;
- this.maxRetries = maxRetries;
- this.batchIntervalMs = batchIntervalMs;
- this.streamLoadProp = streamLoadProp;
- this.enableDelete = enableDelete;
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static DorisExecutionOptions defaults() {
- Properties pro = new Properties();
- pro.setProperty("format", "json");
- pro.setProperty("strip_outer_array", "true");
- return new Builder().setStreamLoadProp(pro).build();
- }
-
- public Integer getBatchSize() {
- return batchSize;
- }
-
- public Integer getMaxRetries() {
- return maxRetries;
- }
-
- public Long getBatchIntervalMs() {
- return batchIntervalMs;
- }
-
- public Properties getStreamLoadProp() {
- return streamLoadProp;
- }
-
- public Boolean getEnableDelete() {
- return enableDelete;
- }
-
- /**
- * Builder of {@link DorisExecutionOptions}.
- */
- public static class Builder {
- private Integer batchSize = DEFAULT_BATCH_SIZE;
- private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES;
- private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
- private Properties streamLoadProp = new Properties();
- private Boolean enableDelete = false;
-
- public Builder setBatchSize(Integer batchSize) {
- this.batchSize = batchSize;
- return this;
- }
-
- public Builder setMaxRetries(Integer maxRetries) {
- this.maxRetries = maxRetries;
- return this;
- }
-
- public Builder setBatchIntervalMs(Long batchIntervalMs) {
- this.batchIntervalMs = batchIntervalMs;
- return this;
- }
-
- public Builder setStreamLoadProp(Properties streamLoadProp) {
- this.streamLoadProp = streamLoadProp;
- return this;
- }
-
- public Builder setEnableDelete(Boolean enableDelete) {
- this.enableDelete = enableDelete;
- return this;
- }
-
- public DorisExecutionOptions build() {
- return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp, enableDelete);
- }
- }
-
-
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
deleted file mode 100644
index 512d0ab456..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ /dev/null
@@ -1,103 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.cfg;
-
-import org.apache.doris.flink.util.IOUtils;
-
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Options for the Doris connector.
- */
-public class DorisOptions extends DorisConnectionOptions {
-
- private static final long serialVersionUID = 1L;
-
- private String tableIdentifier;
-
-
- public DorisOptions(String fenodes, String username, String password, String tableIdentifier) {
- super(fenodes, username, password);
- this.tableIdentifier = tableIdentifier;
- }
-
- public String getTableIdentifier() {
- return tableIdentifier;
- }
-
- public String save() throws IllegalArgumentException {
- Properties copy = new Properties();
- return IOUtils.propsToString(copy);
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- /**
- * Builder of {@link DorisOptions}.
- */
- public static class Builder {
- private String fenodes;
- private String username;
- private String password;
- private String tableIdentifier;
-
- /**
- * required, tableIdentifier
- */
- public Builder setTableIdentifier(String tableIdentifier) {
- this.tableIdentifier = tableIdentifier;
- return this;
- }
-
- /**
- * optional, user name.
- */
- public Builder setUsername(String username) {
- this.username = username;
- return this;
- }
-
- /**
- * optional, password.
- */
- public Builder setPassword(String password) {
- this.password = password;
- return this;
- }
-
- /**
- * required, JDBC DB url.
- */
- public Builder setFenodes(String fenodes) {
- this.fenodes = fenodes;
- return this;
- }
-
-
- public DorisOptions build() {
- checkNotNull(fenodes, "No fenodes supplied.");
- checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
- return new DorisOptions(fenodes, username, password, tableIdentifier);
- }
- }
-
-
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
deleted file mode 100644
index 0beb18c612..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ /dev/null
@@ -1,190 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.cfg;
-
-
-import java.io.Serializable;
-
-/**
- * Doris read Options
- */
-public class DorisReadOptions implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private String readFields;
- private String filterQuery;
- private Integer requestTabletSize;
- private Integer requestConnectTimeoutMs;
- private Integer requestReadTimeoutMs;
- private Integer requestQueryTimeoutS;
- private Integer requestRetries;
- private Integer requestBatchSize;
- private Long execMemLimit;
- private Integer deserializeQueueSize;
- private Boolean deserializeArrowAsync;
-
- public DorisReadOptions(String readFields, String filterQuery, Integer requestTabletSize, Integer requestConnectTimeoutMs, Integer requestReadTimeoutMs,
- Integer requestQueryTimeoutS, Integer requestRetries, Integer requestBatchSize, Long execMemLimit,
- Integer deserializeQueueSize, Boolean deserializeArrowAsync) {
- this.readFields = readFields;
- this.filterQuery = filterQuery;
- this.requestTabletSize = requestTabletSize;
- this.requestConnectTimeoutMs = requestConnectTimeoutMs;
- this.requestReadTimeoutMs = requestReadTimeoutMs;
- this.requestQueryTimeoutS = requestQueryTimeoutS;
- this.requestRetries = requestRetries;
- this.requestBatchSize = requestBatchSize;
- this.execMemLimit = execMemLimit;
- this.deserializeQueueSize = deserializeQueueSize;
- this.deserializeArrowAsync = deserializeArrowAsync;
- }
-
- public String getReadFields() {
- return readFields;
- }
-
- public String getFilterQuery() {
- return filterQuery;
- }
-
- public Integer getRequestTabletSize() {
- return requestTabletSize;
- }
-
- public Integer getRequestConnectTimeoutMs() {
- return requestConnectTimeoutMs;
- }
-
- public Integer getRequestReadTimeoutMs() {
- return requestReadTimeoutMs;
- }
-
- public Integer getRequestRetries() {
- return requestRetries;
- }
-
- public Integer getRequestBatchSize() {
- return requestBatchSize;
- }
-
- public Integer getRequestQueryTimeoutS() {
- return requestQueryTimeoutS;
- }
-
- public Long getExecMemLimit() {
- return execMemLimit;
- }
-
- public Integer getDeserializeQueueSize() {
- return deserializeQueueSize;
- }
-
- public Boolean getDeserializeArrowAsync() {
- return deserializeArrowAsync;
- }
-
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static DorisReadOptions defaults(){
- return DorisReadOptions.builder().build();
- }
-
- /**
- * Builder of {@link DorisReadOptions}.
- */
- public static class Builder {
-
- private String readFields;
- private String filterQuery;
- private Integer requestTabletSize;
- private Integer requestConnectTimeoutMs;
- private Integer requestReadTimeoutMs;
- private Integer requestQueryTimeoutS;
- private Integer requestRetries;
- private Integer requestBatchSize;
- private Long execMemLimit;
- private Integer deserializeQueueSize;
- private Boolean deserializeArrowAsync;
-
-
- public Builder setReadFields(String readFields) {
- this.readFields = readFields;
- return this;
- }
-
- public Builder setFilterQuery(String filterQuery) {
- this.filterQuery = filterQuery;
- return this;
- }
-
- public Builder setRequestTabletSize(Integer requestTabletSize) {
- this.requestTabletSize = requestTabletSize;
- return this;
- }
-
- public Builder setRequestConnectTimeoutMs(Integer requestConnectTimeoutMs) {
- this.requestConnectTimeoutMs = requestConnectTimeoutMs;
- return this;
- }
-
- public Builder setRequestReadTimeoutMs(Integer requestReadTimeoutMs) {
- this.requestReadTimeoutMs = requestReadTimeoutMs;
- return this;
- }
-
- public Builder setRequestQueryTimeoutS(Integer requesQueryTimeoutS) {
- this.requestQueryTimeoutS = requesQueryTimeoutS;
- return this;
- }
-
- public Builder setRequestRetries(Integer requestRetries) {
- this.requestRetries = requestRetries;
- return this;
- }
-
- public Builder setRequestBatchSize(Integer requestBatchSize) {
- this.requestBatchSize = requestBatchSize;
- return this;
- }
-
- public Builder setExecMemLimit(Long execMemLimit) {
- this.execMemLimit = execMemLimit;
- return this;
- }
-
- public Builder setDeserializeQueueSize(Integer deserializeQueueSize) {
- this.deserializeQueueSize = deserializeQueueSize;
- return this;
- }
-
- public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) {
- this.deserializeArrowAsync = deserializeArrowAsync;
- return this;
- }
-
- public DorisReadOptions build() {
- return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync);
- }
-
- }
-
-
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisSink.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisSink.java
deleted file mode 100644
index 2c3db4cf34..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisSink.java
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.cfg;
-
-import org.apache.doris.flink.table.DorisDynamicOutputFormat;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.table.types.logical.LogicalType;
-
-/** Facade to create Doris {@link SinkFunction sinks}. */
-public class DorisSink {
-
-
- private DorisSink() {
- }
-
-
- /**
- * Create a Doris DataStream sink with the default {@link DorisReadOptions}
- * stream elements could only be JsonString.
- *
- * @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
- */
- public static SinkFunction sink(DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {
-
- return sink(new String[]{}, new LogicalType[]{}, DorisReadOptions.defaults(), executionOptions, dorisOptions);
- }
-
- /**
- * Create a Doris DataStream sink with the default {@link DorisReadOptions}
- * stream elements could only be RowData.
- *
- * @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
- */
- public static SinkFunction sink(String[] fiels, LogicalType[] types,
- DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {
-
- return sink(fiels, types, DorisReadOptions.defaults(), executionOptions, dorisOptions);
- }
-
- /**
- * Create a Doris DataStream sink with the default {@link DorisExecutionOptions}
- * stream elements could only be JsonString.
- *
- * @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
- */
- public static SinkFunction sink(DorisOptions dorisOptions) {
-
- return sink(new String[]{}, new LogicalType[]{}, DorisReadOptions.defaults(),
- DorisExecutionOptions.defaults(), dorisOptions);
- }
-
- /**
- * Create a Doris DataStream sink with the default {@link DorisExecutionOptions}
- * stream elements could only be RowData.
- *
- * @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
- */
- public static SinkFunction sink(String[] fiels, LogicalType[] types, DorisOptions dorisOptions) {
- return sink(fiels, types, DorisReadOptions.defaults(), DorisExecutionOptions.defaults(), dorisOptions);
- }
-
-
- /**
- * Create a Doris DataStream sink, stream elements could only be JsonString.
- *
- * @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
- */
- public static SinkFunction sink(DorisReadOptions readOptions,
- DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {
-
- return sink(new String[]{}, new LogicalType[]{}, readOptions, executionOptions, dorisOptions);
- }
-
-
- /**
- * Create a Doris DataStream sink, stream elements could only be RowData.
- *
- *
Note: the objects passed to the return sink can be processed in batch and retried.
- * Therefore, objects can not be {@link org.apache.flink.api.common.ExecutionConfig#enableObjectReuse() reused}.
- *
- *
- * @param field array of field
- * @param types types of field
- * @param readOptions parameters of read, such as readFields, filterQuery
- * @param executionOptions parameters of execution, such as batch size and maximum retries
- * @param dorisOptions parameters of options, such as fenodes, username, password, tableIdentifier
- * @param type of data in {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord
- * StreamRecord}.
- */
- public static SinkFunction sink(String[] field, LogicalType[] types, DorisReadOptions readOptions,
- DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {
-
- return new GenericDorisSinkFunction(new DorisDynamicOutputFormat(
- dorisOptions, readOptions, executionOptions, types, field));
- }
-
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
deleted file mode 100644
index c5c2c162d0..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.cfg;
-
-
-import java.io.Serializable;
-import java.util.Properties;
-
-
-/**
- * Options for the Doris stream connector.
- */
-public class DorisStreamOptions implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private Properties prop;
- private DorisOptions options;
- private DorisReadOptions readOptions;
-
- public DorisStreamOptions(Properties prop) {
- this.prop = prop;
- init();
- }
-
- /**
- * convert DorisStreamOptions to DorisOptions and DorisReadOptions
- */
- private void init() {
- DorisOptions.Builder optionsBuilder = DorisOptions.builder()
- .setFenodes(prop.getProperty(ConfigurationOptions.DORIS_FENODES))
- .setUsername(prop.getProperty(ConfigurationOptions.DORIS_USER))
- .setPassword(prop.getProperty(ConfigurationOptions.DORIS_PASSWORD))
- .setTableIdentifier(prop.getProperty(ConfigurationOptions.TABLE_IDENTIFIER));
-
- DorisReadOptions.Builder readOptionsBuilder = DorisReadOptions.builder()
- .setDeserializeArrowAsync(Boolean.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC, ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString())))
- .setDeserializeQueueSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE, ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString())))
- .setExecMemLimit(Long.valueOf(prop.getProperty(ConfigurationOptions.DORIS_EXEC_MEM_LIMIT, ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT.toString())))
- .setFilterQuery(prop.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY))
- .setReadFields(prop.getProperty(ConfigurationOptions.DORIS_READ_FIELD))
- .setRequestQueryTimeoutS(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S, ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString())))
- .setRequestBatchSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT.toString())))
- .setRequestConnectTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS, ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT.toString())))
- .setRequestReadTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS, ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT.toString())))
- .setRequestRetries(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT.toString())))
- .setRequestTabletSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_TABLET_SIZE, ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT.toString())));
-
- this.options = optionsBuilder.build();
- this.readOptions = readOptionsBuilder.build();
-
- }
-
- public DorisOptions getOptions() {
- return options;
- }
-
- public DorisReadOptions getReadOptions() {
- return readOptions;
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
deleted file mode 100644
index 6be6aa4ed6..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.cfg;
-
-import org.apache.doris.flink.table.DorisDynamicOutputFormat;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-
-public class GenericDorisSinkFunction extends RichSinkFunction
- implements CheckpointedFunction {
-
- private final DorisDynamicOutputFormat outputFormat;
-
- public GenericDorisSinkFunction(@Nonnull DorisDynamicOutputFormat outputFormat) {
- this.outputFormat = Preconditions.checkNotNull(outputFormat);
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- RuntimeContext ctx = getRuntimeContext();
- outputFormat.setRuntimeContext(ctx);
- outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
- }
-
-
- @Override
- public void invoke(T value, Context context) throws Exception {
- outputFormat.writeRecord(value);
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
-
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
-
- }
-
- @Override
- public void close() throws Exception {
- outputFormat.close();
- super.close();
- }
-
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
deleted file mode 100644
index edde9534b4..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.datastream;
-
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.cfg.DorisStreamOptions;
-import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
-import org.apache.doris.flink.exception.DorisException;
-import org.apache.doris.flink.rest.PartitionDefinition;
-import org.apache.doris.flink.rest.RestService;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-
-/**
- * DorisSource
- **/
-
-public class DorisSourceFunction extends RichParallelSourceFunction> implements ResultTypeQueryable> {
-
- private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
-
- private final DorisDeserializationSchema> deserializer;
- private final DorisOptions options;
- private final DorisReadOptions readOptions;
- private transient volatile boolean isRunning;
- private List dorisPartitions;
- private List taskDorisPartitions = Lists.newArrayList();
-
- public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema> deserializer) {
- this.deserializer = deserializer;
- this.options = streamOptions.getOptions();
- this.readOptions = streamOptions.getReadOptions();
- try {
- this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
- logger.info("Doris partitions size {}", dorisPartitions.size());
- } catch (DorisException e) {
- throw new RuntimeException("Failed fetch doris partitions");
- }
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.isRunning = true;
- assignTaskPartitions();
- }
-
- /**
- * Assign patitions to each task.
- */
- private void assignTaskPartitions() {
- int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
- int totalTasks = getRuntimeContext().getNumberOfParallelSubtasks();
-
- for (int i = 0; i < dorisPartitions.size(); i++) {
- if (i % totalTasks == taskIndex) {
- taskDorisPartitions.add(dorisPartitions.get(i));
- }
- }
- logger.info("subtask {} process {} partitions ", taskIndex, taskDorisPartitions.size());
- }
-
- @Override
- public void run(SourceContext> sourceContext) {
- for (PartitionDefinition partitions : taskDorisPartitions) {
- try (ScalaValueReader scalaValueReader = new ScalaValueReader(partitions, options, readOptions)) {
- while (isRunning && scalaValueReader.hasNext()) {
- List> next = scalaValueReader.next();
- sourceContext.collect(next);
- }
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- isRunning = false;
- }
-
- @Override
- public TypeInformation> getProducedType() {
- return this.deserializer.getProducedType();
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
deleted file mode 100644
index ba0921b9aa..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
+++ /dev/null
@@ -1,25 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.deserialization;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-import java.io.Serializable;
-
-
-public interface DorisDeserializationSchema extends Serializable, ResultTypeQueryable {
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
deleted file mode 100644
index d9ec6e5eae..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
+++ /dev/null
@@ -1,33 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.deserialization;
-
-
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.util.List;
-
-
-public class SimpleListDeserializationSchema implements DorisDeserializationSchema> {
-
- @Override
- public TypeInformation> getProducedType() {
- return TypeInformation.of(new TypeHint>() {
- });
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
deleted file mode 100644
index e25d1a592a..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.exception;
-
-public class ConnectedFailedException extends DorisException {
- public ConnectedFailedException(String server, Throwable cause) {
- super("Connect to " + server + "failed.", cause);
- }
-
- public ConnectedFailedException(String server, int statusCode, Throwable cause) {
- super("Connect to " + server + "failed, status code is " + statusCode + ".", cause);
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java
deleted file mode 100644
index 2274f87121..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.exception;
-
-public class DorisException extends Exception {
- public DorisException() {
- super();
- }
-
- public DorisException(String message) {
- super(message);
- }
-
- public DorisException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public DorisException(Throwable cause) {
- super(cause);
- }
-
- protected DorisException(String message, Throwable cause,
- boolean enableSuppression,
- boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java
deleted file mode 100644
index eadd860de5..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.exception;
-
-import org.apache.doris.thrift.TStatusCode;
-
-import java.util.List;
-
-public class DorisInternalException extends DorisException {
- public DorisInternalException(String server, TStatusCode statusCode, List errorMsgs) {
- super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + errorMsgs);
- }
-
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java
deleted file mode 100644
index 4c0ae0939a..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.exception;
-
-public class IllegalArgumentException extends DorisException {
- public IllegalArgumentException(String msg, Throwable cause) {
- super(msg, cause);
- }
-
- public IllegalArgumentException(String arg, String value) {
- super("argument '" + arg + "' is illegal, value is '" + value + "'.");
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
deleted file mode 100644
index a26718d657..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
+++ /dev/null
@@ -1,21 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.exception;
-
-public class ShouldNeverHappenException extends DorisException {
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java
deleted file mode 100644
index 233d27e167..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.exception;
-
-public class StreamLoadException extends Exception {
- public StreamLoadException() {
- super();
- }
-
- public StreamLoadException(String message) {
- super(message);
- }
-
- public StreamLoadException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public StreamLoadException(Throwable cause) {
- super(cause);
- }
-
- protected StreamLoadException(String message, Throwable cause,
- boolean enableSuppression,
- boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
deleted file mode 100644
index 8a66f76dfc..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.rest;
-
-import org.apache.doris.flink.cfg.DorisOptions;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * Doris partition info.
- */
-public class PartitionDefinition implements Serializable, Comparable {
- private final String database;
- private final String table;
-
- private final String beAddress;
- private final Set tabletIds;
- private final String queryPlan;
- private final String serializedSettings;
-
- public PartitionDefinition(String database, String table,
- DorisOptions settings, String beAddress, Set tabletIds, String queryPlan)
- throws IllegalArgumentException {
- if (settings != null) {
- this.serializedSettings = settings.save();
- } else {
- this.serializedSettings = null;
- }
- this.database = database;
- this.table = table;
- this.beAddress = beAddress;
- this.tabletIds = tabletIds;
- this.queryPlan = queryPlan;
- }
-
- public String getBeAddress() {
- return beAddress;
- }
-
- public Set getTabletIds() {
- return tabletIds;
- }
-
- public String getDatabase() {
- return database;
- }
-
- public String getTable() {
- return table;
- }
-
- public String getQueryPlan() {
- return queryPlan;
- }
-
-
- @Override
- public int compareTo(PartitionDefinition o) {
- int cmp = database.compareTo(o.database);
- if (cmp != 0) {
- return cmp;
- }
- cmp = table.compareTo(o.table);
- if (cmp != 0) {
- return cmp;
- }
- cmp = beAddress.compareTo(o.beAddress);
- if (cmp != 0) {
- return cmp;
- }
- cmp = queryPlan.compareTo(o.queryPlan);
- if (cmp != 0) {
- return cmp;
- }
-
- cmp = tabletIds.size() - o.tabletIds.size();
- if (cmp != 0) {
- return cmp;
- }
-
- Set similar = new HashSet<>(tabletIds);
- Set diffSelf = new HashSet<>(tabletIds);
- Set diffOther = new HashSet<>(o.tabletIds);
- similar.retainAll(o.tabletIds);
- diffSelf.removeAll(similar);
- diffOther.removeAll(similar);
- if (diffSelf.size() == 0) {
- return 0;
- }
- long diff = Collections.min(diffSelf) - Collections.min(diffOther);
- return diff < 0 ? -1 : 1;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PartitionDefinition that = (PartitionDefinition) o;
- return Objects.equals(database, that.database) &&
- Objects.equals(table, that.table) &&
- Objects.equals(beAddress, that.beAddress) &&
- Objects.equals(tabletIds, that.tabletIds) &&
- Objects.equals(queryPlan, that.queryPlan) &&
- Objects.equals(serializedSettings, that.serializedSettings);
- }
-
- @Override
- public int hashCode() {
- int result = database.hashCode();
- result = 31 * result + table.hashCode();
- result = 31 * result + beAddress.hashCode();
- result = 31 * result + queryPlan.hashCode();
- result = 31 * result + tabletIds.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "PartitionDefinition{" +
- ", database='" + database + '\'' +
- ", table='" + table + '\'' +
- ", beAddress='" + beAddress + '\'' +
- ", tabletIds=" + tabletIds +
- ", queryPlan='" + queryPlan + '\'' +
- '}';
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
deleted file mode 100644
index 0c4264f201..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ /dev/null
@@ -1,641 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.rest;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.IllegalArgumentException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.doris.flink.cfg.ConfigurationOptions;
-import org.apache.doris.flink.exception.ConnectedFailedException;
-import org.apache.doris.flink.exception.DorisException;
-import org.apache.doris.flink.exception.ShouldNeverHappenException;
-import org.apache.doris.flink.rest.models.Backend;
-import org.apache.doris.flink.rest.models.BackendRow;
-import org.apache.doris.flink.rest.models.BackendV2;
-import org.apache.doris.flink.rest.models.QueryPlan;
-import org.apache.doris.flink.rest.models.Schema;
-import org.apache.doris.flink.rest.models.Tablet;
-import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.entity.StringEntity;
-
-import org.slf4j.Logger;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
-import static org.apache.doris.flink.util.ErrorMessages.CONNECT_FAILED_MESSAGE;
-import static org.apache.doris.flink.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE;
-import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
-
-
-/**
- * Service for communicate with Doris FE.
- */
-public class RestService implements Serializable {
- public final static int REST_RESPONSE_STATUS_OK = 200;
- public final static int REST_RESPONSE_CODE_OK = 0;
- private final static String REST_RESPONSE_BE_ROWS_KEY = "rows";
- private static final String API_PREFIX = "/api";
- private static final String SCHEMA = "_schema";
- private static final String QUERY_PLAN = "_query_plan";
- @Deprecated
- private static final String BACKENDS = "/rest/v1/system?path=//backends";
- private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
- private static final String FE_LOGIN = "/rest/v1/login";
-
- /**
- * send request to Doris FE and get response json string.
- *
- * @param options configuration of request
- * @param request {@link HttpRequestBase} real request
- * @param logger {@link Logger}
- * @return Doris FE response in json string
- * @throws ConnectedFailedException throw when cannot connect to Doris FE
- */
- private static String send(DorisOptions options, DorisReadOptions readOptions, HttpRequestBase request, Logger logger) throws
- ConnectedFailedException {
- int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs();
- int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs();
- int retries = readOptions.getRequestRetries() == null ? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT : readOptions.getRequestRetries();
- logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
- connectTimeout, socketTimeout, retries);
-
- RequestConfig requestConfig = RequestConfig.custom()
- .setConnectTimeout(connectTimeout)
- .setSocketTimeout(socketTimeout)
- .build();
-
- request.setConfig(requestConfig);
- logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), options.getUsername());
- IOException ex = null;
- int statusCode = -1;
-
- for (int attempt = 0; attempt < retries; attempt++) {
- logger.debug("Attempt {} to request {}.", attempt, request.getURI());
- try {
- String response;
- if (request instanceof HttpGet) {
- response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(), logger);
- } else {
- response = getConnectionPost(request, options.getUsername(), options.getPassword(), logger);
- }
- if (response == null) {
- logger.warn("Failed to get response from Doris FE {}, http code is {}",
- request.getURI(), statusCode);
- continue;
- }
- logger.trace("Success get response from Doris FE: {}, response is: {}.",
- request.getURI(), response);
- //Handle the problem of inconsistent data format returned by http v1 and v2
- ObjectMapper mapper = new ObjectMapper();
- Map map = mapper.readValue(response, Map.class);
- if (map.containsKey("code") && map.containsKey("msg")) {
- Object data = map.get("data");
- return mapper.writeValueAsString(data);
- } else {
- return response;
- }
- } catch (IOException e) {
- ex = e;
- logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
- }
- }
-
- logger.error(CONNECT_FAILED_MESSAGE, request.getURI(), ex);
- throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
- }
-
- private static String getConnectionPost(HttpRequestBase request, String user, String passwd, Logger logger) throws IOException {
- URL url = new URL(request.getURI().toString());
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setInstanceFollowRedirects(false);
- conn.setRequestMethod(request.getMethod());
- String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
- conn.setRequestProperty("Authorization", "Basic " + authEncoding);
- InputStream content = ((HttpPost) request).getEntity().getContent();
- String res = IOUtils.toString(content);
- conn.setDoOutput(true);
- conn.setDoInput(true);
- PrintWriter out = new PrintWriter(conn.getOutputStream());
- // send request params
- out.print(res);
- // flush
- out.flush();
- // read response
- return parseResponse(conn, logger);
- }
-
- private static String getConnectionGet(String request, String user, String passwd, Logger logger) throws IOException {
- URL realUrl = new URL(request);
- // open connection
- HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
- String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
- connection.setRequestProperty("Authorization", "Basic " + authEncoding);
-
- connection.connect();
- return parseResponse(connection, logger);
- }
-
- private static String parseResponse(HttpURLConnection connection, Logger logger) throws IOException {
- if (connection.getResponseCode() != HttpStatus.SC_OK) {
- logger.warn("Failed to get response from Doris {}, http code is {}",
- connection.getURL(), connection.getResponseCode());
- throw new IOException("Failed to get response from Doris");
- }
- String result = "";
- BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8"));
- String line;
- while ((line = in.readLine()) != null) {
- result += line;
- }
- if (in != null) {
- in.close();
- }
- return result;
- }
-
- /**
- * parse table identifier to array.
- *
- * @param tableIdentifier table identifier string
- * @param logger {@link Logger}
- * @return first element is db name, second element is table name
- * @throws IllegalArgumentException table identifier is illegal
- */
- @VisibleForTesting
- static String[] parseIdentifier(String tableIdentifier, Logger logger) throws IllegalArgumentException {
- logger.trace("Parse identifier '{}'.", tableIdentifier);
- if (StringUtils.isEmpty(tableIdentifier)) {
- logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier);
- throw new IllegalArgumentException("table.identifier", tableIdentifier);
- }
- String[] identifier = tableIdentifier.split("\\.");
- if (identifier.length != 2) {
- logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier);
- throw new IllegalArgumentException("table.identifier", tableIdentifier);
- }
- return identifier;
- }
-
- /**
- * choice a Doris FE node to request.
- *
- * @param feNodes Doris FE node list, separate be comma
- * @param logger slf4j logger
- * @return the chosen one Doris FE node
- * @throws IllegalArgumentException fe nodes is illegal
- */
- @VisibleForTesting
- static String randomEndpoint(String feNodes, Logger logger) throws IllegalArgumentException {
- logger.trace("Parse fenodes '{}'.", feNodes);
- if (StringUtils.isEmpty(feNodes)) {
- logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
- throw new IllegalArgumentException("fenodes", feNodes);
- }
- List nodes = Arrays.asList(feNodes.split(","));
- Collections.shuffle(nodes);
- return nodes.get(0).trim();
- }
-
- /**
- * choice a Doris BE node to request.
- *
- * @param options configuration of request
- * @param logger slf4j logger
- * @return the chosen one Doris BE node
- * @throws IllegalArgumentException BE nodes is illegal
- */
- @VisibleForTesting
- public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
- List backends = getBackendsV2(options, readOptions, logger);
- logger.trace("Parse beNodes '{}'.", backends);
- if (backends == null || backends.isEmpty()) {
- logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
- throw new IllegalArgumentException("beNodes", String.valueOf(backends));
- }
- Collections.shuffle(backends);
- BackendV2.BackendRowV2 backend = backends.get(0);
- return backend.getIp() + ":" + backend.getHttpPort();
- }
-
- /**
- * get Doris BE nodes to request.
- *
- * @param options configuration of request
- * @param logger slf4j logger
- * @return the chosen one Doris BE node
- * @throws IllegalArgumentException BE nodes is illegal
- *
- * This method is deprecated. Because it needs ADMIN_PRIV to get backends, which is not suitable for common users.
- * Use getBackendsV2 instead
- */
- @Deprecated
- @VisibleForTesting
- static List getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
- String feNodes = options.getFenodes();
- String feNode = randomEndpoint(feNodes, logger);
- String beUrl = "http://" + feNode + BACKENDS;
- HttpGet httpGet = new HttpGet(beUrl);
- String response = send(options, readOptions, httpGet, logger);
- logger.info("Backend Info:{}", response);
- List backends = parseBackend(response, logger);
- return backends;
- }
-
- @Deprecated
- static List parseBackend(String response, Logger logger) throws DorisException, IOException {
- ObjectMapper mapper = new ObjectMapper();
- Backend backend;
- try {
- backend = mapper.readValue(response, Backend.class);
- } catch (JsonParseException e) {
- String errMsg = "Doris BE's response is not a json. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- } catch (JsonMappingException e) {
- String errMsg = "Doris BE's response cannot map to schema. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- } catch (IOException e) {
- String errMsg = "Parse Doris BE's response to json failed. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- }
-
- if (backend == null) {
- logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
- throw new ShouldNeverHappenException();
- }
- List backendRows = backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList());
- logger.debug("Parsing schema result is '{}'.", backendRows);
- return backendRows;
- }
-
- /**
- * get Doris BE nodes to request.
- *
- * @param options configuration of request
- * @param logger slf4j logger
- * @return the chosen one Doris BE node
- * @throws IllegalArgumentException BE nodes is illegal
- */
- @VisibleForTesting
- static List getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
- String feNodes = options.getFenodes();
- String feNode = randomEndpoint(feNodes, logger);
- String beUrl = "http://" + feNode + BACKENDS_V2;
- HttpGet httpGet = new HttpGet(beUrl);
- String response = send(options, readOptions, httpGet, logger);
- logger.info("Backend Info:{}", response);
- List backends = parseBackendV2(response, logger);
- return backends;
- }
-
- static List parseBackendV2(String response, Logger logger) throws DorisException, IOException {
- ObjectMapper mapper = new ObjectMapper();
- BackendV2 backend;
- try {
- backend = mapper.readValue(response, BackendV2.class);
- } catch (JsonParseException e) {
- String errMsg = "Doris BE's response is not a json. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- } catch (JsonMappingException e) {
- String errMsg = "Doris BE's response cannot map to schema. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- } catch (IOException e) {
- String errMsg = "Parse Doris BE's response to json failed. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- }
-
- if (backend == null) {
- logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
- throw new ShouldNeverHappenException();
- }
- List backendRows = backend.getBackends();
- logger.debug("Parsing schema result is '{}'.", backendRows);
- return backendRows;
- }
-
- /**
- * get a valid URI to connect Doris FE.
- *
- * @param options configuration of request
- * @param logger {@link Logger}
- * @return uri string
- * @throws IllegalArgumentException throw when configuration is illegal
- */
- @VisibleForTesting
- static String getUriStr(DorisOptions options, Logger logger) throws IllegalArgumentException {
- String[] identifier = parseIdentifier(options.getTableIdentifier(), logger);
- return "http://" +
- randomEndpoint(options.getFenodes(), logger) + API_PREFIX +
- "/" + identifier[0] +
- "/" + identifier[1] +
- "/";
- }
-
- /**
- * discover Doris table schema from Doris FE.
- *
- * @param options configuration of request
- * @param logger slf4j logger
- * @return Doris table schema
- * @throws DorisException throw when discover failed
- */
- public static Schema getSchema(DorisOptions options, DorisReadOptions readOptions, Logger logger)
- throws DorisException {
- logger.trace("Finding schema.");
- HttpGet httpGet = new HttpGet(getUriStr(options, logger) + SCHEMA);
- String response = send(options, readOptions, httpGet, logger);
- logger.debug("Find schema response is '{}'.", response);
- return parseSchema(response, logger);
- }
-
- /**
- * translate Doris FE response to inner {@link Schema} struct.
- *
- * @param response Doris FE response
- * @param logger {@link Logger}
- * @return inner {@link Schema} struct
- * @throws DorisException throw when translate failed
- */
- @VisibleForTesting
- public static Schema parseSchema(String response, Logger logger) throws DorisException {
- logger.trace("Parse response '{}' to schema.", response);
- ObjectMapper mapper = new ObjectMapper();
- Schema schema;
- try {
- schema = mapper.readValue(response, Schema.class);
- } catch (JsonParseException e) {
- String errMsg = "Doris FE's response is not a json. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- } catch (JsonMappingException e) {
- String errMsg = "Doris FE's response cannot map to schema. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- } catch (IOException e) {
- String errMsg = "Parse Doris FE's response to json failed. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- }
-
- if (schema == null) {
- logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
- throw new ShouldNeverHappenException();
- }
-
- if (schema.getStatus() != REST_RESPONSE_STATUS_OK) {
- String errMsg = "Doris FE's response is not OK, status is " + schema.getStatus();
- logger.error(errMsg);
- throw new DorisException(errMsg);
- }
- logger.debug("Parsing schema result is '{}'.", schema);
- return schema;
- }
-
- /**
- * find Doris partitions from Doris FE.
- *
- * @param options configuration of request
- * @param logger {@link Logger}
- * @return an list of Doris partitions
- * @throws DorisException throw when find partition failed
- */
- public static List findPartitions(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException {
- String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
- String readFields = StringUtils.isBlank(readOptions.getReadFields()) ? "*" : readOptions.getReadFields();
- String sql = "select " + readFields +
- " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
- if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
- sql += " where " + readOptions.getFilterQuery();
- }
- logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
-
- HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN);
- String entity = "{\"sql\": \"" + sql + "\"}";
- logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
- StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
- stringEntity.setContentEncoding("UTF-8");
- stringEntity.setContentType("application/json");
- httpPost.setEntity(stringEntity);
-
- String resStr = send(options, readOptions, httpPost, logger);
- logger.debug("Find partition response is '{}'.", resStr);
- QueryPlan queryPlan = getQueryPlan(resStr, logger);
- Map> be2Tablets = selectBeForTablet(queryPlan, logger);
- return tabletsMapToPartition(
- options,
- readOptions,
- be2Tablets,
- queryPlan.getOpaqued_query_plan(),
- tableIdentifiers[0],
- tableIdentifiers[1],
- logger);
- }
-
- /**
- * translate Doris FE response string to inner {@link QueryPlan} struct.
- *
- * @param response Doris FE response string
- * @param logger {@link Logger}
- * @return inner {@link QueryPlan} struct
- * @throws DorisException throw when translate failed.
- */
- @VisibleForTesting
- static QueryPlan getQueryPlan(String response, Logger logger) throws DorisException {
- ObjectMapper mapper = new ObjectMapper();
- QueryPlan queryPlan;
- try {
- queryPlan = mapper.readValue(response, QueryPlan.class);
- } catch (JsonParseException e) {
- String errMsg = "Doris FE's response is not a json. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- } catch (JsonMappingException e) {
- String errMsg = "Doris FE's response cannot map to schema. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- } catch (IOException e) {
- String errMsg = "Parse Doris FE's response to json failed. res: " + response;
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- }
-
- if (queryPlan == null) {
- logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
- throw new ShouldNeverHappenException();
- }
-
- if (queryPlan.getStatus() != REST_RESPONSE_STATUS_OK) {
- String errMsg = "Doris FE's response is not OK, status is " + queryPlan.getStatus();
- logger.error(errMsg);
- throw new DorisException(errMsg);
- }
- logger.debug("Parsing partition result is '{}'.", queryPlan);
- return queryPlan;
- }
-
- /**
- * select which Doris BE to get tablet data.
- *
- * @param queryPlan {@link QueryPlan} translated from Doris FE response
- * @param logger {@link Logger}
- * @return BE to tablets {@link Map}
- * @throws DorisException throw when select failed.
- */
- @VisibleForTesting
- static Map> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException {
- Map> be2Tablets = new HashMap<>();
- for (Map.Entry part : queryPlan.getPartitions().entrySet()) {
- logger.debug("Parse tablet info: '{}'.", part);
- long tabletId;
- try {
- tabletId = Long.parseLong(part.getKey());
- } catch (NumberFormatException e) {
- String errMsg = "Parse tablet id '" + part.getKey() + "' to long failed.";
- logger.error(errMsg, e);
- throw new DorisException(errMsg, e);
- }
- String target = null;
- int tabletCount = Integer.MAX_VALUE;
- for (String candidate : part.getValue().getRoutings()) {
- logger.trace("Evaluate Doris BE '{}' to tablet '{}'.", candidate, tabletId);
- if (!be2Tablets.containsKey(candidate)) {
- logger.debug("Choice a new Doris BE '{}' for tablet '{}'.", candidate, tabletId);
- List tablets = new ArrayList<>();
- be2Tablets.put(candidate, tablets);
- target = candidate;
- break;
- } else {
- if (be2Tablets.get(candidate).size() < tabletCount) {
- target = candidate;
- tabletCount = be2Tablets.get(candidate).size();
- logger.debug("Current candidate Doris BE to tablet '{}' is '{}' with tablet count {}.",
- tabletId, target, tabletCount);
- }
- }
- }
- if (target == null) {
- String errMsg = "Cannot choice Doris BE for tablet " + tabletId;
- logger.error(errMsg);
- throw new DorisException(errMsg);
- }
-
- logger.debug("Choice Doris BE '{}' for tablet '{}'.", target, tabletId);
- be2Tablets.get(target).add(tabletId);
- }
- return be2Tablets;
- }
-
- /**
- * tablet count limit for one Doris RDD partition
- *
- * @param readOptions configuration of request
- * @param logger {@link Logger}
- * @return tablet count limit
- */
- @VisibleForTesting
- static int tabletCountLimitForOnePartition(DorisReadOptions readOptions, Logger logger) {
- int tabletsSize = DORIS_TABLET_SIZE_DEFAULT;
- if (readOptions.getRequestTabletSize() != null) {
- tabletsSize = readOptions.getRequestTabletSize();
- }
- if (tabletsSize < DORIS_TABLET_SIZE_MIN) {
- logger.warn("{} is less than {}, set to default value {}.",
- DORIS_TABLET_SIZE, DORIS_TABLET_SIZE_MIN, DORIS_TABLET_SIZE_MIN);
- tabletsSize = DORIS_TABLET_SIZE_MIN;
- }
- logger.debug("Tablet size is set to {}.", tabletsSize);
- return tabletsSize;
- }
-
- /**
- * translate BE tablets map to Doris RDD partition.
- *
- * @param options configuration of request
- * @param be2Tablets BE to tablets {@link Map}
- * @param opaquedQueryPlan Doris BE execute plan getting from Doris FE
- * @param database database name of Doris table
- * @param table table name of Doris table
- * @param logger {@link Logger}
- * @return Doris RDD partition {@link List}
- * @throws IllegalArgumentException throw when translate failed
- */
- @VisibleForTesting
- static List tabletsMapToPartition(DorisOptions options, DorisReadOptions readOptions, Map> be2Tablets,
- String opaquedQueryPlan, String database, String table, Logger logger)
- throws IllegalArgumentException {
- int tabletsSize = tabletCountLimitForOnePartition(readOptions, logger);
- List partitions = new ArrayList<>();
- for (Map.Entry> beInfo : be2Tablets.entrySet()) {
- logger.debug("Generate partition with beInfo: '{}'.", beInfo);
- HashSet tabletSet = new HashSet<>(beInfo.getValue());
- beInfo.getValue().clear();
- beInfo.getValue().addAll(tabletSet);
- int first = 0;
- while (first < beInfo.getValue().size()) {
- Set partitionTablets = new HashSet<>(beInfo.getValue().subList(
- first, Math.min(beInfo.getValue().size(), first + tabletsSize)));
- first = first + tabletsSize;
- PartitionDefinition partitionDefinition =
- new PartitionDefinition(database, table, options,
- beInfo.getKey(), partitionTablets, opaquedQueryPlan);
- logger.debug("Generate one PartitionDefinition '{}'.", partitionDefinition);
- partitions.add(partitionDefinition);
- }
- }
- return partitions;
- }
-
-
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
deleted file mode 100644
index 5c6455628f..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.rest;
-
-import org.apache.doris.flink.rest.models.Field;
-import org.apache.doris.flink.rest.models.Schema;
-import org.apache.doris.thrift.TScanColumnDesc;
-
-import java.util.List;
-
-
-public class SchemaUtils {
-
- /**
- * convert Doris return schema to inner schema struct.
- *
- * @param tscanColumnDescs Doris BE return schema
- * @return inner schema struct
- */
- public static Schema convertToSchema(List tscanColumnDescs) {
- Schema schema = new Schema(tscanColumnDescs.size());
- tscanColumnDescs.stream().forEach(desc -> schema.put(new Field(desc.getName(), desc.getType().name(), "", 0, 0, "")));
- return schema;
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
deleted file mode 100644
index d91614f442..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.rest.models;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * Be response model
- **/
-@Deprecated
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Backend {
-
- @JsonProperty(value = "rows")
- private List rows;
-
- public List getRows() {
- return rows;
- }
-
- public void setRows(List rows) {
- this.rows = rows;
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
deleted file mode 100644
index 3dd04710ae..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
+++ /dev/null
@@ -1,68 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.rest.models;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-@Deprecated
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class BackendRow {
-
- @JsonProperty(value = "HttpPort")
- private String HttpPort;
-
- @JsonProperty(value = "IP")
- private String IP;
-
- @JsonProperty(value = "Alive")
- private Boolean Alive;
-
- public String getHttpPort() {
- return HttpPort;
- }
-
- public void setHttpPort(String httpPort) {
- HttpPort = httpPort;
- }
-
- public String getIP() {
- return IP;
- }
-
- public void setIP(String IP) {
- this.IP = IP;
- }
-
- public Boolean getAlive() {
- return Alive;
- }
-
- public void setAlive(Boolean alive) {
- Alive = alive;
- }
-
- @Override
- public String toString() {
- return "BackendRow{" +
- "HttpPort='" + HttpPort + '\'' +
- ", IP='" + IP + '\'' +
- ", Alive=" + Alive +
- '}';
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
deleted file mode 100644
index 5efb85ec07..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.rest.models;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * Be response model
- **/
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class BackendV2 {
-
- @JsonProperty(value = "backends")
- private List backends;
-
- public List getBackends() {
- return backends;
- }
-
- public void setBackends(List backends) {
- this.backends = backends;
- }
-
- public static class BackendRowV2 {
- @JsonProperty("ip")
- public String ip;
- @JsonProperty("http_port")
- public int httpPort;
- @JsonProperty("is_alive")
- public boolean isAlive;
-
- public String getIp() {
- return ip;
- }
-
- public void setIp(String ip) {
- this.ip = ip;
- }
-
- public int getHttpPort() {
- return httpPort;
- }
-
- public void setHttpPort(int httpPort) {
- this.httpPort = httpPort;
- }
-
- public boolean isAlive() {
- return isAlive;
- }
-
- public void setAlive(boolean alive) {
- isAlive = alive;
- }
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
deleted file mode 100644
index 04341bf571..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
+++ /dev/null
@@ -1,121 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.rest.models;
-
-import java.util.Objects;
-
-public class Field {
- private String name;
- private String type;
- private String comment;
- private int precision;
- private int scale;
- private String aggregation_type;
-
- public Field() {
- }
-
- public Field(String name, String type, String comment, int precision, int scale, String aggregation_type) {
- this.name = name;
- this.type = type;
- this.comment = comment;
- this.precision = precision;
- this.scale = scale;
- this.aggregation_type = aggregation_type;
- }
-
- public String getAggregation_type() {
- return aggregation_type;
- }
-
- public void setAggregation_type(String aggregation_type) {
- this.aggregation_type = aggregation_type;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getComment() {
- return comment;
- }
-
- public void setComment(String comment) {
- this.comment = comment;
- }
-
- public int getPrecision() {
- return precision;
- }
-
- public void setPrecision(int precision) {
- this.precision = precision;
- }
-
- public int getScale() {
- return scale;
- }
-
- public void setScale(int scale) {
- this.scale = scale;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Field field = (Field) o;
- return precision == field.precision &&
- scale == field.scale &&
- Objects.equals(name, field.name) &&
- Objects.equals(type, field.type) &&
- Objects.equals(comment, field.comment);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, type, comment, precision, scale);
- }
-
- @Override
- public String toString() {
- return "Field{" +
- "name='" + name + '\'' +
- ", type='" + type + '\'' +
- ", comment='" + comment + '\'' +
- ", precision=" + precision +
- ", scale=" + scale +
- '}';
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java
deleted file mode 100644
index e65175ca4b..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.rest.models;
-
-import java.util.Map;
-import java.util.Objects;
-
-public class QueryPlan {
- private int status;
- private String opaqued_query_plan;
- private Map partitions;
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
- public String getOpaqued_query_plan() {
- return opaqued_query_plan;
- }
-
- public void setOpaqued_query_plan(String opaqued_query_plan) {
- this.opaqued_query_plan = opaqued_query_plan;
- }
-
- public Map getPartitions() {
- return partitions;
- }
-
- public void setPartitions(Map partitions) {
- this.partitions = partitions;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- QueryPlan queryPlan = (QueryPlan) o;
- return status == queryPlan.status &&
- Objects.equals(opaqued_query_plan, queryPlan.opaqued_query_plan) &&
- Objects.equals(partitions, queryPlan.partitions);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(status, opaqued_query_plan, partitions);
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
deleted file mode 100644
index 07a356cf29..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.rest.models;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class RespContent {
-
- @JsonProperty(value = "TxnId")
- private int TxnId;
-
- @JsonProperty(value = "Label")
- private String Label;
-
- @JsonProperty(value = "Status")
- private String Status;
-
- @JsonProperty(value = "ExistingJobStatus")
- private String ExistingJobStatus;
-
- @JsonProperty(value = "Message")
- private String Message;
-
- @JsonProperty(value = "NumberTotalRows")
- private long NumberTotalRows;
-
- @JsonProperty(value = "NumberLoadedRows")
- private long NumberLoadedRows;
-
- @JsonProperty(value = "NumberFilteredRows")
- private int NumberFilteredRows;
-
- @JsonProperty(value = "NumberUnselectedRows")
- private int NumberUnselectedRows;
-
- @JsonProperty(value = "LoadBytes")
- private long LoadBytes;
-
- @JsonProperty(value = "LoadTimeMs")
- private int LoadTimeMs;
-
- @JsonProperty(value = "BeginTxnTimeMs")
- private int BeginTxnTimeMs;
-
- @JsonProperty(value = "StreamLoadPutTimeMs")
- private int StreamLoadPutTimeMs;
-
- @JsonProperty(value = "ReadDataTimeMs")
- private int ReadDataTimeMs;
-
- @JsonProperty(value = "WriteDataTimeMs")
- private int WriteDataTimeMs;
-
- @JsonProperty(value = "CommitAndPublishTimeMs")
- private int CommitAndPublishTimeMs;
-
- @JsonProperty(value = "ErrorURL")
- private String ErrorURL;
-
- public String getStatus() {
- return Status;
- }
-
- public String getMessage() {
- return Message;
- }
-
- @Override
- public String toString() {
- ObjectMapper mapper = new ObjectMapper();
- try {
- return mapper.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- return "";
- }
-
- }
-
- public String getErrorURL() {
- return ErrorURL;
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
deleted file mode 100644
index 264e73683f..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
+++ /dev/null
@@ -1,105 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.rest.models;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class Schema {
- private int status = 0;
- private String keysType;
- private List properties;
-
- public Schema() {
- properties = new ArrayList<>();
- }
-
- public Schema(int fieldCount) {
- properties = new ArrayList<>(fieldCount);
- }
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
- public String getKeysType() {
- return keysType;
- }
-
- public void setKeysType(String keysType) {
- this.keysType = keysType;
- }
-
- public List getProperties() {
- return properties;
- }
-
- public void setProperties(List properties) {
- this.properties = properties;
- }
-
- public void put(String name, String type, String comment, int scale, int precision, String aggregation_type) {
- properties.add(new Field(name, type, comment, scale, precision, aggregation_type));
- }
-
- public void put(Field f) {
- properties.add(f);
- }
-
- public Field get(int index) {
- if (index >= properties.size()) {
- throw new IndexOutOfBoundsException("Index: " + index + ", Fields size:" + properties.size());
- }
- return properties.get(index);
- }
-
- public int size() {
- return properties.size();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Schema schema = (Schema) o;
- return status == schema.status &&
- Objects.equals(properties, schema.properties);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(status, properties);
- }
-
- @Override
- public String toString() {
- return "Schema{" +
- "status=" + status +
- ", properties=" + properties +
- '}';
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java
deleted file mode 100644
index 70b0f13959..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.rest.models;
-
-import java.util.List;
-import java.util.Objects;
-
-public class Tablet {
- private List routings;
- private int version;
- private long versionHash;
- private long schemaHash;
-
- public List getRoutings() {
- return routings;
- }
-
- public void setRoutings(List routings) {
- this.routings = routings;
- }
-
- public int getVersion() {
- return version;
- }
-
- public void setVersion(int version) {
- this.version = version;
- }
-
- public long getVersionHash() {
- return versionHash;
- }
-
- public void setVersionHash(long versionHash) {
- this.versionHash = versionHash;
- }
-
- public long getSchemaHash() {
- return schemaHash;
- }
-
- public void setSchemaHash(long schemaHash) {
- this.schemaHash = schemaHash;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Tablet tablet = (Tablet) o;
- return version == tablet.version &&
- versionHash == tablet.versionHash &&
- schemaHash == tablet.schemaHash &&
- Objects.equals(routings, tablet.routings);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(routings, version, versionHash, schemaHash);
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/Routing.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/Routing.java
deleted file mode 100644
index 25fdbe50e7..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/Routing.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.serialization;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.doris.flink.exception.IllegalArgumentException;
-
-import static org.apache.doris.flink.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
-
-/**
- * present an Doris BE address.
- */
-public class Routing {
- private static Logger logger = LoggerFactory.getLogger(Routing.class);
-
- private String host;
- private int port;
-
- public Routing(String routing) throws IllegalArgumentException {
- parseRouting(routing);
- }
-
- private void parseRouting(String routing) throws IllegalArgumentException {
- logger.debug("Parse Doris BE address: '{}'.", routing);
- String[] hostPort = routing.split(":");
- if (hostPort.length != 2) {
- logger.error("Format of Doris BE address '{}' is illegal.", routing);
- throw new IllegalArgumentException("Doris BE", routing);
- }
- this.host = hostPort[0];
- try {
- this.port = Integer.parseInt(hostPort[1]);
- } catch (NumberFormatException e) {
- logger.error(PARSE_NUMBER_FAILED_MESSAGE, "Doris BE's port", hostPort[1]);
- throw new IllegalArgumentException("Doris BE", routing);
- }
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- @Override
- public String toString() {
- return "Doris BE{" +
- "host='" + host + '\'' +
- ", port=" + port +
- '}';
- }
-}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
deleted file mode 100644
index 3337637ff3..0000000000
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ /dev/null
@@ -1,309 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.serialization;
-
-import org.apache.arrow.memory.RootAllocator;
-
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.Float8Vector;
-import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TinyIntVector;
-import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.types.Types;
-import org.apache.doris.flink.exception.DorisException;
-import org.apache.doris.flink.rest.models.Schema;
-import org.apache.doris.thrift.TScanBatchResult;
-
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * row batch data container.
- */
-public class RowBatch {
- private static Logger logger = LoggerFactory.getLogger(RowBatch.class);
-
- public static class Row {
- private List