[feature-wip](array-type) Array data can be loaded in stream load. (#8368) (#8585)

Please refer to #8367.
This commit is contained in:
Adonis Ling
2022-03-22 15:25:40 +08:00
committed by GitHub
parent a498463ab5
commit e44038caf3
7 changed files with 371 additions and 8 deletions

View File

@ -26,6 +26,7 @@
#include "runtime/datetime_value.h"
#include "runtime/string_value.h"
#include "string_functions.h"
#include "util/array_parser.hpp"
#include "util/mysql_global.h"
#include "util/string_parser.hpp"
@ -357,4 +358,10 @@ DateTimeVal CastFunctions::cast_to_date_val(FunctionContext* ctx, const StringVa
return result;
}
// Casts a string holding a JSON array to an array value.
// Any parse failure reported by ArrayParser is mapped to a null CollectionVal.
CollectionVal CastFunctions::cast_to_array_val(FunctionContext* context, const StringVal& val) {
    CollectionVal parsed;
    if (!ArrayParser::parse(parsed, context, val).ok()) {
        return CollectionVal::null();
    }
    return parsed;
}
} // namespace doris

View File

@ -136,6 +136,8 @@ public:
static DateTimeVal cast_to_date_val(FunctionContext* context, const DoubleVal& val);
static DateTimeVal cast_to_date_val(FunctionContext* context, const DateTimeVal& val);
static DateTimeVal cast_to_date_val(FunctionContext* context, const StringVal& val);
// Parses `val` as a JSON array string; yields a null CollectionVal when parsing fails.
static CollectionVal cast_to_array_val(FunctionContext* context, const StringVal& val);
};
} // namespace doris

View File

@ -0,0 +1,212 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <rapidjson/document.h>

#include <cstdint>
#include <limits>
#include <new>
#include <string>
#include <unordered_map>
#include <utility>

