[feature](jsonb)support json_length and json_contains function (#24332)

This commit is contained in:
Liqf
2023-09-20 10:40:44 +08:00
committed by GitHub
parent e59aa49f28
commit 14bd290aec
18 changed files with 1348 additions and 23 deletions

View File

@ -539,15 +539,6 @@ public:
rdata.size()));
}
// if not valid json path , should return error message to user
if (is_invalid_json_path) {
return Status::InvalidArgument(
"Json path error: {} for value: {}",
JsonbErrMsg::getErrMsg(JsonbErrType::E_INVALID_JSON_PATH),
std::string_view(reinterpret_cast<const char*>(rdata.data()),
rdata.size()));
}
json_path_list[pi] = std::move(path);
return Status::OK();
@ -1011,6 +1002,268 @@ using FunctionJsonbExtractDouble = FunctionJsonbExtract<JsonbExtractDouble>;
using FunctionJsonbExtractString = FunctionJsonbExtract<JsonbExtractString>;
using FunctionJsonbExtractJsonb = FunctionJsonbExtract<JsonbExtractJsonb>;
template <typename Impl>
class FunctionJsonbLength : public IFunction {
public:
static constexpr auto name = "json_length";
String get_name() const override { return name; }
static FunctionPtr create() { return std::make_shared<FunctionJsonbLength<Impl>>(); }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<DataTypeInt32>());
}
DataTypes get_variadic_argument_types_impl() const override {
return Impl::get_variadic_argument_types();
}
size_t get_number_of_arguments() const override {
return get_variadic_argument_types_impl().size();
}
bool use_default_implementation_for_nulls() const override { return false; }
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) override {
return Impl::execute_impl(context, block, arguments, result, input_rows_count);
}
};
struct JsonbLengthUtil {
static Status jsonb_length_execute(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, size_t result,
size_t input_rows_count) {
DCHECK_GE(arguments.size(), 2);
ColumnPtr jsonb_data_column;
bool jsonb_data_const = false;
// prepare jsonb data column
std::tie(jsonb_data_column, jsonb_data_const) =
unpack_if_const(block.get_by_position(arguments[0]).column);
ColumnPtr path_column;
bool is_const = false;
std::tie(path_column, is_const) =
unpack_if_const(block.get_by_position(arguments[1]).column);
JsonbPath path;
if (is_const) {
auto path_value = path_column->get_data_at(0);
if (!path.seek(path_value.data, path_value.size)) {
return Status::InvalidArgument(
"Json path error: {} for value: {}",
JsonbErrMsg::getErrMsg(JsonbErrType::E_INVALID_JSON_PATH),
std::string_view(reinterpret_cast<const char*>(path_value.data),
path_value.size));
}
}
auto null_map = ColumnUInt8::create(input_rows_count, 0);
auto return_type = block.get_data_type(result);
MutableColumnPtr res = return_type->create_column();
for (size_t i = 0; i < input_rows_count; ++i) {
if (jsonb_data_column->is_null_at(i) || path_column->is_null_at(i)) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
continue;
}
if (!is_const) {
auto path_value = path_column->get_data_at(i);
path.clean();
if (!path.seek(path_value.data, path_value.size)) {
return Status::InvalidArgument(
"Json path error: {} for value: {}",
JsonbErrMsg::getErrMsg(JsonbErrType::E_INVALID_JSON_PATH),
std::string_view(reinterpret_cast<const char*>(path_value.data),
path_value.size));
}
}
auto jsonb_value = jsonb_data_column->get_data_at(i);
// doc is NOT necessary to be deleted since JsonbDocument will not allocate memory
JsonbDocument* doc = JsonbDocument::createDocument(jsonb_value.data, jsonb_value.size);
JsonbValue* value = doc->getValue()->findValue(path, nullptr);
if (UNLIKELY(jsonb_value.size == 0 || !value)) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
continue;
}
auto length = value->length();
res->insert_data(const_cast<const char*>((char*)&length), 0);
}
block.replace_by_position(result,
ColumnNullable::create(std::move(res), std::move(null_map)));
return Status::OK();
}
};
struct JsonbLengthImpl {
static DataTypes get_variadic_argument_types() { return {std::make_shared<DataTypeJsonb>()}; }
static Status execute_impl(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, size_t result,
size_t input_rows_count) {
auto path = ColumnString::create();
std::string root_path = "$";
for (int i = 0; i < input_rows_count; i++) {
reinterpret_cast<ColumnString*>(path.get())
->insert_data(root_path.data(), root_path.size());
}
block.insert({std::move(path), std::make_shared<DataTypeString>(), "path"});
ColumnNumbers temp_arguments = {arguments[0], block.columns() - 1};
return JsonbLengthUtil::jsonb_length_execute(context, block, temp_arguments, result,
input_rows_count);
}
};
struct JsonbLengthAndPathImpl {
static DataTypes get_variadic_argument_types() {
return {std::make_shared<DataTypeJsonb>(), std::make_shared<DataTypeString>()};
}
static Status execute_impl(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, size_t result,
size_t input_rows_count) {
return JsonbLengthUtil::jsonb_length_execute(context, block, arguments, result,
input_rows_count);
}
};
template <typename Impl>
class FunctionJsonbContains : public IFunction {
public:
static constexpr auto name = "json_contains";
String get_name() const override { return name; }
static FunctionPtr create() { return std::make_shared<FunctionJsonbContains<Impl>>(); }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<DataTypeUInt8>());
}
DataTypes get_variadic_argument_types_impl() const override {
return Impl::get_variadic_argument_types();
}
size_t get_number_of_arguments() const override {
return get_variadic_argument_types_impl().size();
}
bool use_default_implementation_for_nulls() const override { return false; }
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) override {
return Impl::execute_impl(context, block, arguments, result, input_rows_count);
}
};
struct JsonbContainsUtil {
static Status jsonb_contains_execute(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, size_t result,
size_t input_rows_count) {
DCHECK_GE(arguments.size(), 3);
auto jsonb_data1_column = block.get_by_position(arguments[0]).column;
auto jsonb_data2_column = block.get_by_position(arguments[1]).column;
ColumnPtr path_column;
bool is_const = false;
std::tie(path_column, is_const) =
unpack_if_const(block.get_by_position(arguments[2]).column);
JsonbPath path;
if (is_const) {
auto path_value = path_column->get_data_at(0);
if (!path.seek(path_value.data, path_value.size)) {
return Status::InvalidArgument(
"Json path error: {} for value: {}",
JsonbErrMsg::getErrMsg(JsonbErrType::E_INVALID_JSON_PATH),
std::string_view(reinterpret_cast<const char*>(path_value.data),
path_value.size));
}
}
auto null_map = ColumnUInt8::create(input_rows_count, 0);
auto return_type = block.get_data_type(result);
MutableColumnPtr res = return_type->create_column();
for (size_t i = 0; i < input_rows_count; ++i) {
if (jsonb_data1_column->is_null_at(i) || jsonb_data2_column->is_null_at(i) ||
path_column->is_null_at(i)) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
continue;
}
if (!is_const) {
auto path_value = path_column->get_data_at(i);
path.clean();
if (!path.seek(path_value.data, path_value.size)) {
return Status::InvalidArgument(
"Json path error: {} for value: {}",
JsonbErrMsg::getErrMsg(JsonbErrType::E_INVALID_JSON_PATH),
std::string_view(reinterpret_cast<const char*>(path_value.data),
path_value.size));
}
}
auto jsonb_value1 = jsonb_data1_column->get_data_at(i);
auto jsonb_value2 = jsonb_data2_column->get_data_at(i);
// doc is NOT necessary to be deleted since JsonbDocument will not allocate memory
JsonbDocument* doc1 =
JsonbDocument::createDocument(jsonb_value1.data, jsonb_value1.size);
JsonbDocument* doc2 =
JsonbDocument::createDocument(jsonb_value2.data, jsonb_value2.size);
JsonbValue* value1 = doc1->getValue()->findValue(path, nullptr);
JsonbValue* value2 = doc2->getValue();
if (UNLIKELY(jsonb_value1.size == 0 || jsonb_value2.size == 0 || !value1 || !value2)) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
continue;
}
auto contains_value = value1->contains(value2);
res->insert_data(const_cast<const char*>((char*)&contains_value), 0);
}
block.replace_by_position(result,
ColumnNullable::create(std::move(res), std::move(null_map)));
return Status::OK();
}
};
struct JsonbContainsImpl {
static DataTypes get_variadic_argument_types() {
return {std::make_shared<DataTypeJsonb>(), std::make_shared<DataTypeJsonb>()};
}
static Status execute_impl(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, size_t result,
size_t input_rows_count) {
auto path = ColumnString::create();
std::string root_path = "$";
for (int i = 0; i < input_rows_count; i++) {
reinterpret_cast<ColumnString*>(path.get())
->insert_data(root_path.data(), root_path.size());
}
block.insert({std::move(path), std::make_shared<DataTypeString>(), "path"});
ColumnNumbers temp_arguments = {arguments[0], arguments[1], block.columns() - 1};
return JsonbContainsUtil::jsonb_contains_execute(context, block, temp_arguments, result,
input_rows_count);
}
};
struct JsonbContainsAndPathImpl {
static DataTypes get_variadic_argument_types() {
return {std::make_shared<DataTypeJsonb>(), std::make_shared<DataTypeJsonb>(),
std::make_shared<DataTypeString>()};
}
static Status execute_impl(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, size_t result,
size_t input_rows_count) {
return JsonbContainsUtil::jsonb_contains_execute(context, block, arguments, result,
input_rows_count);
}
};
void register_function_jsonb(SimpleFunctionFactory& factory) {
factory.register_function<FunctionJsonbParse>(FunctionJsonbParse::name);
factory.register_alias(FunctionJsonbParse::name, FunctionJsonbParse::alias);
@ -1067,6 +1320,11 @@ void register_function_jsonb(SimpleFunctionFactory& factory) {
factory.register_alias(FunctionJsonbExtractString::name, FunctionJsonbExtractString::alias);
factory.register_function<FunctionJsonbExtractJsonb>();
// factory.register_alias(FunctionJsonbExtractJsonb::name, FunctionJsonbExtractJsonb::alias);
factory.register_function<FunctionJsonbLength<JsonbLengthImpl>>();
factory.register_function<FunctionJsonbLength<JsonbLengthAndPathImpl>>();
factory.register_function<FunctionJsonbContains<JsonbContainsImpl>>();
factory.register_function<FunctionJsonbContains<JsonbContainsAndPathImpl>>();
}
} // namespace doris::vectorized