[fix](Vectorized)fix json_object and json_array function return wrong result on vectorized engine (#13775)
Issue Number: close #13598
This commit is contained in:
@ -438,22 +438,28 @@ struct FunctionJsonArrayImpl {
|
||||
static void execute_parse(const std::string& type_flags,
|
||||
const std::vector<const ColumnString*>& data_columns,
|
||||
std::vector<rapidjson::Value>& objects,
|
||||
rapidjson::Document::AllocatorType& allocator) {
|
||||
rapidjson::Document::AllocatorType& allocator,
|
||||
const std::vector<const ColumnUInt8*>& nullmaps) {
|
||||
for (int i = 0; i < data_columns.size() - 1; i++) {
|
||||
constexpr_int_match<'0', '5', Reducer>::run(type_flags[i], objects, allocator,
|
||||
data_columns[i]);
|
||||
data_columns[i], nullmaps[i]);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename TypeImpl>
|
||||
static void execute_type(std::vector<rapidjson::Value>& objects,
|
||||
rapidjson::Document::AllocatorType& allocator,
|
||||
const ColumnString* data_column) {
|
||||
const ColumnString* data_column, const ColumnUInt8* nullmap) {
|
||||
StringParser::ParseResult result;
|
||||
rapidjson::Value value;
|
||||
|
||||
for (int i = 0; i < objects.size(); i++) {
|
||||
TypeImpl::update_value(result, value, data_column->get_data_at(i), allocator);
|
||||
if (nullmap != nullptr && nullmap->get_data()[i]) {
|
||||
JsonParser<'0'>::update_value(result, value, data_column->get_data_at(i),
|
||||
allocator);
|
||||
} else {
|
||||
TypeImpl::update_value(result, value, data_column->get_data_at(i), allocator);
|
||||
}
|
||||
objects[i].PushBack(value, allocator);
|
||||
}
|
||||
}
|
||||
@ -468,40 +474,97 @@ struct FunctionJsonObjectImpl {
|
||||
static void execute_parse(std::string type_flags,
|
||||
const std::vector<const ColumnString*>& data_columns,
|
||||
std::vector<rapidjson::Value>& objects,
|
||||
rapidjson::Document::AllocatorType& allocator) {
|
||||
rapidjson::Document::AllocatorType& allocator,
|
||||
const std::vector<const ColumnUInt8*>& nullmaps) {
|
||||
for (auto& array_object : objects) {
|
||||
array_object.SetObject();
|
||||
}
|
||||
|
||||
for (int i = 0; i + 1 < data_columns.size() - 1; i += 2) {
|
||||
constexpr_int_match<'0', '5', Reducer>::run(type_flags[i + 1], objects, allocator,
|
||||
data_columns[i], data_columns[i + 1]);
|
||||
data_columns[i], data_columns[i + 1],
|
||||
nullmaps[i + 1]);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename TypeImpl>
|
||||
static void execute_type(std::vector<rapidjson::Value>& objects,
|
||||
rapidjson::Document::AllocatorType& allocator,
|
||||
const ColumnString* key_column, const ColumnString* value_column) {
|
||||
const ColumnString* key_column, const ColumnString* value_column,
|
||||
const ColumnUInt8* nullmap) {
|
||||
StringParser::ParseResult result;
|
||||
rapidjson::Value key;
|
||||
rapidjson::Value value;
|
||||
|
||||
for (int i = 0; i < objects.size(); i++) {
|
||||
JsonParser<'5'>::update_value(result, key, key_column->get_data_at(i),
|
||||
allocator); // key always is string
|
||||
TypeImpl::update_value(result, value, value_column->get_data_at(i), allocator);
|
||||
if (nullmap != nullptr && nullmap->get_data()[i]) {
|
||||
JsonParser<'0'>::update_value(result, value, value_column->get_data_at(i),
|
||||
allocator);
|
||||
} else {
|
||||
TypeImpl::update_value(result, value, value_column->get_data_at(i), allocator);
|
||||
}
|
||||
objects[i].AddMember(key, value, allocator);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <typename SpecificImpl>
|
||||
struct FunctionJsonImpl {
|
||||
class FunctionJsonAlwaysNotNullable : public IFunction {
|
||||
public:
|
||||
static constexpr auto name = SpecificImpl::name;
|
||||
|
||||
static FunctionPtr create() {
|
||||
return std::make_shared<FunctionJsonAlwaysNotNullable<SpecificImpl>>();
|
||||
}
|
||||
|
||||
bool use_default_implementation_for_nulls() const override { return false; }
|
||||
|
||||
String get_name() const override { return name; }
|
||||
|
||||
size_t get_number_of_arguments() const override { return 0; }
|
||||
|
||||
bool is_variadic() const override { return true; }
|
||||
|
||||
bool use_default_implementation_for_constants() const override { return true; }
|
||||
|
||||
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
|
||||
return std::make_shared<DataTypeString>();
|
||||
}
|
||||
|
||||
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
|
||||
size_t result, size_t input_rows_count) override {
|
||||
auto result_column = ColumnString::create();
|
||||
|
||||
std::vector<ColumnPtr> column_ptrs; // prevent converted column destruct
|
||||
std::vector<const ColumnString*> data_columns;
|
||||
std::vector<const ColumnUInt8*> nullmaps;
|
||||
for (int i = 0; i < arguments.size(); i++) {
|
||||
auto column = block.get_by_position(arguments[i]).column;
|
||||
column_ptrs.push_back(column->convert_to_full_column_if_const());
|
||||
const ColumnNullable* col_nullable =
|
||||
check_and_get_column<ColumnNullable>(column_ptrs.back().get());
|
||||
if (col_nullable) {
|
||||
const ColumnUInt8* col_nullmap = check_and_get_column<ColumnUInt8>(
|
||||
col_nullable->get_null_map_column_ptr().get());
|
||||
nullmaps.push_back(col_nullmap);
|
||||
const ColumnString* col = check_and_get_column<ColumnString>(
|
||||
col_nullable->get_nested_column_ptr().get());
|
||||
data_columns.push_back(col);
|
||||
} else {
|
||||
nullmaps.push_back(nullptr);
|
||||
data_columns.push_back(assert_cast<const ColumnString*>(column_ptrs.back().get()));
|
||||
}
|
||||
}
|
||||
execute(data_columns, *assert_cast<ColumnString*>(result_column.get()), input_rows_count,
|
||||
nullmaps);
|
||||
block.get_by_position(result).column = std::move(result_column);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
static void execute(const std::vector<const ColumnString*>& data_columns,
|
||||
ColumnString& result_column, size_t input_rows_count) {
|
||||
ColumnString& result_column, size_t input_rows_count,
|
||||
const std::vector<const ColumnUInt8*> nullmaps) {
|
||||
std::string type_flags = data_columns.back()->get_data_at(0).to_string();
|
||||
|
||||
rapidjson::Document document;
|
||||
@ -512,13 +575,14 @@ struct FunctionJsonImpl {
|
||||
objects.emplace_back(rapidjson::kArrayType);
|
||||
}
|
||||
|
||||
SpecificImpl::execute_parse(type_flags, data_columns, objects, allocator);
|
||||
SpecificImpl::execute_parse(type_flags, data_columns, objects, allocator, nullmaps);
|
||||
|
||||
rapidjson::StringBuffer buf;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(buf);
|
||||
|
||||
for (int i = 0; i < input_rows_count; i++) {
|
||||
buf.Clear();
|
||||
writer.Reset(buf);
|
||||
objects[i].Accept(writer);
|
||||
result_column.insert_data(buf.GetString(), buf.GetSize());
|
||||
}
|
||||
@ -596,8 +660,8 @@ void register_function_json(SimpleFunctionFactory& factory) {
|
||||
factory.register_function<FunctionGetJsonDouble>();
|
||||
factory.register_function<FunctionGetJsonString>();
|
||||
|
||||
factory.register_function<FunctionJson<FunctionJsonImpl<FunctionJsonArrayImpl>>>();
|
||||
factory.register_function<FunctionJson<FunctionJsonImpl<FunctionJsonObjectImpl>>>();
|
||||
factory.register_function<FunctionJsonAlwaysNotNullable<FunctionJsonArrayImpl>>();
|
||||
factory.register_function<FunctionJsonAlwaysNotNullable<FunctionJsonObjectImpl>>();
|
||||
factory.register_function<FunctionJson<FunctionJsonQuoteImpl>>();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user