#include "common/status.h"
#include "exprs/anyval_util.h"
#include "runtime/collection_value.h"
#include "runtime/primitive_type.h"
#include "runtime/types.h"
#include "util/mem_util.hpp"
namespace doris {
// Shorthand for the `ConstArray` type nested in rapidjson's GenericValue for `Encoding`.
template <typename Encoding>
using ConstArray = typename rapidjson::GenericValue<Encoding>::ConstArray;
// Shorthand for the element iterator type (`ValueIterator`) of `ConstArray<Encoding>`.
template <typename Encoding>
using ConstArrayIterator = typename ConstArray<Encoding>::ValueIterator;
class ArrayParser {
public:
static Status parse(CollectionVal& array_val, FunctionContext* context,
const StringVal& str_val) {
rapidjson::Document document;
if (document.Parse(reinterpret_cast<char*>(str_val.ptr), str_val.len).HasParseError() ||
!document.IsArray()) {
return Status::RuntimeError("Failed to parse the json to array.");
}
if (document.IsNull()) {
array_val = CollectionVal::null();
return Status::OK();
}
auto type_desc = _convert_to_type_descriptor(context->get_return_type());
return _parse<rapidjson::UTF8<>>(
array_val, context,
reinterpret_cast<const rapidjson::Document*>(&document)->GetArray(), type_desc);
}
private:
static TypeDescriptor _convert_to_type_descriptor(
FunctionContext::TypeDesc function_type_desc) {
auto iterator = _types_mapping.find(function_type_desc.type);
if (iterator == _types_mapping.end()) {
return TypeDescriptor();
}
auto type_desc = TypeDescriptor(iterator->second);
type_desc.len = function_type_desc.len;
type_desc.precision = function_type_desc.precision;
type_desc.scale = function_type_desc.scale;
for (auto child_type_desc : function_type_desc.children) {
type_desc.children.push_back(_convert_to_type_descriptor(child_type_desc));
}
return type_desc;
}
template <typename Encoding>
static Status _parse(CollectionVal& array_val, FunctionContext* context,
const ConstArray<Encoding>& array, const TypeDescriptor& type_desc) {
if (array.Empty()) {
CollectionValue(0).to_collection_val(&array_val);
return Status::OK();
}
auto child_type_desc = type_desc.children[0];
auto item_type = child_type_desc.type;
CollectionValue collection_value;
CollectionValue::init_collection(context, array.Size(), item_type, &collection_value);
int index = 0;
for (auto it = array.Begin(); it != array.End(); ++it) {
if (it->IsNull()) {
auto null = AnyVal(true);
collection_value.set(index++, item_type, &null);
continue;
} else if (!_is_type_valid<Encoding>(it, item_type)) {
return Status::RuntimeError("Failed to parse the json to array.");
}
AnyVal* val;
Status status = _parse<Encoding>(&val, context, it, child_type_desc);
if (!status.ok()) {
return status;
}
collection_value.set(index++, item_type, val);
}
collection_value.to_collection_val(&array_val);
return Status::OK();
}
template <typename Encoding>
static bool _is_type_valid(const ConstArrayIterator<Encoding> iterator,
const PrimitiveType type) {
switch (type) {
case TYPE_NULL:
return iterator->IsNull();
case TYPE_BOOLEAN:
return iterator->IsBool();
case TYPE_TINYINT:
case TYPE_SMALLINT:
case TYPE_INT:
case TYPE_BIGINT:
case TYPE_LARGEINT:
case TYPE_FLOAT:
case TYPE_DOUBLE:
return iterator->IsNumber();
case TYPE_DATE:
case TYPE_DATETIME:
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_HLL:
case TYPE_STRING:
return iterator->IsString();
case TYPE_OBJECT:
return iterator->IsObject();
case TYPE_ARRAY:
return iterator->IsArray();
default:
return false;
}
}
template <typename Encoding>
static Status _parse(AnyVal** val, FunctionContext* context,
const ConstArrayIterator<Encoding> iterator,
const TypeDescriptor& type_desc) {
switch (type_desc.type) {
case TYPE_ARRAY:
*val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(CollectionVal)));
new (*val) CollectionVal();
return _parse<Encoding>(*reinterpret_cast<CollectionVal*>(*val), context,
iterator->GetArray(), type_desc);
case TYPE_BOOLEAN:
*val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(BooleanVal)));
new (*val) BooleanVal(iterator->GetBool());
break;
case TYPE_TINYINT:
*val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(TinyIntVal)));
new (*val) TinyIntVal(iterator->GetInt());
break;
case TYPE_SMALLINT:
*val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(SmallIntVal)));
new (*val) SmallIntVal(iterator->GetInt());
break;
case TYPE_INT:
*val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(IntVal)));
new (*val) IntVal(iterator->GetInt());
break;
case TYPE_BIGINT:
*val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(BigIntVal)));
new (*val) BigIntVal(iterator->GetInt64());
break;
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
*val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(StringVal)));
new (*val) StringVal(context->allocate(iterator->GetStringLength()),
iterator->GetStringLength());
auto string_val = reinterpret_cast<StringVal*>(*val);
memory_copy(string_val->ptr, iterator->GetString(), iterator->GetStringLength());
break;
}
default:
return Status::RuntimeError("Failed to parse json to type (" +
std::to_string(type_desc.type) + ").");
}
return Status::OK();
}
private:
static std::unordered_map<FunctionContext::Type, PrimitiveType> _types_mapping;
};
// Table translating FunctionContext type tags into the matching PrimitiveType.
// The definition lives in a header, so it is marked `inline` (C++17): every translation
// unit that includes this file then shares one definition instead of each emitting its
// own, which would fail at link time with a multiple-definition error.
inline std::unordered_map<FunctionContext::Type, PrimitiveType> ArrayParser::_types_mapping = {
        {FunctionContext::INVALID_TYPE, PrimitiveType::INVALID_TYPE},
        {FunctionContext::TYPE_NULL, PrimitiveType::TYPE_NULL},
        {FunctionContext::TYPE_BOOLEAN, PrimitiveType::TYPE_BOOLEAN},
        {FunctionContext::TYPE_TINYINT, PrimitiveType::TYPE_TINYINT},
        {FunctionContext::TYPE_SMALLINT, PrimitiveType::TYPE_SMALLINT},
        {FunctionContext::TYPE_INT, PrimitiveType::TYPE_INT},
        {FunctionContext::TYPE_BIGINT, PrimitiveType::TYPE_BIGINT},
        {FunctionContext::TYPE_LARGEINT, PrimitiveType::TYPE_LARGEINT},
        {FunctionContext::TYPE_FLOAT, PrimitiveType::TYPE_FLOAT},
        {FunctionContext::TYPE_DOUBLE, PrimitiveType::TYPE_DOUBLE},
        {FunctionContext::TYPE_DECIMAL_DEPRACTED, PrimitiveType::TYPE_DECIMAL_DEPRACTED},
        {FunctionContext::TYPE_DATE, PrimitiveType::TYPE_DATE},
        {FunctionContext::TYPE_DATETIME, PrimitiveType::TYPE_DATETIME},
        {FunctionContext::TYPE_CHAR, PrimitiveType::TYPE_CHAR},
        {FunctionContext::TYPE_VARCHAR, PrimitiveType::TYPE_VARCHAR},
        {FunctionContext::TYPE_HLL, PrimitiveType::TYPE_HLL},
        {FunctionContext::TYPE_STRING, PrimitiveType::TYPE_STRING},
        {FunctionContext::TYPE_DECIMALV2, PrimitiveType::TYPE_DECIMALV2},
        {FunctionContext::TYPE_OBJECT, PrimitiveType::TYPE_OBJECT},
        {FunctionContext::TYPE_ARRAY, PrimitiveType::TYPE_ARRAY},
};
} // namespace doris

