diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index 830f4b491f..572be9e733 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -261,7 +261,9 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string&
case TYPE_SMALLINT:
case TYPE_INT: {
if (type_str != "java.lang.Short" && type_str != "java.lang.Integer" &&
- type_str != "java.math.BigDecimal" && type_str != "java.lang.Byte") {
+ type_str != "java.math.BigDecimal" && type_str != "java.lang.Byte" &&
+ type_str != "com.clickhouse.data.value.UnsignedByte" &&
+ type_str != "com.clickhouse.data.value.UnsignedShort") {
return Status::InternalError(error_msg);
}
break;
@@ -269,7 +271,9 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string&
case TYPE_BIGINT:
case TYPE_LARGEINT: {
if (type_str != "java.lang.Long" && type_str != "java.math.BigDecimal" &&
- type_str != "java.math.BigInteger") {
+ type_str != "java.math.BigInteger" &&
+ type_str != "com.clickhouse.data.value.UnsignedInteger" &&
+ type_str != "com.clickhouse.data.value.UnsignedLong") {
return Status::InternalError(error_msg);
}
break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java
index 689667560e..419ce7590a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java
@@ -592,7 +592,7 @@ public class JdbcClient {
|| ckType.startsWith("FixedString")) {
return ScalarType.createStringType();
} else if (ckType.startsWith("DateTime")) {
- return ScalarType.createDatetimeV2Type(0);
+ return ScalarType.createDatetimeV2Type(6);
} else if (ckType.startsWith("Array")) {
String cktype = ckType.substring(6, ckType.length() - 1);
fieldSchema.setDataTypeName(cktype);
@@ -630,7 +630,6 @@ public class JdbcClient {
default:
return Type.UNSUPPORTED;
}
- // Todo(zyk): Wait the JDBC external table support the array type then supported clickhouse array type
}
public Type oracleTypeToDoris(JdbcFieldSchema fieldSchema) {
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index ca7db41b23..8f6ae450d7 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -86,6 +86,12 @@ under the License.
druid
1.2.5
+
+ com.clickhouse
+ clickhouse-jdbc
+ 0.4.2
+ all
+
java-udf
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index c0053fbf43..53a7e64cde 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -22,6 +22,10 @@ import org.apache.doris.thrift.TJdbcOperation;
import org.apache.doris.thrift.TOdbcTableType;
import com.alibaba.druid.pool.DruidDataSource;
+import com.clickhouse.data.value.UnsignedByte;
+import com.clickhouse.data.value.UnsignedInteger;
+import com.clickhouse.data.value.UnsignedLong;
+import com.clickhouse.data.value.UnsignedShort;
import com.google.common.base.Preconditions;
import org.apache.log4j.Logger;
import org.apache.thrift.TDeserializer;
@@ -350,6 +354,23 @@ public class JdbcExecutor {
}
}
+ private void bytePutToByte(Object[] column, boolean isNullable, int numRows, long nullMapAddr,
+ long columnAddr, int startRowForNullable) {
+ if (isNullable) {
+ for (int i = startRowForNullable; i < numRows; i++) {
+ if (column[i] == null) {
+ UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+ } else {
+ UdfUtils.UNSAFE.putByte(columnAddr + i, (Byte) column[i]);
+ }
+ }
+ } else {
+ for (int i = 0; i < numRows; i++) {
+ UdfUtils.UNSAFE.putByte(columnAddr + i, (Byte) column[i]);
+ }
+ }
+ }
+
public void copyBatchTinyIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
long columnAddr) {
Object[] column = (Object[]) columnObj;
@@ -366,6 +387,8 @@ public class JdbcExecutor {
integerPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
} else if (column[firstNotNullIndex] instanceof Short) {
shortPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
+ } else if (column[firstNotNullIndex] instanceof Byte) {
+ bytePutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
}
}
@@ -420,6 +443,23 @@ public class JdbcExecutor {
}
}
+ public void clickHouseUInt8ToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr,
+ long columnAddr, int startRowForNullable) {
+ if (isNullable) {
+ for (int i = startRowForNullable; i < numRows; i++) {
+ if (column[i] == null) {
+ UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+ } else {
+ UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), (short) ((UnsignedByte) column[i]).intValue());
+ }
+ }
+ } else {
+ for (int i = 0; i < numRows; i++) {
+ UdfUtils.UNSAFE.putShort(columnAddr + (i * 2L), (short) ((UnsignedByte) column[i]).intValue());
+ }
+ }
+ }
+
public void copyBatchSmallIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
long columnAddr) {
Object[] column = (Object[]) columnObj;
@@ -436,6 +476,8 @@ public class JdbcExecutor {
integerPutToShort(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
} else if (column[firstNotNullIndex] instanceof Short) {
shortPutToShort(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
+ } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedByte) {
+ clickHouseUInt8ToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
}
}
@@ -474,6 +516,23 @@ public class JdbcExecutor {
}
}
+ public void clickHouseUInt16ToInt(Object[] column, boolean isNullable, int numRows, long nullMapAddr,
+ long columnAddr, int startRowForNullable) {
+ if (isNullable) {
+ for (int i = startRowForNullable; i < numRows; i++) {
+ if (column[i] == null) {
+ UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+ } else {
+ UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((UnsignedShort) column[i]).intValue());
+ }
+ }
+ } else {
+ for (int i = 0; i < numRows; i++) {
+ UdfUtils.UNSAFE.putInt(columnAddr + (i * 4L), ((UnsignedShort) column[i]).intValue());
+ }
+ }
+ }
+
public void copyBatchIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
long columnAddr) {
Object[] column = (Object[]) columnObj;
@@ -488,6 +547,8 @@ public class JdbcExecutor {
bigDecimalPutToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
} else if (column[firstNotNullIndex] instanceof Integer) {
integerPutToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
+ } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedShort) {
+ clickHouseUInt16ToInt(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
}
}
@@ -525,6 +586,23 @@ public class JdbcExecutor {
}
}
+ private void clickHouseUInt32ToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr,
+ long columnAddr, int startRowForNullable) {
+ if (isNullable) {
+ for (int i = startRowForNullable; i < numRows; i++) {
+ if (column[i] == null) {
+ UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+ } else {
+ UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), ((UnsignedInteger) column[i]).longValue());
+ }
+ }
+ } else {
+ for (int i = 0; i < numRows; i++) {
+ UdfUtils.UNSAFE.putLong(columnAddr + (i * 8L), ((UnsignedInteger) column[i]).longValue());
+ }
+ }
+ }
+
public void copyBatchBigIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
long columnAddr) {
Object[] column = (Object[]) columnObj;
@@ -539,6 +617,8 @@ public class JdbcExecutor {
bigDecimalPutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
} else if (column[firstNotNullIndex] instanceof Long) {
longPutToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
+ } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedInteger) {
+ clickHouseUInt32ToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
}
}
@@ -591,6 +671,23 @@ public class JdbcExecutor {
}
}
+ private void clickHouseUInt64ToLong(Object[] column, boolean isNullable, int numRows, long nullMapAddr,
+ long columnAddr, int startRowForNullable) {
+ if (isNullable) {
+ for (int i = startRowForNullable; i < numRows; i++) {
+ if (column[i] == null) {
+ UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+ } else {
+ UdfUtils.UNSAFE.putLong(columnAddr + (i * 16L), ((UnsignedLong) column[i]).longValue());
+ }
+ }
+ } else {
+ for (int i = 0; i < numRows; i++) {
+ UdfUtils.UNSAFE.putLong(columnAddr + (i * 16L), ((UnsignedLong) column[i]).longValue());
+ }
+ }
+ }
+
public void copyBatchLargeIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
long columnAddr) {
Object[] column = (Object[]) columnObj;
@@ -605,6 +702,8 @@ public class JdbcExecutor {
bigDecimalPutToBigInteger(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
} else if (column[firstNotNullIndex] instanceof BigInteger) {
bigIntegerPutToByte(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
+ } else if (column[firstNotNullIndex] instanceof com.clickhouse.data.value.UnsignedLong) {
+ clickHouseUInt64ToLong(column, isNullable, numRows, nullMapAddr, columnAddr, firstNotNullIndex);
}
}