diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index cdb20f0fb2..04e6571ef5 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -355,6 +355,15 @@ public:
         return false;
     }
 
+    bool has_hll_slot() const {
+        for (auto slot : _slots) {
+            if (slot->type().is_hll_type()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     TupleId id() const { return _id; }
 
     std::string debug_string() const;
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index e4ae6f9461..1e382f78c7 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -235,6 +235,8 @@ struct TypeDescriptor {
 
     bool is_array_type() const { return type == TYPE_ARRAY; }
 
+    bool is_hll_type() const { return type == TYPE_HLL; }
+
     bool is_bitmap_type() const { return type == TYPE_OBJECT; }
 
     bool is_variant_type() const { return type == TYPE_VARIANT; }
diff --git a/be/src/vec/data_types/data_type_hll.cpp b/be/src/vec/data_types/data_type_hll.cpp
index ee8b3a381b..efb3e735a2 100644
--- a/be/src/vec/data_types/data_type_hll.cpp
+++ b/be/src/vec/data_types/data_type_hll.cpp
@@ -126,4 +126,16 @@ void DataTypeHLL::to_string(const class doris::vectorized::IColumn& column, size
     ostr.write(result.c_str(), result.size());
 }
 
+Status DataTypeHLL::from_string(ReadBuffer& rb, IColumn* column) const {
+    auto& data_column = assert_cast<ColumnHLL&>(*column);
+    auto& data = data_column.get_data();
+
+    HyperLogLog hll;
+    if (!hll.deserialize(Slice(rb.to_string()))) {
+        return Status::InternalError("deserialize hll from string failed!");
+    }
+    data.push_back(std::move(hll));
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_hll.h b/be/src/vec/data_types/data_type_hll.h
index 75170242ec..2d397b5832 100644
--- a/be/src/vec/data_types/data_type_hll.h
+++ b/be/src/vec/data_types/data_type_hll.h
@@ -89,6 +89,7 @@ public:
     std::string to_string(const IColumn& column, size_t row_num) const override { return "HLL()"; }
     void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
+    Status from_string(ReadBuffer& rb, IColumn* column) const override;
 
     Field get_default() const override {
         LOG(FATAL) << "Method get_default() is not implemented for data type " << get_name();
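Note: the `from_string` path above is the inverse of `HyperLogLog::serialize`. For orientation, a minimal round-trip sketch, assuming the `HyperLogLog` API from `be/src/olap/hll.h` (`update`, `max_serialized_size`, `serialize`, `deserialize`, `estimate_cardinality`); hashing of the raw value is elided:

```cpp
#include "common/logging.h"
#include "olap/hll.h"
#include "util/slice.h"

// Round-trip sketch: serialize an HLL into a string buffer (the form in which
// the JDBC path ships it) and restore it, mirroring what
// DataTypeHLL::from_string does with rb.to_string().
void hll_round_trip(uint64_t hashed_value) {
    doris::HyperLogLog hll;
    hll.update(hashed_value); // register one already-hashed element

    std::string buf(hll.max_serialized_size(), '\0');
    size_t len = hll.serialize(reinterpret_cast<uint8_t*>(buf.data()));
    buf.resize(len);

    doris::HyperLogLog restored;
    CHECK(restored.deserialize(doris::Slice(buf)));
    CHECK_EQ(hll.estimate_cardinality(), restored.estimate_cardinality());
}
```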
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index df1bba2d83..055ea1e90d 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -67,6 +67,7 @@ const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I";
 const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I";
 const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
 const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;";
+const char* JDBC_EXECUTOR_GET_BLOCK_WITH_TYPES_SIGNATURE = "(ILjava/lang/Object;)Ljava/util/List;";
 const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;";
 const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
 const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V";
@@ -345,6 +346,23 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string&
                         ->create_column());
         break;
     }
+    case TYPE_HLL: {
+        if (type_str != "java.lang.String") {
+            return Status::InternalError(error_msg);
+        }
+
+        _map_column_idx_to_cast_idx_hll[column_index] = _input_hll_string_types.size();
+        if (slot_desc->is_nullable()) {
+            _input_hll_string_types.push_back(make_nullable(std::make_shared<DataTypeString>()));
+        } else {
+            _input_hll_string_types.push_back(std::make_shared<DataTypeString>());
+        }
+
+        str_hll_cols.push_back(
+                _input_hll_string_types[_map_column_idx_to_cast_idx_hll[column_index]]
+                        ->create_column());
+        break;
+    }
     default: {
         return Status::InternalError(error_msg);
     }
@@ -367,8 +385,45 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
         return Status::OK();
     }
 
-    jobject block_obj = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz,
-                                                        _executor_get_blocks_id, batch_size);
+    jobject block_obj;
+    // If the tuple contains an HLL column, pass the per-column types to the JNI side.
+    if (_tuple_desc->has_hll_slot()) {
+        auto column_size = _tuple_desc->slots().size();
+        // Look up the ArrayList and Integer classes.
+        jclass arrayListClass = env->FindClass("java/util/ArrayList");
+        jclass integerClass = env->FindClass("java/lang/Integer");
+
+        // Get the method ids of the ArrayList constructor and of ArrayList.add().
+        jmethodID arrayListConstructor = env->GetMethodID(arrayListClass, "<init>", "()V");
+        jmethodID arrayListAddMethod =
+                env->GetMethodID(arrayListClass, "add", "(Ljava/lang/Object;)Z");
+
+        // Create an ArrayList object.
+        jobject arrayListObject = env->NewObject(arrayListClass, arrayListConstructor);
+        for (int column_index = 0; column_index < column_size; ++column_index) {
+            auto slot_desc = _tuple_desc->slots()[column_index];
+            if (slot_desc->type().is_hll_type()) {
+                // Box the HLL type id into an Integer object.
+                jobject integerObject = env->NewObject(
+                        integerClass, env->GetMethodID(integerClass, "<init>", "(I)V"),
+                        (int)slot_desc->type().type);
+                // Add the Integer into the ArrayList.
+                env->CallBooleanMethod(arrayListObject, arrayListAddMethod, integerObject);
+
+            } else {
+                jobject integerObject = env->NewObject(
+                        integerClass, env->GetMethodID(integerClass, "<init>", "(I)V"), 0);
+                env->CallBooleanMethod(arrayListObject, arrayListAddMethod, integerObject);
+            }
+        }
+
+        block_obj = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz,
+                                                    _executor_get_blocks_new_id, batch_size,
+                                                    arrayListObject);
+    } else {
+        block_obj = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz,
+                                                    _executor_get_blocks_id, batch_size);
+    }
 
     RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
@@ -390,6 +445,8 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
             //here need to cast string to array type
             if (slot_desc->type().is_array_type()) {
                 _cast_string_to_array(slot_desc, block, column_index, num_rows);
+            } else if (slot_desc->type().is_hll_type()) {
+                _cast_string_to_hll(slot_desc, block, column_index, num_rows);
             }
             materialized_column_index++;
         }
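Note: passing the type ids as an `ArrayList` of boxed `Integer`s, as above, costs one `NewObject` and one `CallBooleanMethod` per column. A `jintArray` would carry the same information in a single JNI call; a hedged alternative sketch using only standard JNI (not what this patch does):

```cpp
#include <jni.h>

#include <vector>

// Alternative to the ArrayList<Integer> above: ship the per-column type
// markers as a primitive Java int[]. One allocation, no boxing.
jintArray make_type_array(JNIEnv* env, const std::vector<jint>& type_ids) {
    jintArray arr = env->NewIntArray(static_cast<jsize>(type_ids.size()));
    if (arr == nullptr) {
        return nullptr; // OOM: the JVM now has a pending OutOfMemoryError
    }
    env->SetIntArrayRegion(arr, 0, static_cast<jsize>(type_ids.size()), type_ids.data());
    return arr;
}
```

The Java side would then declare `getBlock(int, int[])` instead of downcasting an `Object` parameter to `ArrayList<Integer>`.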
convert jdbc value to {} on column: {}", @@ -625,6 +702,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { "(Ljava/lang/Object;ZIJJJ)V", _executor_get_string_result)); RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchArrayResult", "(Ljava/lang/Object;ZIJJJ)V", _executor_get_array_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchHllResult", "(Ljava/lang/Object;ZIJJJ)V", + _executor_get_hll_result)); RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchCharResult", "(Ljava/lang/Object;ZIJJJZ)V", _executor_get_char_result)); @@ -650,6 +729,9 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", JDBC_EXECUTOR_GET_BLOCK_SIGNATURE, _executor_get_blocks_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", + JDBC_EXECUTOR_GET_BLOCK_WITH_TYPES_SIGNATURE, + _executor_get_blocks_new_id)); RETURN_IF_ERROR(register_id(_executor_list_clazz, "get", "(I)Ljava/lang/Object;", _executor_get_list_id)); RETURN_IF_ERROR(register_id(_executor_string_clazz, "getBytes", "(Ljava/lang/String;)[B", @@ -668,6 +750,41 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { return Status::OK(); } +Status JdbcConnector::_cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block, + int column_index, int rows) { + DataTypePtr _target_data_type = slot_desc->get_data_type_ptr(); + std::string _target_data_type_name = _target_data_type->get_name(); + DataTypePtr _cast_param_data_type = _target_data_type; + ColumnPtr _cast_param = _cast_param_data_type->create_column_const_with_default_value(1); + + ColumnsWithTypeAndName argument_template; + argument_template.reserve(2); + argument_template.emplace_back( + std::move(str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]]), + _input_hll_string_types[_map_column_idx_to_cast_idx_hll[column_index]], + "java.sql.String"); + argument_template.emplace_back(_cast_param, _cast_param_data_type, _target_data_type_name); + FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function( + "CAST", argument_template, make_nullable(_target_data_type)); + + Block cast_block(argument_template); + int result_idx = cast_block.columns(); + cast_block.insert({nullptr, make_nullable(_target_data_type), "cast_result"}); + func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows); + + auto res_col = cast_block.get_by_position(result_idx).column; + if (_target_data_type->is_nullable()) { + block->replace_by_position(column_index, res_col); + } else { + auto nested_ptr = reinterpret_cast(res_col.get()) + ->get_nested_column_ptr(); + block->replace_by_position(column_index, nested_ptr); + } + str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]] = + _input_hll_string_types[_map_column_idx_to_cast_idx_hll[column_index]]->create_column(); + return Status::OK(); +} + Status JdbcConnector::_cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows) { DataTypePtr _target_data_type = slot_desc->get_data_type_ptr(); diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index fc21078305..2e25eb3353 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -102,6 +102,8 @@ private: std::string _jobject_to_string(JNIEnv* env, jobject jobj); Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows); + Status _cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block, int column_index, + int rows); Status 
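Note: the `TYPE_HLL` copy case above and `copyBatchHllResult` on the Java side (later in this patch) communicate through `ColumnString`'s memory contract: one flat `chars` buffer plus a cumulative `offsets` array holding the end position of each row, with no per-row terminator. A small sketch of the invariant, assuming the `ColumnString` API from `be/src/vec/columns/column_string.h`:

```cpp
#include "common/logging.h"
#include "vec/columns/column_string.h"

using doris::vectorized::ColumnString;

// offsets[i] is the cumulative end of row i inside chars; row i spans
// [offsets[i - 1], offsets[i]). This is exactly the layout hllPutToString
// writes through the raw offsetsAddr/charsAddr pointers.
void show_offsets_contract() {
    auto col = ColumnString::create();
    col->insert_data("ab", 2);  // row 0
    col->insert_data("cde", 3); // row 1

    const auto& offsets = col->get_offsets();
    CHECK_EQ(offsets[0], 2);
    CHECK_EQ(offsets[1], 5);
    CHECK_EQ(col->get_chars().size(), 5); // "abcde"
}
```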
diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h
index a716a01a75..0bf44e43cd 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -70,6 +70,7 @@
 #include "vec/data_types/data_type_date.h"
 #include "vec/data_types/data_type_date_time.h"
 #include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
 #include "vec/data_types/data_type_jsonb.h"
 #include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_nullable.h"
@@ -1638,6 +1639,25 @@ private:
         return create_unsupport_wrapper(error_msg);
     }
 
+    WrapperType create_hll_wrapper(FunctionContext* context, const DataTypePtr& from_type_untyped,
+                                   const DataTypeHLL& to_type) const {
+        /// Conversion from String through parsing.
+        if (check_and_get_data_type<DataTypeString>(from_type_untyped.get())) {
+            return &ConvertImplGenericFromString<ColumnString>::execute;
+        }
+
+        // TODO: if the source is not a string, must it already be HLL?
+        const auto* from_type = check_and_get_data_type<DataTypeHLL>(from_type_untyped.get());
+
+        if (!from_type) {
+            return create_unsupport_wrapper(
+                    "CAST AS HLL can only be performed between HLL, String "
+                    "types");
+        }
+
+        return nullptr;
+    }
+
     WrapperType create_array_wrapper(FunctionContext* context, const DataTypePtr& from_type_untyped,
                                      const DataTypeArray& to_type) const {
         /// Conversion from String through parsing.
@@ -2021,6 +2041,9 @@ private:
                                          static_cast<const DataTypeStruct&>(*to_type));
         case TypeIndex::Map:
             return create_map_wrapper(from_type, static_cast<const DataTypeMap&>(*to_type));
+        case TypeIndex::HLL:
+            return create_hll_wrapper(context, from_type,
+                                      static_cast<const DataTypeHLL&>(*to_type));
         default:
             break;
         }
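Note: once `create_hll_wrapper` selects the generic from-string conversion, each cell goes through the `DataTypeHLL::from_string` added earlier in this patch. A minimal sketch of that per-cell step, assuming `ReadBuffer(char*, size_t)` from `vec/io/reader_buffer.h` and `ColumnHLL` from `vec/columns/column_complex.h`:

```cpp
#include "common/logging.h"
#include "vec/columns/column_complex.h"
#include "vec/data_types/data_type_hll.h"
#include "vec/io/reader_buffer.h"

using namespace doris::vectorized;

// `src` holds one serialized HLL, e.g. the bytes fetched over JDBC with
// return_object_data_as_binary=true. Illustrative only.
MutableColumnPtr parse_one_hll(std::string& src) {
    auto col = ColumnHLL::create();
    ReadBuffer rb(src.data(), src.size());
    // Deserializes the bytes and appends one HyperLogLog value to the column.
    doris::Status st = DataTypeHLL().from_string(rb, col.get());
    CHECK(st.ok()) << st.to_string();
    return col;
}
```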
diff --git a/docs/en/docs/lakehouse/multi-catalog/jdbc.md b/docs/en/docs/lakehouse/multi-catalog/jdbc.md
index c61ca75845..d77902ffac 100644
--- a/docs/en/docs/lakehouse/multi-catalog/jdbc.md
+++ b/docs/en/docs/lakehouse/multi-catalog/jdbc.md
@@ -421,6 +421,7 @@ The transaction mechanism ensures the atomicity of data writing to JDBC External
 | VARCHAR | VARCHAR | |
 | STRING | STRING | |
 | TEXT | STRING | |
+| HLL | HLL | `return_object_data_as_binary=true` is required when querying an HLL column |
 |Other| UNSUPPORTED |
 
 ### SAP HANA
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
index a73b70170b..e457b94b42 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
@@ -420,6 +420,7 @@ set enable_odbc_transcation = true;
 | VARCHAR | VARCHAR | |
 | STRING | STRING | |
 | TEXT | STRING | |
+| HLL | HLL | 查询HLL需要设置`return_object_data_as_binary=true` |
 |Other| UNSUPPORTED |
 
 ### SAP HANA
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcMySQLClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcMySQLClient.java
index d3c088cca7..377b92843b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcMySQLClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcMySQLClient.java
@@ -20,6 +20,7 @@ package org.apache.doris.external.jdbc;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.util.Util;
 
 import avro.shaded.com.google.common.collect.Lists;
 
@@ -27,7 +28,10 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
 
 public class JdbcMySQLClient extends JdbcClient {
@@ -76,6 +80,103 @@ public class JdbcMySQLClient extends JdbcClient {
         return databaseMetaData.getColumns(schemaName, null, tableName, null);
     }
 
+    /**
+     * Get the original type name of every column, like DatabaseMetaData.getColumns
+     * in mysql-jdbc-connector.
+     */
+    private Map<String, String> getJdbcColumnsTypeInfo(String dbName, String tableName) {
+        Connection conn = getConnection();
+        ResultSet resultSet = null;
+        Map<String, String> fieldtoType = new HashMap<>();
+
+        StringBuilder queryBuf = new StringBuilder("SHOW FULL COLUMNS FROM ");
+        queryBuf.append(tableName);
+        queryBuf.append(" FROM ");
+        queryBuf.append(dbName);
+        try (Statement stmt = conn.createStatement()) {
+            resultSet = stmt.executeQuery(queryBuf.toString());
+            while (resultSet.next()) {
+                // get the column name
+                String fieldName = resultSet.getString("Field");
+                // get the original type name
+                String typeName = resultSet.getString("Type");
+                fieldtoType.put(fieldName, typeName);
+            }
+        } catch (SQLException e) {
+            throw new JdbcClientException("failed to get column list from jdbc for table %s:%s", tableName,
+                    Util.getRootCauseMessage(e));
+        } finally {
+            close(resultSet, conn);
+        }
+
+        return fieldtoType;
+    }
+
+    /**
+     * Get all columns of one table.
+     */
+    @Override
+    public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String tableName) {
+        Connection conn = getConnection();
+        ResultSet rs = null;
+        List<JdbcFieldSchema> tableSchema = com.google.common.collect.Lists.newArrayList();
+        // if isLowerCaseTableNames == true, tableName is lower case,
+        // but databaseMetaData.getColumns() is case sensitive
+        if (isLowerCaseTableNames) {
+            dbName = lowerDBToRealDB.get(dbName);
+            tableName = lowerTableToRealTable.get(tableName);
+        }
+        try {
+            DatabaseMetaData databaseMetaData = conn.getMetaData();
+            String catalogName = getCatalogName(conn);
+            tableName = modifyTableNameIfNecessary(tableName);
+            rs = getColumns(databaseMetaData, catalogName, dbName, tableName);
+            boolean needGetDorisColumns = true;
+            Map<String, String> mapFieldtoType = null;
+            while (rs.next()) {
+                if (isTableModified(tableName, rs.getString("TABLE_NAME"))) {
+                    continue;
+                }
+                JdbcFieldSchema field = new JdbcFieldSchema();
+                field.setColumnName(rs.getString("COLUMN_NAME"));
+                field.setDataType(rs.getInt("DATA_TYPE"));
+
+                // in mysql-jdbc-connector-8.0.*, the TYPE_NAME of an HLL column in Doris is "UNKNOWN";
+                // in mysql-jdbc-connector-5.1.*, the TYPE_NAME of an HLL column in Doris is "HLL"
+                field.setDataTypeName(rs.getString("TYPE_NAME"));
+                if (rs.getString("TYPE_NAME").equalsIgnoreCase("UNKNOWN")) {
+                    if (needGetDorisColumns) {
+                        mapFieldtoType = getJdbcColumnsTypeInfo(dbName, tableName);
+                        needGetDorisColumns = false;
+                    }
+
+                    if (mapFieldtoType != null) {
+                        field.setDataTypeName(mapFieldtoType.get(rs.getString("COLUMN_NAME")));
+                    }
+                }
+
+                field.setColumnSize(rs.getInt("COLUMN_SIZE"));
+                field.setDecimalDigits(rs.getInt("DECIMAL_DIGITS"));
+                field.setNumPrecRadix(rs.getInt("NUM_PREC_RADIX"));
+                /*
+                   Whether NULL is allowed:
+                   0 (columnNoNulls)
+                   1 (columnNullable)
+                   2 (columnNullableUnknown)
+                 */
+                field.setAllowNull(rs.getInt("NULLABLE") != 0);
+                field.setRemarks(rs.getString("REMARKS"));
+                field.setCharOctetLength(rs.getInt("CHAR_OCTET_LENGTH"));
+                tableSchema.add(field);
+            }
+        } catch (SQLException e) {
+            throw new JdbcClientException("failed to get columns info from jdbc for table %s:%s", e, tableName,
+                    Util.getRootCauseMessage(e));
+        } finally {
+            close(rs, conn);
+        }
+        return tableSchema;
+    }
+
     @Override
     protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
         // For mysql type: "INT UNSIGNED":
@@ -170,6 +271,8 @@ public class JdbcMySQLClient extends JdbcClient {
             case "VARBINARY":
             case "ENUM":
                 return ScalarType.createStringType();
+            case "HLL":
+                return ScalarType.createHllType();
             default:
                 return Type.UNSUPPORTED;
         }
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index ebeb68085b..4f8eb2199d 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -328,6 +328,31 @@ public class JdbcExecutor {
         }
     }
 
+    public List<Object[]> getBlock(int batchSize, Object colsArray) throws UdfRuntimeException {
+        try {
+            ArrayList<Integer> colsTypes = (ArrayList<Integer>) colsArray;
+            Integer[] colArray = new Integer[colsTypes.size()];
+            colArray = colsTypes.toArray(colArray);
+            int columnCount = resultSetMetaData.getColumnCount();
+            curBlockRows = 0;
+            do {
+                for (int i = 0; i < columnCount; ++i) {
+                    // colArray[i] > 0 means the type is HLL/Bitmap; read it with getBytes
+                    // instead of getObject, since HLL/Bitmap maps to String in JDBC by default.
+                    if (colArray[i] > 0) {
+                        block.get(i)[curBlockRows] = resultSet.getBytes(i + 1);
+                    } else {
+                        block.get(i)[curBlockRows] = resultSet.getObject(i + 1);
+                    }
+                }
+                curBlockRows++;
+            } while (curBlockRows < batchSize && resultSet.next());
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("get next block failed: ", e);
+        }
+        return block;
+    }
+
     public List<Object[]> getBlock(int batchSize) throws UdfRuntimeException {
         try {
             int columnCount = resultSetMetaData.getColumnCount();
@@ -1273,6 +1298,19 @@ public class JdbcExecutor {
         }
     }
 
+    public void copyBatchHllResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
+            long offsetsAddr, long charsAddr) {
+        Object[] column = (Object[]) columnObj;
+        int firstNotNullIndex = 0;
+        if (isNullable) {
+            firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr);
+        }
+        if (firstNotNullIndex == numRows) {
+            return;
+        }
+        hllPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr);
+    }
+
     public void copyBatchDateTimeV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
             long columnAddr) throws SQLException {
         Object[] column = (Object[]) columnObj;
@@ -1315,6 +1353,43 @@ public class JdbcExecutor {
         }
     }
 
+    private void hllPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr,
+            long offsetsAddr, long charsAddr) {
+        int[] offsets = new int[numRows];
+        byte[][] byteRes = new byte[numRows][];
+        int offset = 0;
+        if (isNullable) {
+            // We cannot start this loop at the first non-null row,
+            // because byteRes must be filled for every row below.
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    byteRes[i] = emptyBytes;
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    byteRes[i] = (byte[]) column[i];
+                }
+                offset += byteRes[i].length;
+                offsets[i] = offset;
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                byteRes[i] = (byte[]) column[i];
+                offset += byteRes[i].length;
+                offsets[i] = offset;
+            }
+        }
+        byte[] bytes = new byte[offsets[numRows - 1]];
+        long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]);
+        int dst = 0;
+        for (int i = 0; i < numRows; i++) {
+            for (int j = 0; j < byteRes[i].length; j++) {
+                bytes[dst++] = byteRes[i][j];
+            }
+        }
+        UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L);
+        UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]);
+    }
+
     private void objectPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr,
             long offsetsAddr, long charsAddr) {
         int[] offsets = new int[numRows];
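Note: `JNINativeMethod.resizeStringColumn` gives the Java side a way to grow the BE-owned `chars` buffer to the final byte count before the single bulk `copyMemory`; the return value is the buffer's (possibly relocated) start address, which is why `bytesAddr` is re-read rather than reusing `charsAddr`. The native implementation is not part of this diff; a hedged sketch of what it plausibly does, with a hypothetical function name:

```cpp
#include <jni.h>

#include "vec/columns/column_string.h"

// Illustrative only: `chars_addr` is assumed to point at the ColumnString
// chars buffer (a PODArray of bytes) whose address the BE handed to Java.
// Resizing may reallocate, so the fresh data pointer is returned.
extern "C" JNIEXPORT jlong JNICALL resize_string_column_sketch(JNIEnv*, jclass, jlong chars_addr,
                                                               jint length) {
    auto* chars = reinterpret_cast<doris::vectorized::ColumnString::Chars*>(chars_addr);
    chars->resize(length);
    return reinterpret_cast<jlong>(chars->data());
}
```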
diff --git a/regression-test/data/jdbc_catalog_p0/test_doris_jdbc_catalog.out b/regression-test/data/jdbc_catalog_p0/test_doris_jdbc_catalog.out
new file mode 100644
index 0000000000..cb78c08955
--- /dev/null
+++ b/regression-test/data/jdbc_catalog_p0/test_doris_jdbc_catalog.out
@@ -0,0 +1,57 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+internal
+
+-- !sql --
+internal
+
+-- !ex_tb1 --
+1	doris1
+2	doris2
+3	doris3
+4	doris4
+5	doris5
+6	doris6
+
+-- !sql --
+internal
+
+-- !sql --
+doris_jdbc_catalog
+
+-- !ex_tb1 --
+1	doris1
+2	doris2
+3	doris3
+4	doris4
+5	doris5
+6	doris6
+
+-- !tb1 --
+1	1
+2	1
+3	1
+4	1
+5	1
+6	1
+
+-- !sql --
+internal
+
+-- !sql --
+doris_jdbc_catalog
+
+-- !tb2 --
+1	1
+2	1
+3	1
+4	1
+5	1
+6	1
+
+-- !sql --
+doris_jdbc_catalog
+
+-- !sql --
+internal
+
order_qt_ex_tb1 """ select * from ${inDorisTable} order by id; """ + + // test hll query + sql "switch internal" + sql "use ${internal_db_name}" + + sql """ drop table if exists ${hllTable} """ + sql """ CREATE TABLE `${hllTable}` ( + `pin_id` bigint(20) NOT NULL COMMENT "", + `pv_date` datev2 NOT NULL COMMENT "", + `user_log_acct` hll HLL_UNION NULL COMMENT "" + ) ENGINE=OLAP + AGGREGATE KEY(`pin_id`, `pv_date`) + COMMENT "OLAP" + PARTITION BY RANGE(`pv_date`) + (PARTITION pbefore201910 VALUES [('1900-01-01'), ('2019-10-01')), + PARTITION p201910 VALUES [('2019-10-01'), ('2019-11-01')), + PARTITION p201911 VALUES [('2019-11-01'), ('2019-12-01')), + PARTITION p201912 VALUES [('2019-12-01'), ('2020-01-01')), + PARTITION p202001 VALUES [('2020-01-01'), ('2020-02-01')), + PARTITION p202002 VALUES [('2020-02-01'), ('2020-03-01')), + PARTITION p202003 VALUES [('2020-03-01'), ('2020-04-01')), + PARTITION p202004 VALUES [('2020-04-01'), ('2020-05-01')), + PARTITION p202005 VALUES [('2020-05-01'), ('2020-06-01')), + PARTITION p202006 VALUES [('2020-06-01'), ('2020-07-01')), + PARTITION p202007 VALUES [('2020-07-01'), ('2020-08-01')), + PARTITION p202008 VALUES [('2020-08-01'), ('2020-09-01')), + PARTITION p202009 VALUES [('2020-09-01'), ('2020-10-01')), + PARTITION p202010 VALUES [('2020-10-01'), ('2020-11-01')), + PARTITION p202011 VALUES [('2020-11-01'), ('2020-12-01')), + PARTITION p202012 VALUES [('2020-12-01'), ('2021-01-01')), + PARTITION p202101 VALUES [('2021-01-01'), ('2021-02-01')), + PARTITION p202102 VALUES [('2021-02-01'), ('2021-03-01')), + PARTITION p202103 VALUES [('2021-03-01'), ('2021-04-01')), + PARTITION p202104 VALUES [('2021-04-01'), ('2021-05-01')), + PARTITION p202105 VALUES [('2021-05-01'), ('2021-06-01')), + PARTITION p202106 VALUES [('2021-06-01'), ('2021-07-01')), + PARTITION p202107 VALUES [('2021-07-01'), ('2021-08-01')), + PARTITION p202108 VALUES [('2021-08-01'), ('2021-09-01')), + PARTITION p202109 VALUES [('2021-09-01'), ('2021-10-01')), + PARTITION p202110 VALUES [('2021-10-01'), ('2021-11-01')), + PARTITION p202111 VALUES [('2021-11-01'), ('2021-12-01')), + PARTITION p202112 VALUES [('2021-12-01'), ('2022-01-01')), + PARTITION p202201 VALUES [('2022-01-01'), ('2022-02-01')), + PARTITION p202202 VALUES [('2022-02-01'), ('2022-03-01')), + PARTITION p202203 VALUES [('2022-03-01'), ('2022-04-01')), + PARTITION p202204 VALUES [('2022-04-01'), ('2022-05-01')), + PARTITION p202205 VALUES [('2022-05-01'), ('2022-06-01')), + PARTITION p202206 VALUES [('2022-06-01'), ('2022-07-01')), + PARTITION p202207 VALUES [('2022-07-01'), ('2022-08-01')), + PARTITION p202208 VALUES [('2022-08-01'), ('2022-09-01')), + PARTITION p202209 VALUES [('2022-09-01'), ('2022-10-01')), + PARTITION p202210 VALUES [('2022-10-01'), ('2022-11-01')), + PARTITION p202211 VALUES [('2022-11-01'), ('2022-12-01')), + PARTITION p202212 VALUES [('2022-12-01'), ('2023-01-01')), + PARTITION p202301 VALUES [('2023-01-01'), ('2023-02-01')), + PARTITION p202302 VALUES [('2023-02-01'), ('2023-03-01')), + PARTITION p202303 VALUES [('2023-03-01'), ('2023-04-01')), + PARTITION p202304 VALUES [('2023-04-01'), ('2023-05-01')), + PARTITION p202305 VALUES [('2023-05-01'), ('2023-06-01')), + PARTITION p202306 VALUES [('2023-06-01'), ('2023-07-01')), + PARTITION p202307 VALUES [('2023-07-01'), ('2023-08-01')), + PARTITION p202308 VALUES [('2023-08-01'), ('2023-09-01')), + PARTITION p202309 VALUES [('2023-09-01'), ('2023-10-01'))) + DISTRIBUTED BY HASH(`pin_id`) BUCKETS 16 + PROPERTIES ( + "replication_allocation" = 
"tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "DEFAULT" + ); """ + + sql """ insert into ${hllTable} values(1, "2023-01-01", hll_hash("1"));""" + sql """ insert into ${hllTable} values(2, "2023-01-02", hll_hash("2"));""" + sql """ insert into ${hllTable} values(3, "2023-01-03", hll_hash("3"));""" + sql """ insert into ${hllTable} values(4, "2023-01-04", hll_hash("4"));""" + sql """ insert into ${hllTable} values(5, "2023-01-05", hll_hash("5"));""" + sql """ insert into ${hllTable} values(6, "2023-01-06", hll_hash("6"));""" + + sql """ set return_object_data_as_binary=true """ + order_qt_tb1 """ select pin_id, hll_union_agg(user_log_acct) from ${hllTable} group by pin_id; """ + + // query with jdbc external table + sql """ refresh catalog ${catalog_name} """ + qt_sql """select current_catalog()""" + sql """ switch ${catalog_name} """ + qt_sql """select current_catalog()""" + sql """ use ${internal_db_name} """ + order_qt_tb2 """ select pin_id, hll_union_agg(user_log_acct) from ${catalog_name}.${internal_db_name}.${hllTable} group by pin_id; """ + + //clean + qt_sql """select current_catalog()""" + sql "switch internal" + qt_sql """select current_catalog()""" + sql "use ${internal_db_name}" + sql """ drop table if exists ${inDorisTable} """ + sql """ drop table if exists ${hllTable} """ + +}