[feature](json-function) add json_insert, json_replace, json_set functions (#24384)
[feature](json-function) add three json funcitons
This commit is contained in:
@ -1136,6 +1136,327 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
enum class JsonModifyType {
|
||||
JSON_INSERT = 0,
|
||||
JSON_REPLACE,
|
||||
JSON_SET,
|
||||
};
|
||||
|
||||
struct FunctionJsonInsert {
|
||||
static constexpr auto name = "json_insert";
|
||||
static constexpr auto modify_type = JsonModifyType::JSON_INSERT;
|
||||
};
|
||||
|
||||
struct FunctionJsonReplace {
|
||||
static constexpr auto name = "json_replace";
|
||||
static constexpr auto modify_type = JsonModifyType::JSON_REPLACE;
|
||||
};
|
||||
struct FunctionJsonSet {
|
||||
static constexpr auto name = "json_set";
|
||||
static constexpr auto modify_type = JsonModifyType::JSON_SET;
|
||||
};
|
||||
|
||||
template <typename Kind>
|
||||
class FunctionJsonModifyImpl : public IFunction {
|
||||
private:
|
||||
// T = std::vector<std::string>
|
||||
// TODO: update RE2 to support std::vector<std::string_view>
|
||||
// if path is not a valid path expression or contains
|
||||
// a * wildcard, return runtime error.
|
||||
template <typename T>
|
||||
Status get_parsed_paths_with_status(const T& path_exprs, std::vector<JsonPath>* parsed_paths) {
|
||||
if (UNLIKELY(path_exprs.empty())) {
|
||||
return Status::RuntimeError("json path empty function {}", get_name());
|
||||
}
|
||||
|
||||
if (path_exprs[0] != "$") {
|
||||
// keep same behaviour with get_parsed_paths(),
|
||||
// '$[0]' is invalid path, '$.[0]' is valid
|
||||
return Status::RuntimeError(
|
||||
"Invalid JSON path expression. The error is around character position 1");
|
||||
}
|
||||
parsed_paths->emplace_back("$", -1, true);
|
||||
|
||||
for (int i = 1; i < path_exprs.size(); i++) {
|
||||
std::string col;
|
||||
std::string index;
|
||||
if (UNLIKELY(!RE2::FullMatch(path_exprs[i], JSON_PATTERN, &col, &index))) {
|
||||
return Status::RuntimeError(
|
||||
"Invalid JSON path expression. The error is around character position {}",
|
||||
i + 1);
|
||||
} else {
|
||||
int idx = -1;
|
||||
if (!index.empty()) {
|
||||
if (index == "*") {
|
||||
return Status::RuntimeError(
|
||||
"In this situation, path expressions may not contain the * token");
|
||||
} else {
|
||||
idx = atoi(index.c_str());
|
||||
}
|
||||
}
|
||||
parsed_paths->emplace_back(col, idx, true);
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status get_parsed_path_columns(std::vector<std::vector<std::vector<JsonPath>>>& json_paths,
|
||||
const std::vector<const ColumnString*>& data_columns,
|
||||
size_t input_rows_count) {
|
||||
for (auto col = 1; col + 1 < data_columns.size() - 1; col += 2) {
|
||||
json_paths.emplace_back(std::vector<std::vector<JsonPath>>());
|
||||
for (auto row = 0; row < input_rows_count; row++) {
|
||||
const auto path = data_columns[col]->get_data_at(row);
|
||||
std::string_view path_string(path.data, path.size);
|
||||
std::vector<JsonPath> parsed_paths;
|
||||
|
||||
#ifdef USE_LIBCPP
|
||||
std::string s(path_string);
|
||||
auto tok = get_json_token(s);
|
||||
#else
|
||||
auto tok = get_json_token(path_string);
|
||||
#endif
|
||||
std::vector<std::string> paths(tok.begin(), tok.end());
|
||||
RETURN_IF_ERROR(get_parsed_paths_with_status(paths, &parsed_paths));
|
||||
json_paths[col / 2].emplace_back(parsed_paths);
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
public:
|
||||
static constexpr auto name = Kind::name;
|
||||
|
||||
static FunctionPtr create() { return std::make_shared<FunctionJsonModifyImpl<Kind>>(); }
|
||||
|
||||
String get_name() const override { return name; }
|
||||
|
||||
size_t get_number_of_arguments() const override { return 0; }
|
||||
|
||||
bool is_variadic() const override { return true; }
|
||||
|
||||
bool use_default_implementation_for_nulls() const override { return false; }
|
||||
|
||||
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
|
||||
bool is_nullable = false;
|
||||
// arguments: (json_str, path, val[, path, val...], type_flag)
|
||||
for (auto col = 2; col < arguments.size() - 1; col += 2) {
|
||||
if (arguments[col]->is_nullable()) {
|
||||
is_nullable = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return is_nullable ? make_nullable(std::make_shared<DataTypeString>())
|
||||
: std::make_shared<DataTypeString>();
|
||||
}
|
||||
|
||||
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
|
||||
size_t result, size_t input_rows_count) override {
|
||||
auto result_column = ColumnString::create();
|
||||
bool is_nullable = false;
|
||||
auto ret_null_map = ColumnUInt8::create(0, 0);
|
||||
|
||||
std::vector<ColumnPtr> column_ptrs; // prevent converted column destruct
|
||||
std::vector<const ColumnString*> data_columns;
|
||||
std::vector<const ColumnUInt8*> nullmaps;
|
||||
for (int i = 0; i < arguments.size(); i++) {
|
||||
auto column = block.get_by_position(arguments[i]).column;
|
||||
column_ptrs.push_back(column->convert_to_full_column_if_const());
|
||||
const ColumnNullable* col_nullable =
|
||||
check_and_get_column<ColumnNullable>(column_ptrs.back().get());
|
||||
if (col_nullable) {
|
||||
if (!is_nullable) {
|
||||
is_nullable = true;
|
||||
ret_null_map = ColumnUInt8::create(input_rows_count, 0);
|
||||
}
|
||||
const ColumnUInt8* col_nullmap = check_and_get_column<ColumnUInt8>(
|
||||
col_nullable->get_null_map_column_ptr().get());
|
||||
nullmaps.push_back(col_nullmap);
|
||||
const ColumnString* col = check_and_get_column<ColumnString>(
|
||||
col_nullable->get_nested_column_ptr().get());
|
||||
data_columns.push_back(col);
|
||||
} else {
|
||||
nullmaps.push_back(nullptr);
|
||||
data_columns.push_back(assert_cast<const ColumnString*>(column_ptrs.back().get()));
|
||||
}
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(execute_process(
|
||||
data_columns, *assert_cast<ColumnString*>(result_column.get()), input_rows_count,
|
||||
nullmaps, is_nullable, *assert_cast<ColumnUInt8*>(ret_null_map.get())));
|
||||
|
||||
if (is_nullable) {
|
||||
block.replace_by_position(result, ColumnNullable::create(std::move(result_column),
|
||||
std::move(ret_null_map)));
|
||||
} else {
|
||||
block.get_by_position(result).column = std::move(result_column);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status execute_process(const std::vector<const ColumnString*>& data_columns,
|
||||
ColumnString& result_column, size_t input_rows_count,
|
||||
const std::vector<const ColumnUInt8*> nullmaps, bool is_nullable,
|
||||
ColumnUInt8& ret_null_map) {
|
||||
std::string type_flags = data_columns.back()->get_data_at(0).to_string();
|
||||
|
||||
std::vector<rapidjson::Document> objects;
|
||||
for (auto row = 0; row < input_rows_count; row++) {
|
||||
objects.emplace_back(rapidjson::kNullType);
|
||||
const auto json_doc = data_columns[0]->get_data_at(row);
|
||||
std::string_view json_str(json_doc.data, json_doc.size);
|
||||
objects[row].Parse(json_str.data(), json_str.size());
|
||||
if (UNLIKELY(objects[row].HasParseError())) {
|
||||
return Status::RuntimeError("invalid json str {}: function {}", json_str,
|
||||
get_name());
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::vector<std::vector<JsonPath>>> json_paths;
|
||||
RETURN_IF_ERROR(get_parsed_path_columns(json_paths, data_columns, input_rows_count));
|
||||
|
||||
execute_parse(type_flags, data_columns, objects, json_paths, nullmaps);
|
||||
|
||||
rapidjson::StringBuffer buf;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(buf);
|
||||
|
||||
for (int i = 0; i < input_rows_count; i++) {
|
||||
buf.Clear();
|
||||
writer.Reset(buf);
|
||||
objects[i].Accept(writer);
|
||||
if (is_nullable && objects[i].IsNull()) {
|
||||
ret_null_map.get_data()[i] = 1;
|
||||
}
|
||||
result_column.insert_data(buf.GetString(), buf.GetSize());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
template <int flag>
|
||||
using Reducer = ExecuteReducer<flag, FunctionJsonModifyImpl<Kind>>;
|
||||
|
||||
static void execute_parse(std::string type_flags,
|
||||
const std::vector<const ColumnString*>& data_columns,
|
||||
std::vector<rapidjson::Document>& objects,
|
||||
std::vector<std::vector<std::vector<JsonPath>>>& json_paths,
|
||||
const std::vector<const ColumnUInt8*>& nullmaps) {
|
||||
for (auto col = 1; col + 1 < data_columns.size() - 1; col += 2) {
|
||||
constexpr_int_match<'0', '6', Reducer>::run(type_flags[col + 1], objects,
|
||||
json_paths[col / 2], data_columns[col + 1],
|
||||
nullmaps[col + 1]);
|
||||
}
|
||||
}
|
||||
|
||||
static void modify_value(const std::vector<JsonPath>& parsed_paths, rapidjson::Value* document,
|
||||
rapidjson::Document::AllocatorType& mem_allocator, bool is_insert,
|
||||
bool is_replace, rapidjson::Value* value) {
|
||||
rapidjson::Value* root = document;
|
||||
rapidjson::Value key;
|
||||
|
||||
auto i = 1;
|
||||
for (; i < parsed_paths.size(); i++) {
|
||||
if (root->IsNull()) {
|
||||
return;
|
||||
}
|
||||
const std::string& col = parsed_paths[i].key;
|
||||
int index = parsed_paths[i].idx;
|
||||
if (LIKELY(!col.empty())) {
|
||||
if (root->IsObject()) {
|
||||
if (!root->HasMember(col.c_str())) {
|
||||
break;
|
||||
} else {
|
||||
root = &((*root)[col.c_str()]);
|
||||
}
|
||||
} else {
|
||||
// not object
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (UNLIKELY(index != -1)) {
|
||||
if (root->IsArray()) {
|
||||
if (index >= root->Size()) {
|
||||
// array append new value
|
||||
if (is_insert && i + 1 == parsed_paths.size()) {
|
||||
root->PushBack(*value, mem_allocator);
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
root = &((*root)[index]);
|
||||
}
|
||||
} else {
|
||||
if (i + 1 == parsed_paths.size()) {
|
||||
// replace, example:
|
||||
// json_replace({"a": 1}, '$.[0]', null);
|
||||
// output: null
|
||||
if (is_replace && index == 0) {
|
||||
*root = *value;
|
||||
return;
|
||||
}
|
||||
// convert to array, example:
|
||||
// json_insert({"a": 1}, '$.[1]', 3);
|
||||
// output: [{"a": 1}, 3]
|
||||
if (is_insert && index > 0) {
|
||||
key.SetArray();
|
||||
root->Swap(key);
|
||||
root->PushBack(key, mem_allocator);
|
||||
root->PushBack(*value, mem_allocator);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (is_insert && i + 1 == parsed_paths.size()) {
|
||||
if (LIKELY(root->IsObject())) {
|
||||
// object insert new value
|
||||
const std::string& col = parsed_paths[i].key;
|
||||
int index = parsed_paths[i].idx;
|
||||
if (LIKELY(!col.empty() && index == -1)) {
|
||||
key.SetString(col.c_str(), mem_allocator);
|
||||
root->AddMember(key, *value, mem_allocator);
|
||||
}
|
||||
}
|
||||
} else if (is_replace && i == parsed_paths.size()) {
|
||||
*root = *value;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename TypeImpl>
|
||||
static void execute_type(std::vector<rapidjson::Document>& objects,
|
||||
std::vector<std::vector<JsonPath>>& paths_column,
|
||||
const ColumnString* value_column, const ColumnUInt8* nullmap) {
|
||||
StringParser::ParseResult result;
|
||||
rapidjson::Value value;
|
||||
for (auto row = 0; row < objects.size(); row++) {
|
||||
std::vector<JsonPath>* parsed_paths = &paths_column[row];
|
||||
|
||||
if (nullmap != nullptr && nullmap->get_data()[row]) {
|
||||
JsonParser<'0'>::update_value(result, value, value_column->get_data_at(row),
|
||||
objects[row].GetAllocator());
|
||||
} else {
|
||||
TypeImpl::update_value(result, value, value_column->get_data_at(row),
|
||||
objects[row].GetAllocator());
|
||||
}
|
||||
|
||||
switch (Kind::modify_type) {
|
||||
case JsonModifyType::JSON_INSERT:
|
||||
modify_value(*parsed_paths, &objects[row], objects[row].GetAllocator(), true, false,
|
||||
&value);
|
||||
break;
|
||||
case JsonModifyType::JSON_REPLACE:
|
||||
modify_value(*parsed_paths, &objects[row], objects[row].GetAllocator(), false, true,
|
||||
&value);
|
||||
break;
|
||||
case JsonModifyType::JSON_SET:
|
||||
modify_value(*parsed_paths, &objects[row], objects[row].GetAllocator(), true, true,
|
||||
&value);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void register_function_json(SimpleFunctionFactory& factory) {
|
||||
factory.register_function<FunctionGetJsonInt>();
|
||||
factory.register_function<FunctionGetJsonBigInt>();
|
||||
@ -1150,6 +1471,10 @@ void register_function_json(SimpleFunctionFactory& factory) {
|
||||
|
||||
factory.register_function<FunctionJsonValid>();
|
||||
factory.register_function<FunctionJsonContains>();
|
||||
|
||||
factory.register_function<FunctionJsonModifyImpl<FunctionJsonInsert>>();
|
||||
factory.register_function<FunctionJsonModifyImpl<FunctionJsonReplace>>();
|
||||
factory.register_function<FunctionJsonModifyImpl<FunctionJsonSet>>();
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
Reference in New Issue
Block a user