[feature](jdbc) Support jdbc catalog to read json types (#21341)
This commit is contained in:
@ -241,6 +241,8 @@ struct TypeDescriptor {
|
||||
|
||||
// Returns true when this descriptor's `type` equals TYPE_VARIANT.
bool is_variant_type() const { return type == TYPE_VARIANT; }
|
||||
|
||||
// Returns true when this descriptor's `type` equals TYPE_JSONB.
bool is_json_type() const { return type == TYPE_JSONB; }
|
||||
|
||||
static inline int get_decimal_byte_size(int precision) {
|
||||
DCHECK_GT(precision, 0);
|
||||
if (precision <= MAX_DECIMAL4_PRECISION) {
|
||||
|
||||
@ -346,6 +346,22 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string&
|
||||
->create_column());
|
||||
break;
|
||||
}
|
||||
case TYPE_JSONB: {
|
||||
if (type_str != "java.lang.String" && type_str != "org.postgresql.util.PGobject") {
|
||||
return Status::InternalError(error_msg);
|
||||
}
|
||||
|
||||
_map_column_idx_to_cast_idx_json[column_index] = _input_json_string_types.size();
|
||||
if (slot_desc->is_nullable()) {
|
||||
_input_json_string_types.push_back(make_nullable(std::make_shared<DataTypeString>()));
|
||||
} else {
|
||||
_input_json_string_types.push_back(std::make_shared<DataTypeString>());
|
||||
}
|
||||
str_json_cols.push_back(
|
||||
_input_json_string_types[_map_column_idx_to_cast_idx_json[column_index]]
|
||||
->create_column());
|
||||
break;
|
||||
}
|
||||
case TYPE_HLL: {
|
||||
if (type_str != "java.lang.String") {
|
||||
return Status::InternalError(error_msg);
|
||||
@ -447,6 +463,8 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
|
||||
_cast_string_to_array(slot_desc, block, column_index, num_rows);
|
||||
} else if (slot_desc->type().is_hll_type()) {
|
||||
_cast_string_to_hll(slot_desc, block, column_index, num_rows);
|
||||
} else if (slot_desc->type().is_json_type()) {
|
||||
_cast_string_to_json(slot_desc, block, column_index, num_rows);
|
||||
}
|
||||
materialized_column_index++;
|
||||
}
|
||||
@ -627,6 +645,26 @@ Status JdbcConnector::_convert_batch_result_set(JNIEnv* env, jobject jcolumn_dat
|
||||
address[1], chars_addres);
|
||||
break;
|
||||
}
|
||||
case TYPE_JSONB: {
|
||||
str_json_cols[_map_column_idx_to_cast_idx_json[column_index]]->resize(num_rows);
|
||||
if (column_is_nullable) {
|
||||
auto* nullbale_column = reinterpret_cast<vectorized::ColumnNullable*>(
|
||||
str_json_cols[_map_column_idx_to_cast_idx_json[column_index]].get());
|
||||
auto& null_map = nullbale_column->get_null_map_data();
|
||||
memset(null_map.data(), 0, num_rows);
|
||||
address[0] = reinterpret_cast<int64_t>(null_map.data());
|
||||
col_ptr = &nullbale_column->get_nested_column();
|
||||
} else {
|
||||
col_ptr = str_json_cols[_map_column_idx_to_cast_idx_json[column_index]].get();
|
||||
}
|
||||
auto column_string = reinterpret_cast<vectorized::ColumnString*>(col_ptr);
|
||||
address[1] = reinterpret_cast<int64_t>(column_string->get_offsets().data());
|
||||
auto chars_addres = reinterpret_cast<int64_t>(&column_string->get_chars());
|
||||
env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_json_result,
|
||||
jcolumn_data, column_is_nullable, num_rows, address[0],
|
||||
address[1], chars_addres);
|
||||
break;
|
||||
}
|
||||
case TYPE_HLL: {
|
||||
str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]]->resize(num_rows);
|
||||
if (column_is_nullable) {
|
||||
@ -704,6 +742,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
|
||||
"(Ljava/lang/Object;ZIJJJ)V", _executor_get_array_result));
|
||||
RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchHllResult", "(Ljava/lang/Object;ZIJJJ)V",
|
||||
_executor_get_hll_result));
|
||||
RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchJsonResult",
|
||||
"(Ljava/lang/Object;ZIJJJ)V", _executor_get_json_result));
|
||||
RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchCharResult",
|
||||
"(Ljava/lang/Object;ZIJJJZ)V", _executor_get_char_result));
|
||||
|
||||
@ -820,6 +860,42 @@ Status JdbcConnector::_cast_string_to_array(const SlotDescriptor* slot_desc, Blo
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Casts the string data staged for one JSON column to the slot's data type and stores the
// result in `block` at position `column_index`.
//
// The staged strings live in str_json_cols[cast_idx], where
// cast_idx = _map_column_idx_to_cast_idx_json[column_index]. That column is moved into a
// temporary block, the "CAST" function is executed over `rows` rows, and the result
// (unwrapped from its nullable wrapper when the target type is not nullable) replaces the
// column in `block`.
//
// @param slot_desc     slot whose data type is the cast target
// @param block         block receiving the cast result
// @param column_index  position of the column inside `block`
// @param rows          number of rows handed to the cast function
// @return the status reported by the cast function when it fails, Status::OK() otherwise
Status JdbcConnector::_cast_string_to_json(const SlotDescriptor* slot_desc, Block* block,
                                           int column_index, int rows) {
    // Resolve the staging slot once instead of repeating the map lookup at every use.
    const int cast_idx = _map_column_idx_to_cast_idx_json[column_index];
    const DataTypePtr& input_type = _input_json_string_types[cast_idx];

    DataTypePtr target_type = slot_desc->get_data_type_ptr();
    const std::string target_type_name = target_type->get_name();
    // Second CAST argument: a constant column of the target type, named after that type.
    ColumnPtr cast_param = target_type->create_column_const_with_default_value(1);

    ColumnsWithTypeAndName argument_template;
    argument_template.reserve(2);
    // The staged string column is moved out here; it is re-created below.
    argument_template.emplace_back(std::move(str_json_cols[cast_idx]), input_type,
                                   "java.sql.String");
    argument_template.emplace_back(cast_param, target_type, target_type_name);
    FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
            "CAST", argument_template, make_nullable(target_type));

    Block cast_block(argument_template);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, make_nullable(target_type), "cast_result"});
    const Status cast_status =
            func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows);

    // Re-create the staging column before any early return so the member never stays in a
    // moved-from state, whatever the outcome of the cast.
    str_json_cols[cast_idx] = input_type->create_column();

    // Previously the status was dropped, so a failed cast was reported as success.
    RETURN_IF_ERROR(cast_status);

    auto res_col = cast_block.get_by_position(result_idx).column;
    if (target_type->is_nullable()) {
        block->replace_by_position(column_index, res_col);
    } else {
        // The cast result is always produced as nullable; hand back only the nested column.
        auto nested_ptr = reinterpret_cast<const vectorized::ColumnNullable*>(res_col.get())
                                  ->get_nested_column_ptr();
        block->replace_by_position(column_index, nested_ptr);
    }
    return Status::OK();
}
|
||||
|
||||
Status JdbcConnector::exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs,
|
||||
uint32_t* num_rows_sent) {
|
||||
SCOPED_TIMER(_result_send_timer);
|
||||
|
||||
@ -104,6 +104,8 @@ private:
|
||||
int rows);
|
||||
Status _cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block, int column_index,
|
||||
int rows);
|
||||
Status _cast_string_to_json(const SlotDescriptor* slot_desc, Block* block, int column_index,
|
||||
int rows);
|
||||
Status _convert_batch_result_set(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc,
|
||||
vectorized::IColumn* column_ptr, int num_rows,
|
||||
int column_index);
|
||||
@ -142,6 +144,7 @@ private:
|
||||
jmethodID _executor_get_decimal64_result;
|
||||
jmethodID _executor_get_decimal128_result;
|
||||
jmethodID _executor_get_array_result;
|
||||
jmethodID _executor_get_json_result;
|
||||
jmethodID _executor_get_hll_result;
|
||||
jmethodID _executor_get_types_id;
|
||||
jmethodID _executor_close_id;
|
||||
@ -160,6 +163,10 @@ private:
|
||||
std::vector<DataTypePtr> _input_hll_string_types;
|
||||
std::vector<MutableColumnPtr> str_hll_cols; // for hll type to save data like string
|
||||
|
||||
std::map<int, int> _map_column_idx_to_cast_idx_json;
|
||||
std::vector<DataTypePtr> _input_json_string_types;
|
||||
std::vector<MutableColumnPtr> str_json_cols; // for json type to save data like string
|
||||
|
||||
JdbcStatistic _jdbc_statistic;
|
||||
};
|
||||
|
||||
|
||||
@ -113,4 +113,13 @@ CREATE TABLE doris_test.arr
|
||||
`arr27` Array(Datetime64)
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY id
|
||||
ORDER BY id;
|
||||
|
||||
set allow_experimental_object_type = 1;
|
||||
-- Table doris_test.json: a String `id` and a JSON column `o`, MergeTree engine, ordered by id.
CREATE TABLE doris_test.json
|
||||
(
|
||||
`id` String,
|
||||
`o` JSON
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY id;
|
||||
@ -29,4 +29,6 @@ VALUES
|
||||
INSERT INTO doris_test.student values (1, 'doris', 18), (2, 'alice', 19), (3, 'bob', 20);
|
||||
|
||||
INSERT INTO doris_test.arr values
|
||||
('1',[true],['2022-01-01'],['2022-01-01'],[1.1],[1.1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[2.2],[1],['116.253.40.133'],['2a02:aa08:e000:3100::2'],['61f0c404-5cb3-11e7-907b-a6006ad3dba0'],[1],['string'],['string'],['2022-01-01 00:00:00'],['2022-01-01 00:00:00'])
|
||||
('1',[true],['2022-01-01'],['2022-01-01'],[1.1],[1.1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[2.2],[1],['116.253.40.133'],['2a02:aa08:e000:3100::2'],['61f0c404-5cb3-11e7-907b-a6006ad3dba0'],[1],['string'],['string'],['2022-01-01 00:00:00'],['2022-01-01 00:00:00']);
|
||||
|
||||
INSERT INTO doris_test.json VALUES ('1','{"a": 1, "b": { "c": 2, "d": [1, 2, 3] }}');
|
||||
|
||||
@ -166,3 +166,15 @@ CREATE TABLE catalog_pg_test.dt_test (
|
||||
ts_field TIMESTAMP(3),
|
||||
tzt_field TIMESTAMPTZ(3)
|
||||
);
|
||||
|
||||
-- Table catalog_pg_test.json_test: serial primary key, a varchar(10) label, and a `json` value column.
CREATE TABLE catalog_pg_test.json_test (
|
||||
id serial PRIMARY KEY,
|
||||
type varchar(10),
|
||||
value json
|
||||
);
|
||||
|
||||
-- Table catalog_pg_test.jsonb_test: serial primary key, a varchar(10) label, and a `jsonb` value column.
CREATE TABLE catalog_pg_test.jsonb_test (
|
||||
id serial PRIMARY KEY,
|
||||
type varchar(10),
|
||||
value jsonb
|
||||
);
|
||||
|
||||
@ -2664,3 +2664,43 @@ VALUES
|
||||
'2023-06-16 12:34:56.123',
|
||||
'2023-06-16 12:34:56.123+08'
|
||||
);
|
||||
|
||||
INSERT INTO catalog_pg_test.json_test (type,value) VALUES
|
||||
(
|
||||
'json',
|
||||
'{
|
||||
"stringKey": "stringValue",
|
||||
"integerKey": 12345,
|
||||
"floatKey": 123.45,
|
||||
"booleanKey": true,
|
||||
"nullKey": null,
|
||||
"arrayKey": ["element1", 2, false, null, {"nestedKey": "nestedValue"}],
|
||||
"objectKey": {
|
||||
"nestedStringKey": "nestedStringValue",
|
||||
"nestedIntegerKey": 67890
|
||||
}
|
||||
}'),
|
||||
(
|
||||
'json2',
|
||||
NULL
|
||||
);
|
||||
|
||||
INSERT INTO catalog_pg_test.jsonb_test (type,value) VALUES
|
||||
(
|
||||
'jsonb',
|
||||
'{
|
||||
"stringKey": "stringValue",
|
||||
"integerKey": 12345,
|
||||
"floatKey": 123.45,
|
||||
"booleanKey": true,
|
||||
"nullKey": null,
|
||||
"arrayKey": ["element1", 2, false, null, {"nestedKey": "nestedValue"}],
|
||||
"objectKey": {
|
||||
"nestedStringKey": "nestedStringValue",
|
||||
"nestedIntegerKey": 67890
|
||||
}
|
||||
}'),
|
||||
(
|
||||
'jsonb2',
|
||||
NULL
|
||||
);
|
||||
|
||||
@ -180,7 +180,7 @@ CREATE CATALOG jdbc_mysql PROPERTIES (
|
||||
| TIME | STRING | |
|
||||
| CHAR | CHAR | |
|
||||
| VARCHAR | VARCHAR | |
|
||||
| JSON | STRING | |
|
||||
| JSON | JSON | |
|
||||
| SET | STRING | |
|
||||
| BIT | BOOLEAN/STRING | BIT(1) will be mapped to BOOLEAN, and other BITs will be mapped to STRING |
|
||||
| TINYTEXT、TEXT、MEDIUMTEXT、LONGTEXT | STRING | |
|
||||
@ -236,12 +236,13 @@ Doris obtains all schemas that PG user can access through the SQL statement: `se
|
||||
| varchar/text | STRING | |
|
||||
| timestamp | DATETIME | |
|
||||
| date | DATE | |
|
||||
| json/jsonb | JSON | |
|
||||
| time | STRING | |
|
||||
| interval | STRING | |
|
||||
| point/line/lseg/box/path/polygon/circle | STRING | |
|
||||
| cidr/inet/macaddr | STRING | |
|
||||
| bit | BOOLEAN/STRING | bit(1) will be mapped to BOOLEAN, and other bits will be mapped to STRING |
|
||||
| uuid/josnb | STRING | |
|
||||
| uuid | STRING | |
|
||||
| Other | UNSUPPORTED | |
|
||||
|
||||
### Oracle
|
||||
|
||||
@ -180,7 +180,7 @@ CREATE CATALOG jdbc_mysql PROPERTIES (
|
||||
| TIME | STRING | |
|
||||
| CHAR | CHAR | |
|
||||
| VARCHAR | VARCHAR | |
|
||||
| JSON | STRING | |
|
||||
| JSON | JSON | |
|
||||
| SET | STRING | |
|
||||
| BIT | BOOLEAN/STRING | BIT(1) 会映射为 BOOLEAN,其他 BIT 映射为 STRING |
|
||||
| TINYTEXT、TEXT、MEDIUMTEXT、LONGTEXT | STRING | |
|
||||
@ -236,12 +236,13 @@ Doris 通过sql 语句 `select nspname from pg_namespace where has_schema_privil
|
||||
| varchar/text | STRING | |
|
||||
| timestamp | DATETIME | |
|
||||
| date | DATE | |
|
||||
| json/jsonb | JSON | |
|
||||
| time | STRING | |
|
||||
| interval | STRING | |
|
||||
| point/line/lseg/box/path/polygon/circle | STRING | |
|
||||
| cidr/inet/macaddr | STRING | |
|
||||
| bit | BOOLEAN/STRING | bit(1)会映射为 BOOLEAN,其他 bit 映射为 STRING |
|
||||
| uuid/josnb | STRING | |
|
||||
| uuid | STRING | |
|
||||
| Other | UNSUPPORTED | |
|
||||
|
||||
### Oracle
|
||||
|
||||
@ -1924,6 +1924,23 @@ public class JdbcExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
public void copyBatchJsonResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
|
||||
long offsetsAddr, long charsAddr) {
|
||||
Object[] column = (Object[]) columnObj;
|
||||
int firstNotNullIndex = 0;
|
||||
if (isNullable) {
|
||||
firstNotNullIndex = getFirstNotNullObject(column, numRows, nullMapAddr);
|
||||
}
|
||||
if (firstNotNullIndex == numRows) {
|
||||
return;
|
||||
}
|
||||
if (column[firstNotNullIndex] instanceof String) {
|
||||
stringPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr);
|
||||
} else {
|
||||
objectPutToString(column, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr);
|
||||
}
|
||||
}
|
||||
|
||||
private int getFirstNotNullObject(Object[] column, int numRows, long nullMapAddr) {
|
||||
int i = 0;
|
||||
for (; i < numRows; ++i) {
|
||||
|
||||
@ -313,6 +313,8 @@ public class JdbcMySQLClient extends JdbcClient {
|
||||
} else {
|
||||
return ScalarType.createStringType();
|
||||
}
|
||||
case "JSON":
|
||||
return ScalarType.createJsonbType();
|
||||
case "TIME":
|
||||
case "TINYTEXT":
|
||||
case "TEXT":
|
||||
@ -326,7 +328,6 @@ public class JdbcMySQLClient extends JdbcClient {
|
||||
case "STRING":
|
||||
case "MEDIUMSTRING":
|
||||
case "LONGSTRING":
|
||||
case "JSON":
|
||||
case "SET":
|
||||
case "BINARY":
|
||||
case "VARBINARY":
|
||||
|
||||
@ -99,10 +99,12 @@ public class JdbcPostgreSQLClient extends JdbcClient {
|
||||
case "inet":
|
||||
case "macaddr":
|
||||
case "varbit":
|
||||
case "jsonb":
|
||||
case "uuid":
|
||||
case "bytea":
|
||||
return ScalarType.createStringType();
|
||||
case "json":
|
||||
case "jsonb":
|
||||
return ScalarType.createJsonbType();
|
||||
default:
|
||||
return Type.UNSUPPORTED;
|
||||
}
|
||||
|
||||
@ -292,7 +292,7 @@ sys
|
||||
|
||||
-- !mysql_all_types --
|
||||
\N 302 \N 502 602 4.14159 \N 6.14159 \N -124 -302 2013 -402 -502 -602 \N 2012-10-26T02:08:39.345700 2013-10-26T08:09:18 -5.14145 \N -7.1400 row2 \N 09:11:09.567 text2 0xe86f6c6c6f20576f726c67 \N \N 0x2f \N 0x88656c6c9f Value3
|
||||
201 301 401 501 601 3.14159 4.1415926 5.14159 true -123 -301 2012 -401 -501 -601 2012-10-30 2012-10-25T12:05:36.345700 2012-10-25T08:08:08 -4.14145 -5.1400000001 -6.1400 row1 line1 09:09:09.567 text1 0x48656c6c6f20576f726c64 {"age": 30, "city": "London", "name": "Alice"} Option1,Option3 0x2a 0x48656c6c6f00000000000000 0x48656c6c6f Value2
|
||||
202 302 402 502 602 4.14159 5.1415926 6.14159 false -124 -302 2013 -402 -502 -602 2012-11-01 2012-10-26T02:08:39.345700 2013-10-26T08:09:18 -5.14145 -6.1400000001 -7.1400 row2 line2 09:11:09.567 text2 0xe86f6c6c6f20576f726c67 {"age": 18, "city": "ChongQing", "name": "Gaoxin"} Option1,Option2 0x2f 0x58676c6c6f00000000000000 0x88656c6c9f Value3
|
||||
203 303 403 503 603 7.14159 8.1415926 9.14159 false \N -402 2017 -602 -902 -1102 2012-11-02 \N 2013-10-27T08:11:18 -5.14145 -6.1400000000001 -7.1400 row3 line3 09:11:09.567 text3 0xe86f6c6c6f20576f726c67 {"age": 24, "city": "ChongQing", "name": "ChenQi"} Option2 0x2f 0x58676c6c6f00000000000000 \N Value1
|
||||
201 301 401 501 601 3.14159 4.1415926 5.14159 true -123 -301 2012 -401 -501 -601 2012-10-30 2012-10-25T12:05:36.345700 2012-10-25T08:08:08 -4.14145 -5.1400000001 -6.1400 row1 line1 09:09:09.567 text1 0x48656c6c6f20576f726c64 {"age":30,"city":"London","name":"Alice"} Option1,Option3 0x2a 0x48656c6c6f00000000000000 0x48656c6c6f Value2
|
||||
202 302 402 502 602 4.14159 5.1415926 6.14159 false -124 -302 2013 -402 -502 -602 2012-11-01 2012-10-26T02:08:39.345700 2013-10-26T08:09:18 -5.14145 -6.1400000001 -7.1400 row2 line2 09:11:09.567 text2 0xe86f6c6c6f20576f726c67 {"age":18,"city":"ChongQing","name":"Gaoxin"} Option1,Option2 0x2f 0x58676c6c6f00000000000000 0x88656c6c9f Value3
|
||||
203 303 403 503 603 7.14159 8.1415926 9.14159 false \N -402 2017 -602 -902 -1102 2012-11-02 \N 2013-10-27T08:11:18 -5.14145 -6.1400000000001 -7.1400 row3 line3 09:11:09.567 text3 0xe86f6c6c6f20576f726c67 {"age":24,"city":"ChongQing","name":"ChenQi"} Option2 0x2f 0x58676c6c6f00000000000000 \N Value1
|
||||
|
||||
|
||||
File diff suppressed because one or more lines are too long
@ -71,6 +71,8 @@ suite("test_pg_jdbc_catalog", "p0") {
|
||||
order_qt_test14 """ select * from test12 order by id; """
|
||||
order_qt_wkb_test """ select * from wkb_test order by id; """
|
||||
order_qt_dt_test """ select * from dt_test order by 1; """
|
||||
order_qt_json_test """ select * from json_test order by 1; """
|
||||
order_qt_jsonb_test """ select * from jsonb_test order by 1; """
|
||||
|
||||
// test insert
|
||||
String uuid1 = UUID.randomUUID().toString();
|
||||
|
||||
Reference in New Issue
Block a user