View File

@ -75,5 +75,6 @@ ADD_BE_TEST(sort_heap_test)
ADD_BE_TEST(counts_test)
ADD_BE_TEST(date_func_test)
ADD_BE_TEST(tuple_row_zorder_compare_test)
ADD_BE_TEST(array_parser_test)
target_link_libraries(Test_util Common Util Gutil ${Boost_LIBRARIES} glog gflags fmt protobuf)

View File

@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <memory>
#include <string>
#include <util/array_parser.hpp>
#include "gutil/casts.h"
#include "olap/types.h"
#include "runtime/free_pool.hpp"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "runtime/string_value.h"
#include "udf/udf.h"
#include "udf/udf_internal.h"
namespace doris {
using TypeDesc = FunctionContext::TypeDesc;
// Builds a FunctionContext::TypeDesc for `type`. Each following entry of `sub_types` becomes
// the single child of the descriptor before it, producing a chain such as ARRAY -> INT.
// `len` is OLAP_ARRAY_MAX_BYTES for TYPE_ARRAY and 0 for every other type.
template <typename... Ts>
TypeDesc create_function_type_desc(FunctionContext::Type type, Ts... sub_types) {
    TypeDesc type_desc = {.type = type,
                          .len = (type == FunctionContext::TYPE_ARRAY) ? OLAP_ARRAY_MAX_BYTES : 0};
    // Compare explicitly: converting a pack size greater than 1 to bool is a narrowing
    // conversion, which some compilers reject in an `if constexpr` condition.
    if constexpr (sizeof...(sub_types) > 0) {
        type_desc.children.push_back(create_function_type_desc(sub_types...));
    }
    return type_desc;
}
// Builds a ColumnPB mirroring `function_type_desc`: the length is copied, the type name is
// set for ARRAY / INT / VARCHAR (left unset for anything else), and one child column is
// appended per child descriptor.
ColumnPB create_column_pb(const TypeDesc& function_type_desc) {
    ColumnPB result;
    result.set_length(function_type_desc.len);

    const auto type = function_type_desc.type;
    if (type == FunctionContext::TYPE_ARRAY) {
        result.set_type("ARRAY");
    } else if (type == FunctionContext::TYPE_INT) {
        result.set_type("INT");
    } else if (type == FunctionContext::TYPE_VARCHAR) {
        result.set_type("VARCHAR");
    }

    for (const auto& child_desc : function_type_desc.children) {
        ColumnPB child_pb = create_column_pb(child_desc);
        result.add_children_columns()->Swap(&child_pb);
    }
    return result;
}
std::shared_ptr<const TypeInfo> get_type_info(const TypeDesc& function_type_desc) {
auto column_pb = create_column_pb(function_type_desc);
TabletColumn tablet_column;
tablet_column.init_from_pb(column_pb);
return get_type_info(&tablet_column);
}
void test_array_parser(const TypeDesc& function_type_desc, const std::string& json,
const CollectionValue& expect) {
MemTracker tracker(1024 * 1024, "ArrayParserTest");
MemPool mem_pool(&tracker);
std::unique_ptr<FunctionContext> function_context(new FunctionContext());
function_context->impl()->_return_type = function_type_desc;
function_context->impl()->_pool = new FreePool(&mem_pool);
CollectionVal collection_val;
auto status =
ArrayParser::parse(collection_val, function_context.get(), StringVal(json.c_str()));
EXPECT_TRUE(status.ok());
auto actual = CollectionValue::from_collection_val(collection_val);
EXPECT_TRUE(get_type_info(function_type_desc)->equal(&expect, &actual));
}
// Parses INT arrays: an empty array, a fully populated one and one with a null item.
TEST(ArrayParserTest, TestParseIntArray) {
    const auto array_of_int =
            create_function_type_desc(FunctionContext::TYPE_ARRAY, FunctionContext::TYPE_INT);

    // Empty JSON array.
    test_array_parser(array_of_int, "[]", CollectionValue(0));

    // Three items, none of them null.
    int length = 3;
    std::unique_ptr<int32_t[]> items(new int32_t[length] {1, 2, 3});
    CollectionValue expected(items.get(), length, false, nullptr);
    test_array_parser(array_of_int, "[1, 2, 3]", expected);

    // Same items, with the middle one flagged as null.
    std::unique_ptr<bool[]> null_flags(new bool[length] {false, true, false});
    expected.set_has_null(true);
    expected.set_null_signs(null_flags.get());
    test_array_parser(array_of_int, "[1, null, 3]", expected);
}
// Parses VARCHAR arrays: an empty array, a fully populated one and one with a null item.
TEST(ArrayParserTest, TestParseVarcharArray) {
    const auto array_of_varchar =
            create_function_type_desc(FunctionContext::TYPE_ARRAY, FunctionContext::TYPE_VARCHAR);

    // Empty JSON array.
    test_array_parser(array_of_varchar, "[]", CollectionValue(0));

    // Three one-character strings, each pointing into `chars`.
    int length = 3;
    std::unique_ptr<char[]> chars(new char[length] {'a', 'b', 'c'});
    std::unique_ptr<StringValue[]> strings(new StringValue[length] {
            {&chars[0], 1},
            {&chars[1], 1},
            {&chars[2], 1},
    });
    CollectionValue expected(strings.get(), length, false, nullptr);
    test_array_parser(array_of_varchar, "[\"a\", \"b\", \"c\"]", expected);

    // Same strings, with the middle one flagged as null.
    std::unique_ptr<bool[]> null_flags(new bool[length] {false, true, false});
    expected.set_has_null(true);
    expected.set_null_signs(null_flags.get());
    test_array_parser(array_of_varchar, "[\"a\", null, \"c\"]", expected);
}
} // namespace doris
// Test entry point: initialises GoogleTest from the command line and runs every test.
int main(int argc, char** argv) {
    ::testing::InitGoogleTest(&argc, argv);
    const int exit_code = RUN_ALL_TESTS();
    return exit_code;
}

