[Feature] (Multi-Catalog) support query hll column in doris jdbc table - part 1 (#19413)
Issue Number: close #17895
@@ -355,6 +355,15 @@ public:
        return false;
    }

    bool has_hll_slot() const {
        for (auto slot : _slots) {
            if (slot->type().is_hll_type()) {
                return true;
            }
        }
        return false;
    }

    TupleId id() const { return _id; }

    std::string debug_string() const;
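`has_hll_slot()` follows the descriptor class's existing linear-scan style. For reference, an equivalent standalone formulation with `<algorithm>` looks like this (a sketch with stand-in types, not code from this patch):

```cpp
#include <algorithm>
#include <vector>

// Stand-ins for doris::SlotDescriptor / TypeDescriptor, which expose
// type().is_hll_type() as in the hunk above.
struct TypeDesc {
    bool hll = false;
    bool is_hll_type() const { return hll; }
};
struct SlotDesc {
    TypeDesc t;
    const TypeDesc& type() const { return t; }
};

// Same predicate as has_hll_slot(), expressed with std::any_of.
bool has_hll_slot(const std::vector<SlotDesc*>& slots) {
    return std::any_of(slots.begin(), slots.end(),
                       [](const SlotDesc* slot) { return slot->type().is_hll_type(); });
}
```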
@@ -235,6 +235,8 @@ struct TypeDescriptor {

    bool is_array_type() const { return type == TYPE_ARRAY; }

    bool is_hll_type() const { return type == TYPE_HLL; }

    bool is_bitmap_type() const { return type == TYPE_OBJECT; }

    bool is_variant_type() const { return type == TYPE_VARIANT; }
@@ -126,4 +126,16 @@ void DataTypeHLL::to_string(const class doris::vectorized::IColumn& column, size
    ostr.write(result.c_str(), result.size());
}

Status DataTypeHLL::from_string(ReadBuffer& rb, IColumn* column) const {
    auto& data_column = assert_cast<ColumnHLL&>(*column);
    auto& data = data_column.get_data();

    HyperLogLog hll;
    if (!hll.deserialize(Slice(rb.to_string()))) {
        return Status::InternalError("failed to deserialize HLL from string");
    }
    data.push_back(std::move(hll));
    return Status::OK();
}

} // namespace doris::vectorized
@@ -89,6 +89,7 @@ public:

    std::string to_string(const IColumn& column, size_t row_num) const override { return "HLL()"; }
    void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
    Status from_string(ReadBuffer& rb, IColumn* column) const override;

    Field get_default() const override {
        LOG(FATAL) << "Method get_default() is not implemented for data type " << get_name();
@@ -67,6 +67,7 @@ const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I";
const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I";
const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;";
const char* JDBC_EXECUTOR_GET_BLOCK_WITH_TYPES_SIGNATURE = "(ILjava/lang/Object;)Ljava/util/List;";
const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;";
const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V";
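The new `JDBC_EXECUTOR_GET_BLOCK_WITH_TYPES_SIGNATURE` follows standard JNI descriptor grammar: `I` is a Java `int`, `Ljava/lang/Object;` an object reference, and everything after `)` is the return type, here `java.util.List`. A minimal sketch of how the two `getBlock` overloads are distinguished purely by signature (illustrative only; the patch itself does this through `register_id` further down, and the executor class name is an assumption since the diff does not show it):

```cpp
#include <jni.h>

// Resolve both getBlock overloads on an already-loaded executor class.
void resolve_get_block(JNIEnv* env, jclass executor_clazz, jmethodID* plain,
                       jmethodID* with_types) {
    // List getBlock(int batchSize)
    *plain = env->GetMethodID(executor_clazz, "getBlock", "(I)Ljava/util/List;");
    // List getBlock(int batchSize, Object colsArray)
    *with_types = env->GetMethodID(executor_clazz, "getBlock",
                                   "(ILjava/lang/Object;)Ljava/util/List;");
}
```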
@@ -345,6 +346,23 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string&
                        ->create_column());
        break;
    }
    case TYPE_HLL: {
        if (type_str != "java.lang.String") {
            return Status::InternalError(error_msg);
        }

        _map_column_idx_to_cast_idx_hll[column_index] = _input_hll_string_types.size();
        if (slot_desc->is_nullable()) {
            _input_hll_string_types.push_back(make_nullable(std::make_shared<DataTypeString>()));
        } else {
            _input_hll_string_types.push_back(std::make_shared<DataTypeString>());
        }

        str_hll_cols.push_back(
                _input_hll_string_types[_map_column_idx_to_cast_idx_hll[column_index]]
                        ->create_column());
        break;
    }
    default: {
        return Status::InternalError(error_msg);
    }
@@ -367,8 +385,45 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
        return Status::OK();
    }

-    jobject block_obj = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz,
-                                                        _executor_get_blocks_id, batch_size);
    jobject block_obj;
    // if the tuple contains an HLL column, pass the column types to the JNI env
    if (_tuple_desc->has_hll_slot()) {
        auto column_size = _tuple_desc->slots().size();
        // Find ArrayList and Integer
        jclass arrayListClass = env->FindClass("java/util/ArrayList");
        jclass integerClass = env->FindClass("java/lang/Integer");

        // Get the method ids of the constructor and add() in ArrayList
        jmethodID arrayListConstructor = env->GetMethodID(arrayListClass, "<init>", "()V");
        jmethodID arrayListAddMethod =
                env->GetMethodID(arrayListClass, "add", "(Ljava/lang/Object;)Z");

        // Create an ArrayList object
        jobject arrayListObject = env->NewObject(arrayListClass, arrayListConstructor);
        for (int column_index = 0; column_index < column_size; ++column_index) {
            auto slot_desc = _tuple_desc->slots()[column_index];
            if (slot_desc->type().is_hll_type()) {
                // Create an Integer object holding the type id
                jobject integerObject = env->NewObject(
                        integerClass, env->GetMethodID(integerClass, "<init>", "(I)V"),
                        (int)slot_desc->type().type);
                // Add the Integer to the ArrayList
                env->CallBooleanMethod(arrayListObject, arrayListAddMethod, integerObject);
            } else {
                jobject integerObject = env->NewObject(
                        integerClass, env->GetMethodID(integerClass, "<init>", "(I)V"), 0);
                env->CallBooleanMethod(arrayListObject, arrayListAddMethod, integerObject);
            }
        }

        block_obj = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz,
                                                    _executor_get_blocks_new_id, batch_size,
                                                    arrayListObject);
    } else {
        block_obj = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz,
                                                    _executor_get_blocks_id, batch_size);
    }

    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
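A side note on the boxing loop above: it looks up `GetMethodID(integerClass, "<init>", "(I)V")` once per column. A possible refinement under the same JNI assumptions (this is not what the patch does, just an alternative) is to cache the lookup and use the `Integer.valueOf(int)` factory, which also reuses the JDK's cached boxes for small values:

```cpp
#include <jni.h>

// Box an int column-type id, once the valueOf jmethodID has been cached:
//   jmethodID value_of = env->GetStaticMethodID(integer_clazz, "valueOf",
//                                               "(I)Ljava/lang/Integer;");
jobject box_type_id(JNIEnv* env, jclass integer_clazz, jmethodID value_of, int type_id) {
    return env->CallStaticObjectMethod(integer_clazz, value_of, static_cast<jint>(type_id));
}
```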
@@ -390,6 +445,8 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
        // here we need to cast the string to the array type
        if (slot_desc->type().is_array_type()) {
            _cast_string_to_array(slot_desc, block, column_index, num_rows);
        } else if (slot_desc->type().is_hll_type()) {
            _cast_string_to_hll(slot_desc, block, column_index, num_rows);
        }
        materialized_column_index++;
    }
@@ -570,6 +627,26 @@ Status JdbcConnector::_convert_batch_result_set(JNIEnv* env, jobject jcolumn_dat
                                      address[1], chars_addres);
        break;
    }
    case TYPE_HLL: {
        str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]]->resize(num_rows);
        if (column_is_nullable) {
            auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
                    str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]].get());
            auto& null_map = nullable_column->get_null_map_data();
            memset(null_map.data(), 0, num_rows);
            address[0] = reinterpret_cast<int64_t>(null_map.data());
            col_ptr = &nullable_column->get_nested_column();
        } else {
            col_ptr = str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]].get();
        }
        auto column_string = reinterpret_cast<vectorized::ColumnString*>(col_ptr);
        address[1] = reinterpret_cast<int64_t>(column_string->get_offsets().data());
        auto chars_addres = reinterpret_cast<int64_t>(&column_string->get_chars());
        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_hll_result,
                                      jcolumn_data, column_is_nullable, num_rows, address[0],
                                      address[1], chars_addres);
        break;
    }
    default: {
        const std::string& error_msg =
                fmt::format("Fail to convert jdbc value to {} on column: {}",
@@ -625,6 +702,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
                                "(Ljava/lang/Object;ZIJJJ)V", _executor_get_string_result));
    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchArrayResult",
                                "(Ljava/lang/Object;ZIJJJ)V", _executor_get_array_result));
    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchHllResult", "(Ljava/lang/Object;ZIJJJ)V",
                                _executor_get_hll_result));
    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchCharResult",
                                "(Ljava/lang/Object;ZIJJJZ)V", _executor_get_char_result));
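For reference, the signature registered for `copyBatchHllResult` decodes to the Java method added later in this patch:

```cpp
// "(Ljava/lang/Object;ZIJJJ)V" in JNI descriptor grammar:
//   Z = boolean, I = int, J = long, trailing V = void return, matching
//   void copyBatchHllResult(Object columnObj, boolean isNullable, int numRows,
//                           long nullMapAddr, long offsetsAddr, long charsAddr)
```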
@@ -650,6 +729,9 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {

    RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", JDBC_EXECUTOR_GET_BLOCK_SIGNATURE,
                                _executor_get_blocks_id));
    RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock",
                                JDBC_EXECUTOR_GET_BLOCK_WITH_TYPES_SIGNATURE,
                                _executor_get_blocks_new_id));
    RETURN_IF_ERROR(register_id(_executor_list_clazz, "get", "(I)Ljava/lang/Object;",
                                _executor_get_list_id));
    RETURN_IF_ERROR(register_id(_executor_string_clazz, "getBytes", "(Ljava/lang/String;)[B",
@@ -668,6 +750,41 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
    return Status::OK();
}

Status JdbcConnector::_cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block,
                                          int column_index, int rows) {
    DataTypePtr _target_data_type = slot_desc->get_data_type_ptr();
    std::string _target_data_type_name = _target_data_type->get_name();
    DataTypePtr _cast_param_data_type = _target_data_type;
    ColumnPtr _cast_param = _cast_param_data_type->create_column_const_with_default_value(1);

    ColumnsWithTypeAndName argument_template;
    argument_template.reserve(2);
    argument_template.emplace_back(
            std::move(str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]]),
            _input_hll_string_types[_map_column_idx_to_cast_idx_hll[column_index]],
            "java.sql.String");
    argument_template.emplace_back(_cast_param, _cast_param_data_type, _target_data_type_name);
    FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
            "CAST", argument_template, make_nullable(_target_data_type));

    Block cast_block(argument_template);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, make_nullable(_target_data_type), "cast_result"});
    func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows);

    auto res_col = cast_block.get_by_position(result_idx).column;
    if (_target_data_type->is_nullable()) {
        block->replace_by_position(column_index, res_col);
    } else {
        auto nested_ptr = reinterpret_cast<const vectorized::ColumnNullable*>(res_col.get())
                                  ->get_nested_column_ptr();
        block->replace_by_position(column_index, nested_ptr);
    }
    str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]] =
            _input_hll_string_types[_map_column_idx_to_cast_idx_hll[column_index]]->create_column();
    return Status::OK();
}

Status JdbcConnector::_cast_string_to_array(const SlotDescriptor* slot_desc, Block* block,
                                            int column_index, int rows) {
    DataTypePtr _target_data_type = slot_desc->get_data_type_ptr();
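`_cast_string_to_hll` mirrors `_cast_string_to_array`: rather than parsing HLL bytes by hand, it drives the engine's vectorized CAST machinery. A schematic of the steps (a restatement of the code above, using Doris-internal types):

```cpp
// 1. Argument 0: the staged String column (str_hll_cols[...]) holding the raw
//    HLL bytes fetched over JDBC, typed by _input_hll_string_types[...].
// 2. Argument 1: a const column carrying the default value of the target HLL
//    type; it exists only so "CAST" can resolve the target type.
// 3. Resolve "CAST" via SimpleFunctionFactory and execute it into a nullable
//    result slot appended to a temporary Block.
// 4. Splice the result into the output block, unwrapping the nullable layer
//    when the slot itself is not nullable, then recreate the staged column so
//    it is empty for the next batch.
```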
@@ -102,6 +102,8 @@ private:
    std::string _jobject_to_string(JNIEnv* env, jobject jobj);
    Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index,
                                 int rows);
    Status _cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block, int column_index,
                               int rows);
    Status _convert_batch_result_set(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc,
                                     vectorized::IColumn* column_ptr, int num_rows,
                                     int column_index);
@@ -120,6 +122,7 @@ private:
    jmethodID _executor_has_next_id;
    jmethodID _executor_block_rows_id;
    jmethodID _executor_get_blocks_id;
    jmethodID _executor_get_blocks_new_id;
    jmethodID _executor_get_boolean_result;
    jmethodID _executor_get_tinyint_result;
    jmethodID _executor_get_smallint_result;
@@ -139,6 +142,7 @@ private:
    jmethodID _executor_get_decimal64_result;
    jmethodID _executor_get_decimal128_result;
    jmethodID _executor_get_array_result;
    jmethodID _executor_get_hll_result;
    jmethodID _executor_get_types_id;
    jmethodID _executor_close_id;
    jmethodID _executor_get_list_id;
@@ -152,6 +156,10 @@ private:
    std::vector<MutableColumnPtr>
            str_array_cols; // for array type, to save data as a big string like [1,2,3]

    std::map<int, int> _map_column_idx_to_cast_idx_hll;
    std::vector<DataTypePtr> _input_hll_string_types;
    std::vector<MutableColumnPtr> str_hll_cols; // for hll type, to save data as a string

    JdbcStatistic _jdbc_statistic;
};
@@ -70,6 +70,7 @@
#include "vec/data_types/data_type_date.h"
#include "vec/data_types/data_type_date_time.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_hll.h"
#include "vec/data_types/data_type_jsonb.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
@@ -1638,6 +1639,25 @@ private:
        return create_unsupport_wrapper(error_msg);
    }

    WrapperType create_hll_wrapper(FunctionContext* context, const DataTypePtr& from_type_untyped,
                                   const DataTypeHLL& to_type) const {
        /// Conversion from String through parsing.
        if (check_and_get_data_type<DataTypeString>(from_type_untyped.get())) {
            return &ConvertImplGenericFromString<ColumnString>::execute;
        }

        // TODO: if from_type is not String, must it be HLL?
        const auto* from_type = check_and_get_data_type<DataTypeHLL>(from_type_untyped.get());

        if (!from_type) {
            return create_unsupport_wrapper(
                    "CAST AS HLL can only be performed between HLL, String "
                    "types");
        }

        return nullptr;
    }

    WrapperType create_array_wrapper(FunctionContext* context, const DataTypePtr& from_type_untyped,
                                     const DataTypeArray& to_type) const {
        /// Conversion from String through parsing.
@@ -2021,6 +2041,9 @@ private:
                    static_cast<const DataTypeStruct&>(*to_type));
        case TypeIndex::Map:
            return create_map_wrapper(from_type, static_cast<const DataTypeMap&>(*to_type));
        case TypeIndex::HLL:
            return create_hll_wrapper(context, from_type,
                                      static_cast<const DataTypeHLL&>(*to_type));
        default:
            break;
        }
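With the new `TypeIndex::HLL` case wired into the dispatch switch, `CAST(... AS HLL)` resolves as summarized below (restating the code above, not new behavior):

```cpp
// from String   -> &ConvertImplGenericFromString<ColumnString>::execute, which
//                  parses each row through DataTypeHLL::from_string;
// from HLL      -> falls through to `return nullptr` (no conversion wrapper);
// anything else -> create_unsupport_wrapper("CAST AS HLL can only be performed
//                  between HLL, String types").
```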
@@ -421,6 +421,7 @@ The transaction mechanism ensures the atomicity of data writing to JDBC External
| VARCHAR | VARCHAR | |
| STRING | STRING | |
| TEXT | STRING | |
| HLL | HLL | `return_object_data_as_binary=true` is required when querying HLL columns |
| Other | UNSUPPORTED | |

### SAP HANA
@@ -420,6 +420,7 @@ set enable_odbc_transcation = true;
| VARCHAR | VARCHAR | |
| STRING | STRING | |
| TEXT | STRING | |
| HLL | HLL | Querying an HLL column requires setting `return_object_data_as_binary=true` |
| Other | UNSUPPORTED | |

### SAP HANA
@@ -20,6 +20,7 @@ package org.apache.doris.external.jdbc;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.util.Util;

import avro.shaded.com.google.common.collect.Lists;

@@ -27,7 +28,10 @@ import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class JdbcMySQLClient extends JdbcClient {
@@ -76,6 +80,103 @@ public class JdbcMySQLClient extends JdbcClient {
        return databaseMetaData.getColumns(schemaName, null, tableName, null);
    }

    /**
     * Get the native type names of all columns, like DatabaseMetaData.getColumns in mysql-jdbc-connector.
     */
    private Map<String, String> getJdbcColumnsTypeInfo(String dbName, String tableName) {
        Connection conn = getConnection();
        ResultSet resultSet = null;
        Map<String, String> fieldtoType = new HashMap<String, String>();

        StringBuilder queryBuf = new StringBuilder("SHOW FULL COLUMNS FROM ");
        queryBuf.append(tableName);
        queryBuf.append(" FROM ");
        queryBuf.append(dbName);
        try (Statement stmt = conn.createStatement()) {
            resultSet = stmt.executeQuery(queryBuf.toString());
            while (resultSet.next()) {
                // get column name
                String fieldName = resultSet.getString("Field");
                // get original type name
                String typeName = resultSet.getString("Type");
                fieldtoType.put(fieldName, typeName);
            }
        } catch (SQLException e) {
            throw new JdbcClientException("failed to get column list from jdbc for table %s:%s", tableName,
                    Util.getRootCauseMessage(e));
        } finally {
            close(resultSet, conn);
        }

        return fieldtoType;
    }

    /**
     * Get all columns of one table.
     */
    @Override
    public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String tableName) {
        Connection conn = getConnection();
        ResultSet rs = null;
        List<JdbcFieldSchema> tableSchema = com.google.common.collect.Lists.newArrayList();
        // if isLowerCaseTableNames is true, tableName is lower case,
        // but databaseMetaData.getColumns() is case sensitive
        if (isLowerCaseTableNames) {
            dbName = lowerDBToRealDB.get(dbName);
            tableName = lowerTableToRealTable.get(tableName);
        }
        try {
            DatabaseMetaData databaseMetaData = conn.getMetaData();
            String catalogName = getCatalogName(conn);
            tableName = modifyTableNameIfNecessary(tableName);
            rs = getColumns(databaseMetaData, catalogName, dbName, tableName);
            boolean needGetDorisColumns = true;
            Map<String, String> mapFieldtoType = null;
            while (rs.next()) {
                if (isTableModified(tableName, rs.getString("TABLE_NAME"))) {
                    continue;
                }
                JdbcFieldSchema field = new JdbcFieldSchema();
                field.setColumnName(rs.getString("COLUMN_NAME"));
                field.setDataType(rs.getInt("DATA_TYPE"));

                // in mysql-jdbc-connector-8.0.*, TYPE_NAME of an HLL column in doris will be "UNKNOWN"
                // in mysql-jdbc-connector-5.1.*, TYPE_NAME of an HLL column in doris will be "HLL"
                field.setDataTypeName(rs.getString("TYPE_NAME"));
                if (rs.getString("TYPE_NAME").equalsIgnoreCase("UNKNOWN")) {
                    if (needGetDorisColumns) {
                        mapFieldtoType = getJdbcColumnsTypeInfo(dbName, tableName);
                        needGetDorisColumns = false;
                    }

                    if (mapFieldtoType != null) {
                        field.setDataTypeName(mapFieldtoType.get(rs.getString("COLUMN_NAME")));
                    }
                }

                field.setColumnSize(rs.getInt("COLUMN_SIZE"));
                field.setDecimalDigits(rs.getInt("DECIMAL_DIGITS"));
                field.setNumPrecRadix(rs.getInt("NUM_PREC_RADIX"));
                /*
                    Whether it is allowed to be NULL
                    0 (columnNoNulls)
                    1 (columnNullable)
                    2 (columnNullableUnknown)
                 */
                field.setAllowNull(rs.getInt("NULLABLE") != 0);
                field.setRemarks(rs.getString("REMARKS"));
                field.setCharOctetLength(rs.getInt("CHAR_OCTET_LENGTH"));
                tableSchema.add(field);
            }
        } catch (SQLException e) {
            throw new JdbcClientException("failed to get column list from jdbc for table %s:%s", e, tableName,
                    Util.getRootCauseMessage(e));
        } finally {
            close(rs, conn);
        }
        return tableSchema;
    }

    @Override
    protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
        // For mysql type: "INT UNSIGNED":
@@ -170,6 +271,8 @@ public class JdbcMySQLClient extends JdbcClient {
            case "VARBINARY":
            case "ENUM":
                return ScalarType.createStringType();
            case "HLL":
                return ScalarType.createHllType();
            default:
                return Type.UNSUPPORTED;
        }
@@ -328,6 +328,31 @@ public class JdbcExecutor {
        }
    }

    public List<Object[]> getBlock(int batchSize, Object colsArray) throws UdfRuntimeException {
        try {
            ArrayList<Integer> colsTypes = (ArrayList<Integer>) colsArray;
            Integer[] colArray = new Integer[colsTypes.size()];
            colArray = colsTypes.toArray(colArray);
            int columnCount = resultSetMetaData.getColumnCount();
            curBlockRows = 0;
            do {
                for (int i = 0; i < columnCount; ++i) {
                    // colArray[i] > 0 means the type is HLL/Bitmap; read it with getBytes
                    // instead of getObject, since HLL/Bitmap would otherwise map to String by default
                    if (colArray[i] > 0) {
                        block.get(i)[curBlockRows] = resultSet.getBytes(i + 1);
                    } else {
                        block.get(i)[curBlockRows] = resultSet.getObject(i + 1);
                    }
                }
                curBlockRows++;
            } while (curBlockRows < batchSize && resultSet.next());
        } catch (SQLException e) {
            throw new UdfRuntimeException("get next block failed: ", e);
        }
        return block;
    }

    public List<Object[]> getBlock(int batchSize) throws UdfRuntimeException {
        try {
            int columnCount = resultSetMetaData.getColumnCount();
@@ -1273,6 +1298,19 @@ public class JdbcExecutor {
        }
    }

    public void copyBatchHllResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
            long offsetsAddr, long charsAddr) {
        Object[] column = (Object[]) columnObj;
        int firstNotNullIndex = 0;
        if (isNullable) {
            firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr);
        }
        if (firstNotNullIndex == numRows) {
            return;
        }
        hllPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr);
    }

    public void copyBatchDateTimeV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
            long columnAddr) throws SQLException {
        Object[] column = (Object[]) columnObj;
@@ -1315,6 +1353,43 @@ public class JdbcExecutor {
        }
    }

    private void hllPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr,
            long offsetsAddr, long charsAddr) {
        int[] offsets = new int[numRows];
        byte[][] byteRes = new byte[numRows][];
        int offset = 0;
        if (isNullable) {
            // We cannot start the loop at the first non-null row here,
            // because byteRes for every row is used later
            for (int i = 0; i < numRows; i++) {
                if (column[i] == null) {
                    byteRes[i] = emptyBytes;
                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
                } else {
                    byteRes[i] = (byte[]) column[i];
                }
                offset += byteRes[i].length;
                offsets[i] = offset;
            }
        } else {
            for (int i = 0; i < numRows; i++) {
                byteRes[i] = (byte[]) column[i];
                offset += byteRes[i].length;
                offsets[i] = offset;
            }
        }
        byte[] bytes = new byte[offsets[numRows - 1]];
        long bytesAddr = JNINativeMethod.resizeStringColumn(charsAddr, offsets[numRows - 1]);
        int dst = 0;
        for (int i = 0; i < numRows; i++) {
            for (int j = 0; j < byteRes[i].length; j++) {
                bytes[dst++] = byteRes[i][j];
            }
        }
        UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L);
        UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]);
    }

    private void objectPutToString(Object[] column, boolean isNullable, int numRows, long nullMapAddr,
            long offsetsAddr, long charsAddr) {
        int[] offsets = new int[numRows];
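`hllPutToString` writes straight into the BE's string-column buffers: `offsetsAddr` receives one cumulative end offset per row (4 bytes each, hence `numRows * 4L`) and `charsAddr`, after `resizeStringColumn`, the concatenated bytes, so no per-row JNI call is needed. A standalone sketch of how the native side would recover row `i` from that layout (stand-in code, not from this patch):

```cpp
#include <cstdint>
#include <string_view>

// offsets[i] is the cumulative end offset of row i, exactly as hllPutToString
// fills it above; row i therefore spans [offsets[i-1], offsets[i]).
std::string_view row_at(const char* chars, const int32_t* offsets, int i) {
    const int32_t begin = (i == 0) ? 0 : offsets[i - 1];
    return {chars + begin, static_cast<size_t>(offsets[i] - begin)};
}
```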
@@ -0,0 +1,57 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
internal

-- !sql --
internal

-- !ex_tb1 --
1 doris1
2 doris2
3 doris3
4 doris4
5 doris5
6 doris6

-- !sql --
internal

-- !sql --
doris_jdbc_catalog

-- !ex_tb1 --
1 doris1
2 doris2
3 doris3
4 doris4
5 doris5
6 doris6

-- !tb1 --
1 1
2 1
3 1
4 1
5 1
6 1

-- !sql --
internal

-- !sql --
doris_jdbc_catalog

-- !tb2 --
1 1
2 1
3 1
4 1
5 1
6 1

-- !sql --
doris_jdbc_catalog

-- !sql --
internal
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_doris_jdbc_catalog", "p0") {
    qt_sql """select current_catalog()"""

    String jdbcUrl = context.config.jdbcUrl + "&sessionVariables=return_object_data_as_binary=true"
    String jdbcUser = context.config.jdbcUser
    String jdbcPassword = context.config.jdbcPassword
    String driver_url = "https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar"

    String resource_name = "jdbc_resource_catalog_doris"
    String catalog_name = "doris_jdbc_catalog";
    String internal_db_name = "regression_test_jdbc_catalog_p0";
    String doris_port = "9030";
    String inDorisTable = "doris_in_tb";
    String hllTable = "bowen_hll_test"

    qt_sql """select current_catalog()"""
    sql """drop catalog if exists ${catalog_name} """

    sql """ CREATE CATALOG `${catalog_name}` PROPERTIES (
        "user" = "${jdbcUser}",
        "type" = "jdbc",
        "password" = "${jdbcPassword}",
        "jdbc_url" = "${jdbcUrl}",
        "driver_url" = "${driver_url}",
        "driver_class" = "com.mysql.jdbc.Driver"
    )"""

    sql """ drop table if exists ${inDorisTable} """
    sql """
        CREATE TABLE ${inDorisTable} (
            `id` INT NULL COMMENT "primary key id",
            `name` string NULL COMMENT "name"
        ) DISTRIBUTED BY HASH(id) BUCKETS 10
        PROPERTIES("replication_num" = "1");
    """
    sql """ insert into ${inDorisTable} values (1, 'doris1')"""
    sql """ insert into ${inDorisTable} values (2, 'doris2')"""
    sql """ insert into ${inDorisTable} values (3, 'doris3')"""
    sql """ insert into ${inDorisTable} values (4, 'doris4')"""
    sql """ insert into ${inDorisTable} values (5, 'doris5')"""
    sql """ insert into ${inDorisTable} values (6, 'doris6')"""

    order_qt_ex_tb1 """ select * from internal.${internal_db_name}.${inDorisTable} order by id; """

    qt_sql """select current_catalog()"""
    sql "switch ${catalog_name}"
    qt_sql """select current_catalog()"""
    sql """ use ${internal_db_name}"""
    order_qt_ex_tb1 """ select * from ${inDorisTable} order by id; """

    // test hll query
    sql "switch internal"
    sql "use ${internal_db_name}"

    sql """ drop table if exists ${hllTable} """
    sql """ CREATE TABLE `${hllTable}` (
        `pin_id` bigint(20) NOT NULL COMMENT "",
        `pv_date` datev2 NOT NULL COMMENT "",
        `user_log_acct` hll HLL_UNION NULL COMMENT ""
    ) ENGINE=OLAP
    AGGREGATE KEY(`pin_id`, `pv_date`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`pv_date`)
    (PARTITION pbefore201910 VALUES [('1900-01-01'), ('2019-10-01')),
    PARTITION p201910 VALUES [('2019-10-01'), ('2019-11-01')),
    PARTITION p201911 VALUES [('2019-11-01'), ('2019-12-01')),
    PARTITION p201912 VALUES [('2019-12-01'), ('2020-01-01')),
    PARTITION p202001 VALUES [('2020-01-01'), ('2020-02-01')),
    PARTITION p202002 VALUES [('2020-02-01'), ('2020-03-01')),
    PARTITION p202003 VALUES [('2020-03-01'), ('2020-04-01')),
    PARTITION p202004 VALUES [('2020-04-01'), ('2020-05-01')),
    PARTITION p202005 VALUES [('2020-05-01'), ('2020-06-01')),
    PARTITION p202006 VALUES [('2020-06-01'), ('2020-07-01')),
    PARTITION p202007 VALUES [('2020-07-01'), ('2020-08-01')),
    PARTITION p202008 VALUES [('2020-08-01'), ('2020-09-01')),
    PARTITION p202009 VALUES [('2020-09-01'), ('2020-10-01')),
    PARTITION p202010 VALUES [('2020-10-01'), ('2020-11-01')),
    PARTITION p202011 VALUES [('2020-11-01'), ('2020-12-01')),
    PARTITION p202012 VALUES [('2020-12-01'), ('2021-01-01')),
    PARTITION p202101 VALUES [('2021-01-01'), ('2021-02-01')),
    PARTITION p202102 VALUES [('2021-02-01'), ('2021-03-01')),
    PARTITION p202103 VALUES [('2021-03-01'), ('2021-04-01')),
    PARTITION p202104 VALUES [('2021-04-01'), ('2021-05-01')),
    PARTITION p202105 VALUES [('2021-05-01'), ('2021-06-01')),
    PARTITION p202106 VALUES [('2021-06-01'), ('2021-07-01')),
    PARTITION p202107 VALUES [('2021-07-01'), ('2021-08-01')),
    PARTITION p202108 VALUES [('2021-08-01'), ('2021-09-01')),
    PARTITION p202109 VALUES [('2021-09-01'), ('2021-10-01')),
    PARTITION p202110 VALUES [('2021-10-01'), ('2021-11-01')),
    PARTITION p202111 VALUES [('2021-11-01'), ('2021-12-01')),
    PARTITION p202112 VALUES [('2021-12-01'), ('2022-01-01')),
    PARTITION p202201 VALUES [('2022-01-01'), ('2022-02-01')),
    PARTITION p202202 VALUES [('2022-02-01'), ('2022-03-01')),
    PARTITION p202203 VALUES [('2022-03-01'), ('2022-04-01')),
    PARTITION p202204 VALUES [('2022-04-01'), ('2022-05-01')),
    PARTITION p202205 VALUES [('2022-05-01'), ('2022-06-01')),
    PARTITION p202206 VALUES [('2022-06-01'), ('2022-07-01')),
    PARTITION p202207 VALUES [('2022-07-01'), ('2022-08-01')),
    PARTITION p202208 VALUES [('2022-08-01'), ('2022-09-01')),
    PARTITION p202209 VALUES [('2022-09-01'), ('2022-10-01')),
    PARTITION p202210 VALUES [('2022-10-01'), ('2022-11-01')),
    PARTITION p202211 VALUES [('2022-11-01'), ('2022-12-01')),
    PARTITION p202212 VALUES [('2022-12-01'), ('2023-01-01')),
    PARTITION p202301 VALUES [('2023-01-01'), ('2023-02-01')),
    PARTITION p202302 VALUES [('2023-02-01'), ('2023-03-01')),
    PARTITION p202303 VALUES [('2023-03-01'), ('2023-04-01')),
    PARTITION p202304 VALUES [('2023-04-01'), ('2023-05-01')),
    PARTITION p202305 VALUES [('2023-05-01'), ('2023-06-01')),
    PARTITION p202306 VALUES [('2023-06-01'), ('2023-07-01')),
    PARTITION p202307 VALUES [('2023-07-01'), ('2023-08-01')),
    PARTITION p202308 VALUES [('2023-08-01'), ('2023-09-01')),
    PARTITION p202309 VALUES [('2023-09-01'), ('2023-10-01')))
    DISTRIBUTED BY HASH(`pin_id`) BUCKETS 16
    PROPERTIES (
        "replication_allocation" = "tag.location.default: 1",
        "in_memory" = "false",
        "storage_format" = "DEFAULT"
    ); """

    sql """ insert into ${hllTable} values(1, "2023-01-01", hll_hash("1"));"""
    sql """ insert into ${hllTable} values(2, "2023-01-02", hll_hash("2"));"""
    sql """ insert into ${hllTable} values(3, "2023-01-03", hll_hash("3"));"""
    sql """ insert into ${hllTable} values(4, "2023-01-04", hll_hash("4"));"""
    sql """ insert into ${hllTable} values(5, "2023-01-05", hll_hash("5"));"""
    sql """ insert into ${hllTable} values(6, "2023-01-06", hll_hash("6"));"""

    sql """ set return_object_data_as_binary=true """
    order_qt_tb1 """ select pin_id, hll_union_agg(user_log_acct) from ${hllTable} group by pin_id; """

    // query with jdbc external table
    sql """ refresh catalog ${catalog_name} """
    qt_sql """select current_catalog()"""
    sql """ switch ${catalog_name} """
    qt_sql """select current_catalog()"""
    sql """ use ${internal_db_name} """
    order_qt_tb2 """ select pin_id, hll_union_agg(user_log_acct) from ${catalog_name}.${internal_db_name}.${hllTable} group by pin_id; """

    // clean up
    qt_sql """select current_catalog()"""
    sql "switch internal"
    qt_sql """select current_catalog()"""
    sql "use ${internal_db_name}"
    sql """ drop table if exists ${inDorisTable} """
    sql """ drop table if exists ${hllTable} """
}