[WIP](dynamic-table) support dynamic schema table (#16335)
Issue Number: close #16351 Dynamic schema table is a special type of table, it's schema change with loading procedure.Now we implemented this feature mainly for semi-structure data such as JSON, since JSON is schema self-described we could extract schema info from the original documents and inference the final type infomation.This speical table could reduce manual schema change operation and easily import semi-structure data and extends it's schema automatically.
This commit is contained in:
265
be/src/vec/json/json_parser.cpp
Normal file
265
be/src/vec/json/json_parser.cpp
Normal file
@ -0,0 +1,265 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/JSONParsers/SimdJSONParser.cpp
|
||||
// and modified by Doris
|
||||
|
||||
#include "vec/json/json_parser.h"
|
||||
|
||||
#include "vec/json/simd_json_parser.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
template <typename ParserImpl>
|
||||
bool JSONDataParser<ParserImpl>::extract_key(MutableColumns& columns, StringRef json,
|
||||
const std::vector<StringRef>& keys,
|
||||
const std::vector<ExtractType>& types) {
|
||||
assert(types.size() == keys.size());
|
||||
assert(columns.size() >= keys.size());
|
||||
Element document;
|
||||
if (!parser.parse(json.to_string_view(), document) || !document.isObject()) {
|
||||
return false;
|
||||
}
|
||||
const auto& obj = document.getObject();
|
||||
for (size_t x = 0; x < types.size(); ++x) {
|
||||
Element element;
|
||||
PathInData key_path(keys[x].to_string_view());
|
||||
if (!obj.find(key_path, element) || element.isNull()) {
|
||||
columns[x]->insert_default();
|
||||
continue;
|
||||
}
|
||||
switch (types[x]) {
|
||||
case ExtractType::ToString: {
|
||||
if (element.isString()) {
|
||||
auto str = element.getString();
|
||||
columns[x]->insert_data(str.data(), str.size());
|
||||
break;
|
||||
}
|
||||
auto str = castValueAsString(element);
|
||||
columns[x]->insert_data(str.data(), str.size());
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename ParserImpl>
|
||||
std::optional<ParseResult> JSONDataParser<ParserImpl>::parse(const char* begin, size_t length) {
|
||||
std::string_view json {begin, length};
|
||||
Element document;
|
||||
if (!parser.parse(json, document)) {
|
||||
return {};
|
||||
}
|
||||
ParseContext context;
|
||||
traverse(document, context);
|
||||
ParseResult result;
|
||||
result.values = std::move(context.values);
|
||||
result.paths.reserve(context.paths.size());
|
||||
for (auto&& path : context.paths) {
|
||||
result.paths.emplace_back(std::move(path));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
template <typename ParserImpl>
|
||||
void JSONDataParser<ParserImpl>::traverse(const Element& element, ParseContext& ctx) {
|
||||
// checkStackSize();
|
||||
if (element.isObject()) {
|
||||
traverseObject(element.getObject(), ctx);
|
||||
} else if (element.isArray()) {
|
||||
traverseArray(element.getArray(), ctx);
|
||||
} else {
|
||||
ctx.paths.push_back(ctx.builder.get_parts());
|
||||
ctx.values.push_back(getValueAsField(element));
|
||||
}
|
||||
}
|
||||
template <typename ParserImpl>
|
||||
void JSONDataParser<ParserImpl>::traverseObject(const JSONObject& object, ParseContext& ctx) {
|
||||
ctx.paths.reserve(ctx.paths.size() + object.size());
|
||||
ctx.values.reserve(ctx.values.size() + object.size());
|
||||
for (auto it = object.begin(); it != object.end(); ++it) {
|
||||
const auto& [key, value] = *it;
|
||||
ctx.builder.append(key, false);
|
||||
traverse(value, ctx);
|
||||
ctx.builder.pop_back();
|
||||
}
|
||||
}
|
||||
template <typename ParserImpl>
|
||||
void JSONDataParser<ParserImpl>::traverseArray(const JSONArray& array, ParseContext& ctx) {
|
||||
/// Traverse elements of array and collect an array of fields by each path.
|
||||
ParseArrayContext array_ctx;
|
||||
array_ctx.total_size = array.size();
|
||||
for (auto it = array.begin(); it != array.end(); ++it) {
|
||||
traverseArrayElement(*it, array_ctx);
|
||||
++array_ctx.current_size;
|
||||
}
|
||||
auto&& arrays_by_path = array_ctx.arrays_by_path;
|
||||
if (arrays_by_path.empty()) {
|
||||
ctx.paths.push_back(ctx.builder.get_parts());
|
||||
ctx.values.push_back(Array());
|
||||
} else {
|
||||
ctx.paths.reserve(ctx.paths.size() + arrays_by_path.size());
|
||||
ctx.values.reserve(ctx.values.size() + arrays_by_path.size());
|
||||
for (auto it = arrays_by_path.begin(); it != arrays_by_path.end(); ++it) {
|
||||
auto&& [path, path_array] = it->second;
|
||||
/// Merge prefix path and path of array element.
|
||||
ctx.paths.push_back(ctx.builder.append(path, true).get_parts());
|
||||
ctx.values.push_back(std::move(path_array));
|
||||
ctx.builder.pop_back(path.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
template <typename ParserImpl>
|
||||
void JSONDataParser<ParserImpl>::traverseArrayElement(const Element& element,
|
||||
ParseArrayContext& ctx) {
|
||||
ParseContext element_ctx;
|
||||
traverse(element, element_ctx);
|
||||
auto& [_, paths, values] = element_ctx;
|
||||
size_t size = paths.size();
|
||||
size_t keys_to_update = ctx.arrays_by_path.size();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
if (values[i].is_null()) {
|
||||
continue;
|
||||
}
|
||||
UInt128 hash = PathInData::get_parts_hash(paths[i]);
|
||||
auto found = ctx.arrays_by_path.find(hash);
|
||||
if (found != ctx.arrays_by_path.end()) {
|
||||
auto& path_array = found->second.second;
|
||||
assert(path_array.size() == ctx.current_size);
|
||||
/// If current element of array is part of Nested,
|
||||
/// collect its size or check it if the size of
|
||||
/// the Nested has been already collected.
|
||||
auto nested_key = getNameOfNested(paths[i], values[i]);
|
||||
if (!nested_key.empty()) {
|
||||
size_t array_size = get<const Array&>(values[i]).size();
|
||||
auto& current_nested_sizes = ctx.nested_sizes_by_key[nested_key];
|
||||
if (current_nested_sizes.size() == ctx.current_size) {
|
||||
current_nested_sizes.push_back(array_size);
|
||||
} else if (array_size != current_nested_sizes.back()) {
|
||||
LOG(FATAL) << fmt::format("Array sizes mismatched ({} and {})", array_size,
|
||||
current_nested_sizes.back());
|
||||
}
|
||||
}
|
||||
path_array.push_back(std::move(values[i]));
|
||||
--keys_to_update;
|
||||
} else {
|
||||
/// We found a new key. Add and empty array with current size.
|
||||
Array path_array;
|
||||
path_array.reserve(ctx.total_size);
|
||||
path_array.resize(ctx.current_size);
|
||||
auto nested_key = getNameOfNested(paths[i], values[i]);
|
||||
if (!nested_key.empty()) {
|
||||
size_t array_size = get<const Array&>(values[i]).size();
|
||||
auto& current_nested_sizes = ctx.nested_sizes_by_key[nested_key];
|
||||
if (current_nested_sizes.empty()) {
|
||||
current_nested_sizes.resize(ctx.current_size);
|
||||
} else {
|
||||
/// If newly added element is part of the Nested then
|
||||
/// resize its elements to keep correct sizes of Nested arrays.
|
||||
for (size_t j = 0; j < ctx.current_size; ++j) {
|
||||
path_array[j] = Array(current_nested_sizes[j]);
|
||||
}
|
||||
}
|
||||
if (current_nested_sizes.size() == ctx.current_size) {
|
||||
current_nested_sizes.push_back(array_size);
|
||||
} else if (array_size != current_nested_sizes.back()) {
|
||||
LOG(FATAL) << fmt::format("Array sizes mismatched ({} and {})", array_size,
|
||||
current_nested_sizes.back());
|
||||
}
|
||||
}
|
||||
path_array.push_back(std::move(values[i]));
|
||||
auto& elem = ctx.arrays_by_path[hash];
|
||||
elem.first = std::move(paths[i]);
|
||||
elem.second = std::move(path_array);
|
||||
}
|
||||
}
|
||||
/// If some of the keys are missed in current element,
|
||||
/// add default values for them.
|
||||
if (keys_to_update) {
|
||||
fillMissedValuesInArrays(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename ParserImpl>
|
||||
void JSONDataParser<ParserImpl>::fillMissedValuesInArrays(ParseArrayContext& ctx) {
|
||||
for (auto it = ctx.arrays_by_path.begin(); it != ctx.arrays_by_path.end(); ++it) {
|
||||
auto& [path, path_array] = it->second;
|
||||
assert(path_array.size() == ctx.current_size || path_array.size() == ctx.current_size + 1);
|
||||
if (path_array.size() == ctx.current_size) {
|
||||
bool inserted = tryInsertDefaultFromNested(ctx, path, path_array);
|
||||
if (!inserted) {
|
||||
path_array.emplace_back();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <typename ParserImpl>
|
||||
bool JSONDataParser<ParserImpl>::tryInsertDefaultFromNested(ParseArrayContext& ctx,
|
||||
const PathInData::Parts& path,
|
||||
Array& array) {
|
||||
/// If there is a collected size of current Nested
|
||||
/// then insert array of this size as a default value.
|
||||
if (path.empty() || array.empty()) {
|
||||
return false;
|
||||
}
|
||||
/// Last element is not Null, because otherwise this path wouldn't exist.
|
||||
auto nested_key = getNameOfNested(path, array.back());
|
||||
if (nested_key.empty()) {
|
||||
return false;
|
||||
}
|
||||
auto mapped = ctx.nested_sizes_by_key.find(nested_key);
|
||||
if (mapped == ctx.nested_sizes_by_key.end()) {
|
||||
return false;
|
||||
}
|
||||
auto& current_nested_sizes = mapped->second;
|
||||
assert(current_nested_sizes.size() == ctx.current_size ||
|
||||
current_nested_sizes.size() == ctx.current_size + 1);
|
||||
/// If all keys of Nested were missed then add a zero length.
|
||||
if (current_nested_sizes.size() == ctx.current_size) {
|
||||
current_nested_sizes.push_back(0);
|
||||
}
|
||||
size_t array_size = current_nested_sizes.back();
|
||||
array.push_back(Array(array_size));
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename ParserImpl>
|
||||
StringRef JSONDataParser<ParserImpl>::getNameOfNested(const PathInData::Parts& path,
|
||||
const Field& value) {
|
||||
if (value.get_type() != Field::Types::Array || path.empty()) {
|
||||
return {};
|
||||
}
|
||||
/// Find first key that is marked as nested,
|
||||
/// because we may have tuple of Nested and there could be
|
||||
/// several arrays with the same prefix, but with independent sizes.
|
||||
/// Consider we have array element with type `k2 Tuple(k3 Nested(...), k5 Nested(...))`
|
||||
/// Then subcolumns `k2.k3` and `k2.k5` may have indepented sizes and we should extract
|
||||
/// `k3` and `k5` keys instead of `k2`.
|
||||
for (const auto& part : path) {
|
||||
if (part.is_nested) {
|
||||
return StringRef(part.key.data(), part.key.size());
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
template class JSONDataParser<SimdJSONParser>;
|
||||
} // namespace doris::vectorized
|
||||
Reference in New Issue
Block a user