View File

@ -241,12 +241,19 @@ public class CastExpr extends Expr {
this.opcode = TExprOpcode.CAST;
FunctionName fnName = new FunctionName(getFnName(type));
Function searchDesc = new Function(fnName, Arrays.asList(collectChildReturnTypes()), Type.INVALID, false);
if (isImplicit) {
fn = Catalog.getCurrentCatalog().getFunction(
searchDesc, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
} else {
fn = Catalog.getCurrentCatalog().getFunction(
searchDesc, Function.CompareMode.IS_IDENTICAL);
if (type.isScalarType()) {
if (isImplicit) {
fn = Catalog.getCurrentCatalog().getFunction(
searchDesc, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
} else {
fn = Catalog.getCurrentCatalog().getFunction(
searchDesc, Function.CompareMode.IS_IDENTICAL);
}
} else if (type.isArrayType()){
fn = ScalarFunction.createBuiltin(getFnName(Type.ARRAY),
type, Function.NullableMode.ALWAYS_NULLABLE,
Lists.newArrayList(Type.VARCHAR), false ,
"doris::CastFunctions::cast_to_array_val", null, null, true);
}
if (fn == null) {

View File

@ -81,6 +81,7 @@ public abstract class Type {
// Only used for alias function, to represent any type in function args
public static final ScalarType ALL = new ScalarType(PrimitiveType.ALL);
public static final MapType Map = new MapType();
public static final ArrayType ARRAY = ArrayType.create();
private static ArrayList<ScalarType> integerTypes;
private static ArrayList<ScalarType> numericTypes;
@ -123,7 +124,6 @@ public abstract class Type {
supportedTypes.add(DECIMALV2);
supportedTypes.add(TIME);
supportedTypes.add(STRING);
}
public static ArrayList<ScalarType> getIntegerTypes() {
@ -387,7 +387,7 @@ public abstract class Type {
} else if (t1.isArrayType() && t2.isArrayType()) {
return ArrayType.canCastTo((ArrayType)t1, (ArrayType)t2);
}
return t1.isNull();
return t1.isNull() || t1.getPrimitiveType() == PrimitiveType.VARCHAR;
}
/**