// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "vec/exec/vjson_scanner.h" #include #include #include "exec/line_reader.h" #include "exprs/json_functions.h" #include "runtime/runtime_state.h" #include "vec/data_types/data_type_string.h" namespace doris::vectorized { template VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter) : JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter) {} template Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) { SCOPED_TIMER(_read_timer); RETURN_IF_ERROR(_init_src_block()); const int batch_size = _state->batch_size(); auto columns = _src_block.mutate_columns(); // Get one line while (columns[0]->size() < batch_size && !_scanner_eof) { if (_real_reader == nullptr || _cur_reader_eof) { RETURN_IF_ERROR(open_next_reader()); // If there isn't any more reader, break this if (_scanner_eof) { break; } } if (_read_json_by_line && _skip_next_line) { size_t size = 0; const uint8_t* line_ptr = nullptr; RETURN_IF_ERROR(_cur_line_reader->read_line(&line_ptr, &size, &_cur_reader_eof)); _skip_next_line = false; continue; } bool is_empty_row = false; if constexpr (std::is_same_v) { RETURN_IF_ERROR(_cur_vjson_reader->read_json_column(_src_block, _src_slot_descs, &is_empty_row, &_cur_reader_eof)); } else { RETURN_IF_ERROR(_cur_vjson_reader->read_json_column(columns, _src_slot_descs, &is_empty_row, &_cur_reader_eof)); } if (is_empty_row) { // Read empty row, just continue continue; } } COUNTER_UPDATE(_rows_read_counter, columns[0]->size()); SCOPED_TIMER(_materialize_timer); return _fill_dest_block(output_block, eof); } template Status VJsonScanner::open_next_reader() { if (_next_range >= _ranges.size()) { _scanner_eof = true; return Status::OK(); } RETURN_IF_ERROR(JsonScanner::open_based_reader()); RETURN_IF_ERROR(open_vjson_reader()); _next_range++; return Status::OK(); } template Status VJsonScanner::open_vjson_reader() { if (_cur_vjson_reader != nullptr) { _cur_vjson_reader.reset(); } std::string json_root = ""; std::string jsonpath = ""; bool strip_outer_array = false; bool num_as_string = false; bool fuzzy_parse = false; RETURN_IF_ERROR(JsonScanner::get_range_params(jsonpath, json_root, strip_outer_array, num_as_string, fuzzy_parse)); _cur_vjson_reader.reset(new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, fuzzy_parse, &_scanner_eof, _read_json_by_line ? nullptr : _real_reader, _read_json_by_line ? _cur_line_reader : nullptr)); RETURN_IF_ERROR(_cur_vjson_reader->init(jsonpath, json_root)); return Status::OK(); } VJsonReader::VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, bool strip_outer_array, bool num_as_string, bool fuzzy_parse, bool* scanner_eof, FileReader* file_reader, LineReader* line_reader) : JsonReader(state, counter, profile, strip_outer_array, num_as_string, fuzzy_parse, scanner_eof, file_reader, line_reader), _vhandle_json_callback(nullptr) {} VJsonReader::~VJsonReader() {} Status VJsonReader::init(const std::string& jsonpath, const std::string& json_root) { // generate _parsed_jsonpaths and _parsed_json_root RETURN_IF_ERROR(JsonReader::_parse_jsonpath_and_json_root(jsonpath, json_root)); //improve performance if (_parsed_jsonpaths.empty()) { // input is a simple json-string _vhandle_json_callback = &VJsonReader::_vhandle_simple_json; } else { // input is a complex json-string and a json-path if (_strip_outer_array) { _vhandle_json_callback = &VJsonReader::_vhandle_flat_array_complex_json; } else { _vhandle_json_callback = &VJsonReader::_vhandle_nested_complex_json; } } return Status::OK(); } Status VJsonReader::read_json_column(std::vector& columns, const std::vector& slot_descs, bool* is_empty_row, bool* eof) { return (this->*_vhandle_json_callback)(columns, slot_descs, is_empty_row, eof); } Status VJsonReader::_vhandle_simple_json(std::vector& columns, const std::vector& slot_descs, bool* is_empty_row, bool* eof) { do { bool valid = false; if (_next_line >= _total_lines) { // parse json and generic document Status st = _parse_json(is_empty_row, eof); if (st.is_data_quality_error()) { continue; // continue to read next } RETURN_IF_ERROR(st); if (*is_empty_row == true) { return Status::OK(); } _name_map.clear(); rapidjson::Value* objectValue = nullptr; if (_json_doc->IsArray()) { _total_lines = _json_doc->Size(); if (_total_lines == 0) { // may be passing an empty json, such as "[]" RETURN_IF_ERROR(_append_error_msg(*_json_doc, "Empty json line", "", nullptr)); if (*_scanner_eof) { *is_empty_row = true; return Status::OK(); } continue; } objectValue = &(*_json_doc)[0]; } else { _total_lines = 1; // only one row objectValue = _json_doc; } _next_line = 0; if (_fuzzy_parse) { for (auto v : slot_descs) { for (int i = 0; i < objectValue->MemberCount(); ++i) { auto it = objectValue->MemberBegin() + i; if (v->col_name() == it->name.GetString()) { _name_map[v->col_name()] = i; break; } } } } } if (_json_doc->IsArray()) { // handle case 1 rapidjson::Value& objectValue = (*_json_doc)[_next_line]; // json object RETURN_IF_ERROR(_set_column_value(objectValue, columns, slot_descs, &valid)); } else { // handle case 2 RETURN_IF_ERROR(_set_column_value(*_json_doc, columns, slot_descs, &valid)); } _next_line++; if (!valid) { if (*_scanner_eof) { // When _scanner_eof is true and valid is false, it means that we have encountered // unqualified data and decided to stop the scan. *is_empty_row = true; return Status::OK(); } continue; } *is_empty_row = false; break; // get a valid row, then break } while (_next_line <= _total_lines); return Status::OK(); } // for simple format json // set valid to true and return OK if succeed. // set valid to false and return OK if we met an invalid row. // return other status if encounter other problmes. Status VJsonReader::_set_column_value(rapidjson::Value& objectValue, std::vector& columns, const std::vector& slot_descs, bool* valid) { if (!objectValue.IsObject()) { // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"}, // not other type of Json format. RETURN_IF_ERROR(_append_error_msg(objectValue, "Expect json object value", "", valid)); return Status::OK(); } int ctx_idx = 0; bool has_valid_value = false; size_t cur_row_count = columns[0]->size(); for (auto slot_desc : slot_descs) { if (!slot_desc->is_materialized()) { continue; } int dest_index = ctx_idx++; auto* column_ptr = columns[dest_index].get(); rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd(); if (_fuzzy_parse) { auto idx_it = _name_map.find(slot_desc->col_name()); if (idx_it != _name_map.end() && idx_it->second < objectValue.MemberCount()) { it = objectValue.MemberBegin() + idx_it->second; } } else { it = objectValue.FindMember( rapidjson::Value(slot_desc->col_name().c_str(), slot_desc->col_name().size())); } if (it != objectValue.MemberEnd()) { const rapidjson::Value& value = it->value; RETURN_IF_ERROR(_write_data_to_column(&value, slot_desc, column_ptr, valid)); if (!(*valid)) { return Status::OK(); } has_valid_value = true; } else { // not found // When the entire row has no valid value, this row should be filtered, // so the default value cannot be directly inserted here if (!slot_desc->is_nullable()) { RETURN_IF_ERROR(_append_error_msg( objectValue, "The column `{}` is not nullable, but it's not found in jsondata.", slot_desc->col_name(), valid)); break; } } } if (!has_valid_value) { RETURN_IF_ERROR(_append_error_msg(objectValue, "All fields is null, this is a invalid row.", "", valid)); return Status::OK(); } ctx_idx = 0; int nullcount = 0; // fill missing slot for (auto slot_desc : slot_descs) { if (!slot_desc->is_materialized()) { continue; } int dest_index = ctx_idx++; auto* column_ptr = columns[dest_index].get(); if (column_ptr->size() < cur_row_count + 1) { DCHECK(column_ptr->size() == cur_row_count); column_ptr->assume_mutable()->insert_default(); ++nullcount; } DCHECK(column_ptr->size() == cur_row_count + 1); } // There is at least one valid value here DCHECK(nullcount < columns.size()); *valid = true; return Status::OK(); } Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value, SlotDescriptor* slot_desc, vectorized::IColumn* column_ptr, bool* valid) { const char* str_value = nullptr; char tmp_buf[128] = {0}; int32_t wbytes = 0; std::string json_str; vectorized::ColumnNullable* nullable_column = nullptr; if (slot_desc->is_nullable()) { nullable_column = reinterpret_cast(column_ptr); // kNullType will put 1 into the Null map, so there is no need to push 0 for kNullType. if (value->GetType() != rapidjson::Type::kNullType) { nullable_column->get_null_map_data().push_back(0); } else { nullable_column->insert_default(); } column_ptr = &nullable_column->get_nested_column(); } switch (value->GetType()) { case rapidjson::Type::kStringType: str_value = value->GetString(); wbytes = strlen(str_value); break; case rapidjson::Type::kNumberType: if (value->IsUint()) { wbytes = sprintf(tmp_buf, "%u", value->GetUint()); } else if (value->IsInt()) { wbytes = sprintf(tmp_buf, "%d", value->GetInt()); } else if (value->IsUint64()) { wbytes = sprintf(tmp_buf, "%" PRIu64, value->GetUint64()); } else if (value->IsInt64()) { wbytes = sprintf(tmp_buf, "%" PRId64, value->GetInt64()); } else { wbytes = sprintf(tmp_buf, "%f", value->GetDouble()); } str_value = tmp_buf; break; case rapidjson::Type::kFalseType: wbytes = 1; str_value = (char*)"0"; break; case rapidjson::Type::kTrueType: wbytes = 1; str_value = (char*)"1"; break; case rapidjson::Type::kNullType: if (!slot_desc->is_nullable()) { RETURN_IF_ERROR(_append_error_msg( *value, "Json value is null, but the column `{}` is not nullable.", slot_desc->col_name(), valid)); return Status::OK(); } // return immediately to prevent from repeatedly insert_data *valid = true; return Status::OK(); default: // for other type like array or object. we convert it to string to save json_str = JsonReader::_print_json_value(*value); wbytes = json_str.size(); str_value = json_str.c_str(); break; } // TODO: if the vexpr can support another 'slot_desc type' than 'TYPE_VARCHAR', // we need use a function to support these types to insert data in columns. DCHECK(slot_desc->type().type == TYPE_VARCHAR); assert_cast(column_ptr)->insert_data(str_value, wbytes); *valid = true; return Status::OK(); } Status VJsonReader::_vhandle_flat_array_complex_json(std::vector& columns, const std::vector& slot_descs, bool* is_empty_row, bool* eof) { do { if (_next_line >= _total_lines) { Status st = _parse_json(is_empty_row, eof); if (st.is_data_quality_error()) { continue; // continue to read next } RETURN_IF_ERROR(st); if (*is_empty_row == true) { if (st == Status::OK()) { return Status::OK(); } if (_total_lines == 0) { continue; } } } rapidjson::Value& objectValue = (*_json_doc)[_next_line++]; bool valid = true; RETURN_IF_ERROR(_write_columns_by_jsonpath(objectValue, slot_descs, columns, &valid)); if (!valid) { continue; // process next line } *is_empty_row = false; break; // get a valid row, then break } while (_next_line <= _total_lines); return Status::OK(); } Status VJsonReader::_vhandle_nested_complex_json(std::vector& columns, const std::vector& slot_descs, bool* is_empty_row, bool* eof) { while (true) { Status st = _parse_json(is_empty_row, eof); if (st.is_data_quality_error()) { continue; // continue to read next } RETURN_IF_ERROR(st); if (*is_empty_row == true) { return Status::OK(); } *is_empty_row = false; break; // read a valid row } bool valid = true; RETURN_IF_ERROR(_write_columns_by_jsonpath(*_json_doc, slot_descs, columns, &valid)); if (!valid) { // there is only one line in this case, so if it return false, just set is_empty_row true // so that the caller will continue reading next line. *is_empty_row = true; } return Status::OK(); } Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, const std::vector& slot_descs, std::vector& columns, bool* valid) { int ctx_idx = 0; bool has_valid_value = false; size_t cur_row_count = columns[0]->size(); for (auto slot_desc : slot_descs) { if (!slot_desc->is_materialized()) { continue; } int i = ctx_idx++; auto* column_ptr = columns[i].get(); rapidjson::Value* json_values = nullptr; bool wrap_explicitly = false; if (LIKELY(i < _parsed_jsonpaths.size())) { json_values = JsonFunctions::get_json_array_from_parsed_json( _parsed_jsonpaths[i], &objectValue, _origin_json_doc.GetAllocator(), &wrap_explicitly); } if (json_values == nullptr) { // not match in jsondata. if (!slot_descs[i]->is_nullable()) { RETURN_IF_ERROR(_append_error_msg( objectValue, "The column `{}` is not nullable, but it's not found in jsondata.", slot_descs[i]->col_name(), valid)); return Status::OK(); } } else { CHECK(json_values->IsArray()); if (json_values->Size() == 1 && wrap_explicitly) { // NOTICE1: JsonFunctions::get_json_array_from_parsed_json() will wrap the single json object with an array. // so here we unwrap the array to get the real element. // if json_values' size > 1, it means we just match an array, not a wrapped one, so no need to unwrap. json_values = &((*json_values)[0]); } RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i], column_ptr, valid)); if (!(*valid)) { return Status::OK(); } has_valid_value = true; } } if (!has_valid_value) { RETURN_IF_ERROR(_append_error_msg( objectValue, "All fields is null or not matched, this is a invalid row.", "", valid)); return Status::OK(); } ctx_idx = 0; for (auto slot_desc : slot_descs) { if (!slot_desc->is_materialized()) { continue; } int dest_index = ctx_idx++; auto* column_ptr = columns[dest_index].get(); if (column_ptr->size() < cur_row_count + 1) { DCHECK(column_ptr->size() == cur_row_count); column_ptr->assume_mutable()->insert_default(); } DCHECK(column_ptr->size() == cur_row_count + 1); } return Status::OK(); } Status VJsonReader::_parse_json(bool* is_empty_row, bool* eof) { size_t size = 0; Status st = JsonReader::_parse_json_doc(&size, eof); // terminate if encounter other errors RETURN_IF_ERROR(st); // read all data, then return if (size == 0 || *eof) { *is_empty_row = true; return Status::OK(); } if (!_parsed_jsonpaths.empty() && _strip_outer_array) { _total_lines = _json_doc->Size(); _next_line = 0; if (_total_lines == 0) { // meet an empty json array. *is_empty_row = true; } } return Status::OK(); } Status VJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std::string error_msg, std::string col_name, bool* valid) { std::string err_msg; if (!col_name.empty()) { fmt::memory_buffer error_buf; fmt::format_to(error_buf, error_msg, col_name); err_msg = fmt::to_string(error_buf); } else { err_msg = error_msg; } RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, [&]() -> std::string { return err_msg; }, _scanner_eof)); _counter->num_rows_filtered++; if (valid != nullptr) { // current row is invalid *valid = false; } return Status::OK(); } // simdjson VSIMDJsonReader::VSIMDJsonReader(doris::RuntimeState* state, doris::ScannerCounter* counter, RuntimeProfile* profile, bool strip_outer_array, bool num_as_string, bool fuzzy_parse, bool* scanner_eof, FileReader* file_reader, LineReader* line_reader) : _vhandle_json_callback(nullptr), _next_line(0), _total_lines(0), _state(state), _counter(counter), _profile(profile), _file_reader(file_reader), _line_reader(line_reader), _strip_outer_array(strip_outer_array), _scanner_eof(scanner_eof) { _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); _read_timer = ADD_TIMER(_profile, "ReadTime"); _file_read_timer = ADD_TIMER(_profile, "FileReadTime"); _json_parser = std::make_unique(); } VSIMDJsonReader::~VSIMDJsonReader() {} Status VSIMDJsonReader::init(const std::string& jsonpath, const std::string& json_root) { // generate _parsed_jsonpaths and _parsed_json_root RETURN_IF_ERROR(_parse_jsonpath_and_json_root(jsonpath, json_root)); // improve performance if (_parsed_jsonpaths.empty()) { // input is a simple json-string _vhandle_json_callback = &VSIMDJsonReader::_vhandle_simple_json; } else { // input is a complex json-string and a json-path if (_strip_outer_array) { _vhandle_json_callback = &VSIMDJsonReader::_vhandle_flat_array_complex_json; } else { _vhandle_json_callback = &VSIMDJsonReader::_vhandle_nested_complex_json; } } return Status::OK(); } Status VSIMDJsonReader::read_json_column(Block& block, const std::vector& slot_descs, bool* is_empty_row, bool* eof) { return (this->*_vhandle_json_callback)(block, slot_descs, is_empty_row, eof); } #define RETURN_IF_SIMDJSON_ERROR(error, col_name, valid) \ if (UNLIKELY(error)) { \ RETURN_IF_ERROR(_append_error_msg("Encounter error while iterate json", col_name, valid)); \ Status::DataQualityError("Encounter error while iterate json"); \ } // for simple format json // set valid to true and return OK if succeed. // set valid to false and return OK if we met an invalid row. // return other status if encounter other problmes. Status VSIMDJsonReader::_set_column_value(simdjson::ondemand::value objectValue, Block& block, const std::vector& slot_descs, bool* valid) { if (objectValue.type() != simdjson::ondemand::json_type::object) { // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"}, // not other type of Json format. RETURN_IF_ERROR(_append_error_msg("Expect json object value", "", valid)); return Status::OK(); } auto object_val = objectValue.get_object(); size_t cur_row_count = block.rows(); bool has_valid_value = false; // iterate through object, simdjson::ondemond will parsing on the fly for (auto field : object_val) { std::string_view key; RETURN_IF_SIMDJSON_ERROR(field.unescaped_key().get(key), "", valid) auto column_type_and_name = block.try_get_by_name(std::string(key)); if (!column_type_and_name) { continue; } _write_data_to_column(field.value(), nullptr, column_type_and_name->column->assume_mutable().get(), valid); if (!(*valid)) { return Status::OK(); } has_valid_value = true; } if (!has_valid_value) { RETURN_IF_ERROR(_append_error_msg("All fields is null, this is a invalid row.", "", valid)); return Status::OK(); } int nullcount = 0; // fill missing slot for (const auto& column_type_name : block) { auto column = column_type_name.column; if (column->size() < cur_row_count + 1) { DCHECK(column->size() == cur_row_count); column->assume_mutable()->insert_default(); ++nullcount; } DCHECK(column->size() == cur_row_count + 1); } if (nullcount == block.columns()) { RETURN_IF_ERROR(_append_error_msg("All fields is null, this is a invalid row.", "", valid)); return Status::OK(); } *valid = true; return Status::OK(); } Status VSIMDJsonReader::_write_data_to_column(simdjson::ondemand::value value, SlotDescriptor* slot_desc, vectorized::IColumn* column, bool* valid) { vectorized::ColumnNullable* nullable_column = nullptr; vectorized::IColumn* column_ptr = nullptr; if (column->is_nullable()) { nullable_column = assert_cast(column); column_ptr = &nullable_column->get_nested_column(); } // TODO: if the vexpr can support another 'slot_desc type' than 'TYPE_VARCHAR', // we need use a function to support these types to insert data in columns. ColumnString* column_string = assert_cast(column_ptr); if (value.is_null()) { if (column->is_nullable()) { // insert_default already push 1 to null_map nullable_column->insert_default(); } else { RETURN_IF_ERROR( _append_error_msg("Json value is null, but the column `{}` is not nullable.", slot_desc->col_name(), valid)); return Status::OK(); } } else if (value.type() == simdjson::ondemand::json_type::boolean) { nullable_column->get_null_map_data().push_back(0); if (value.get_bool()) { column_string->insert_data("1", 1); } else { column_string->insert_data("0", 1); } } else { // just return it's str representation auto str_view = simdjson::to_json_string(value).value(); if (str_view[0] == '\"' || str_view[0] == '\'') { str_view = str_view.substr(1, str_view.length() - 2); } nullable_column->get_null_map_data().push_back(0); column_string->insert_data(str_view.data(), str_view.length()); } *valid = true; return Status::OK(); } Status VSIMDJsonReader::_parse_json(bool* is_empty_row, bool* eof) { size_t size = 0; Status st = _parse_json_doc(&size, eof); // terminate if encounter other errors RETURN_IF_ERROR(st); // read all data, then return if (size == 0 || *eof) { *is_empty_row = true; return Status::OK(); } if (!_parsed_jsonpaths.empty() && _strip_outer_array) { _total_lines = _json_value.count_elements(); _next_line = 0; if (_total_lines == 0) { // meet an empty json array. *is_empty_row = true; } } return Status::OK(); } // read one json string from line reader or file reader and parse it to json doc. // return Status::DataQualityError() if data has quality error. // return other error if encounter other problemes. // return Status::OK() if parse succeed or reach EOF. Status VSIMDJsonReader::_parse_json_doc(size_t* size, bool* eof) { // read a whole message SCOPED_TIMER(_file_read_timer); const uint8_t* json_str = nullptr; std::unique_ptr json_str_ptr; if (_line_reader != nullptr) { RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof)); } else { int64_t length = 0; RETURN_IF_ERROR(_file_reader->read_one_message(&json_str_ptr, &length)); json_str = json_str_ptr.get(); *size = length; if (length == 0) { *eof = true; } } _bytes_read_counter += *size; if (*eof) { return Status::OK(); } memcpy(_simdjson_ondemand_padding_buffer, json_str, *size); auto error = _json_parser ->iterate(std::string_view(reinterpret_cast( _simdjson_ondemand_padding_buffer), *size), _padded_size) .get(_original_json_doc); if (error != simdjson::error_code::SUCCESS) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}", error, simdjson::error_message(error)); RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return std::string((char*)json_str, *size); }, [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); _counter->num_rows_filtered++; if (*_scanner_eof) { // Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means // we meet enough invalid rows and the scanner should be stopped. // So we set eof to true and return OK, the caller will stop the process as we meet the end of file. *eof = true; return Status::OK(); } return Status::DataQualityError(fmt::to_string(error_msg)); } // set json root if (_parsed_json_root.size() != 0) { auto real_doc = _original_json_doc.at_pointer( std::string_view {_parsed_json_root.data(), _parsed_json_root.size()}); if (real_doc.error() != simdjson::error_code::SUCCESS) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", "JSON Root not found."); RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return std::string(simdjson::to_json_string(_original_json_doc).value()); }, [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); _counter->num_rows_filtered++; if (*_scanner_eof) { // Same as Case A *eof = true; return Status::OK(); } return Status::DataQualityError(fmt::to_string(error_msg)); } RETURN_IF_SIMDJSON_ERROR(real_doc.get(_json_value), "", nullptr); } else { RETURN_IF_SIMDJSON_ERROR(_original_json_doc.get(_json_value), "", nullptr); } if (_json_value.type() == simdjson::ondemand::json_type::array && !_strip_outer_array) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", "JSON data is array-object, `strip_outer_array` must be TRUE."); RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return std::string(simdjson::to_json_string(_json_value).value()); }, [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); _counter->num_rows_filtered++; if (*_scanner_eof) { // Same as Case A *eof = true; return Status::OK(); } return Status::DataQualityError(fmt::to_string(error_msg)); } if (_json_value.type() != simdjson::ondemand::json_type::array && _strip_outer_array) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "{}", "JSON data is not an array-object, `strip_outer_array` must be FALSE."); RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return std::string(simdjson::to_json_string(_json_value).value()); }, [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof)); _counter->num_rows_filtered++; if (*_scanner_eof) { // Same as Case A *eof = true; return Status::OK(); } return Status::DataQualityError(fmt::to_string(error_msg)); } return Status::OK(); } Status VSIMDJsonReader::_append_error_msg(std::string error_msg, std::string col_name, bool* valid) { std::string err_msg; if (!col_name.empty()) { fmt::memory_buffer error_buf; fmt::format_to(error_buf, error_msg, col_name); err_msg = fmt::to_string(error_buf); } else { err_msg = error_msg; } RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return std::string(simdjson::to_json_string(_original_json_doc).value()); }, [&]() -> std::string { return err_msg; }, _scanner_eof)); _counter->num_rows_filtered++; if (valid != nullptr) { // current row is invalid *valid = false; } return Status::OK(); } Status VSIMDJsonReader::_vhandle_simple_json(Block& block, const std::vector& slot_descs, bool* is_empty_row, bool* eof) { simdjson::ondemand::value objectValue; simdjson::ondemand::array array; do { bool valid = false; try { if (_next_line >= _total_lines) { // parse json and generic document Status st = _parse_json(is_empty_row, eof); if (st.is_data_quality_error()) { continue; // continue to read next } RETURN_IF_ERROR(st); if (*is_empty_row == true) { return Status::OK(); } if (_json_value.type() == simdjson::ondemand::json_type::array) { array = _json_value.get_array(); _array_iter = array.begin(); _total_lines = array.count_elements(); if (_total_lines == 0) { // may be passing an empty json, such as "[]" RETURN_IF_ERROR(_append_error_msg("Empty json line", "", nullptr)); if (*_scanner_eof) { *is_empty_row = true; return Status::OK(); } continue; } } else { _total_lines = 1; // only one row objectValue = _json_value; } _next_line = 0; } if (_json_value.type() == simdjson::ondemand::json_type::array) { // handle case 1 objectValue = *_array_iter; RETURN_IF_ERROR(_set_column_value(objectValue, block, slot_descs, &valid)); ++_array_iter; } else { // handle case 2 RETURN_IF_ERROR(_set_column_value(_json_value, block, slot_descs, &valid)); } _next_line++; if (!valid) { if (*_scanner_eof) { // When _scanner_eof is true and valid is false, it means that we have encountered // unqualified data and decided to stop the scan. *is_empty_row = true; return Status::OK(); } continue; } *is_empty_row = false; break; // get a valid row, then break } catch (simdjson::simdjson_error& e) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Parse json data for array failed. code: {}, error info: {}", e.error(), e.what()); RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return ""; }, [&]() -> std::string { return fmt::to_string(error_msg); }, eof)); _counter->num_rows_filtered++; RETURN_IF_ERROR(_append_error_msg("Empty json line", "", nullptr)); if (!valid) { if (*_scanner_eof) { // When _scanner_eof is true and valid is false, it means that we have encountered // unqualified data and decided to stop the scan. *is_empty_row = true; return Status::OK(); } continue; } continue; } } while (_next_line <= _total_lines); return Status::OK(); } Status VSIMDJsonReader::_vhandle_flat_array_complex_json( Block& block, const std::vector& slot_descs, bool* is_empty_row, bool* eof) { do { try { if (_next_line >= _total_lines) { Status st = _parse_json(is_empty_row, eof); if (st.is_data_quality_error()) { continue; // continue to read next } RETURN_IF_ERROR(st); if (*is_empty_row == true) { if (st == Status::OK()) { return Status::OK(); } if (_total_lines == 0) { continue; } } simdjson::ondemand::array array; RETURN_IF_SIMDJSON_ERROR(_json_value.get(array), "", nullptr); _array_iter = array.begin(); } bool valid = true; RETURN_IF_ERROR(_write_columns_by_jsonpath(*_array_iter, slot_descs, block, &valid)); ++_array_iter; ++_next_line; if (!valid) { continue; // process next line } *is_empty_row = false; break; // get a valid row, then break } catch (simdjson::simdjson_error& e) { RETURN_IF_SIMDJSON_ERROR(e.error(), "", nullptr); } } while (_next_line <= _total_lines); return Status::OK(); } Status VSIMDJsonReader::_vhandle_nested_complex_json(Block& block, const std::vector& slot_descs, bool* is_empty_row, bool* eof) { while (true) { try { Status st = _parse_json(is_empty_row, eof); if (st.is_data_quality_error()) { continue; // continue to read next } RETURN_IF_ERROR(st); if (*is_empty_row == true) { return Status::OK(); } *is_empty_row = false; break; // read a valid row } catch (simdjson::simdjson_error& e) { RETURN_IF_SIMDJSON_ERROR(e.error(), "", nullptr); } } bool valid = true; RETURN_IF_ERROR(_write_columns_by_jsonpath(_json_value, slot_descs, block, &valid)); if (!valid) { // there is only one line in this case, so if it return false, just set is_empty_row true // so that the caller will continue reading next line. *is_empty_row = true; } return Status::OK(); } // convert `["$.k1[0]", "$.k2.a"]` -> ["/k1/0", "/k2/a"] static std::optional convert_to_simdjson_path( const std::vector& parsed_paths) { std::stringstream read_path; bool is_valid = true; std::for_each(parsed_paths.begin() + 1, parsed_paths.end(), [&read_path, &is_valid](const auto& path) { if (is_valid) { read_path << path.to_simdjson_pointer(&is_valid); } }); if (!is_valid) { return {}; } return read_path.str(); } Status VSIMDJsonReader::_parse_jsonpath_and_json_root(const std::string& jsonpath, const std::string& json_root) { // parse jsonpath if (!jsonpath.empty()) { RETURN_IF_ERROR(_generate_json_paths(jsonpath, &_parsed_jsonpaths)); } if (!json_root.empty()) { std::vector parsed_json_root; JsonFunctions::parse_json_paths(json_root, &parsed_json_root); auto json_root_path = convert_to_simdjson_path(parsed_json_root); if (!json_root_path) { return Status::InvalidArgument("Invalid json root: " + json_root); } _parsed_json_root = json_root_path.value(); } return Status::OK(); } Status VSIMDJsonReader::_generate_json_paths(const std::string& jsonpath, std::vector* vect) { memcpy(_simdjson_ondemand_padding_buffer, jsonpath.data(), jsonpath.size()); simdjson::ondemand::document path_doc; auto error = _json_parser ->iterate(std::string_view(reinterpret_cast( _simdjson_ondemand_padding_buffer), jsonpath.size()), _padded_size) .get(path_doc); if (error || path_doc.type() != simdjson::ondemand::json_type::array) { return Status::InvalidArgument("Invalid json path: " + jsonpath); } for (auto item : path_doc) { if (item.type() != simdjson::ondemand::json_type::string) { return Status::InvalidArgument("Invalid json path: " + jsonpath); } std::vector parsed_paths; JsonFunctions::parse_json_paths(std::string(item.get_string().value()), &parsed_paths); auto simdjson_path = convert_to_simdjson_path(parsed_paths); if (!simdjson_path) { return Status::InvalidArgument("Invalid json path: " + jsonpath); } vect->push_back(simdjson_path.value()); } return Status::OK(); } Status VSIMDJsonReader::_write_columns_by_jsonpath(simdjson::ondemand::value value, const std::vector& slot_descs, Block& block, bool* valid) { size_t column_num = slot_descs.size(); auto object_value = value.get_object(); bool has_valid_value = false; size_t cur_row_count = block.rows(); for (size_t i = 0; i < column_num; i++) { auto* column_ptr = block.get_by_position(i).column->assume_mutable().get(); simdjson::simdjson_result json_value; if (i < _parsed_jsonpaths.size()) { json_value = object_value.at_pointer( std::string_view {_parsed_jsonpaths[i].data(), _parsed_jsonpaths[i].size()}); } if (i >= _parsed_jsonpaths.size() || json_value.error() != simdjson::error_code::SUCCESS) { // not match in jsondata. if (!slot_descs[i]->is_nullable()) { RETURN_IF_ERROR(_append_error_msg( "The column `{}` is not nullable, but it's not found in jsondata.", slot_descs[i]->col_name(), valid)); return Status::OK(); } } else { RETURN_IF_ERROR( _write_data_to_column(json_value.value(), slot_descs[i], column_ptr, valid)); if (!(*valid)) { return Status::OK(); } has_valid_value = true; } object_value.reset(); } if (!has_valid_value) { RETURN_IF_ERROR(_append_error_msg("All fields is null, this is a invalid row.", "", valid)); return Status::OK(); } // fill missing slot for (const auto& column_type_name : block) { auto column = column_type_name.column; if (column->size() < cur_row_count + 1) { DCHECK(column->size() == cur_row_count); column->assume_mutable()->insert_default(); } DCHECK(column->size() == cur_row_count + 1); } return Status::OK(); } template class VJsonScanner; template class VJsonScanner; } // namespace doris::vectorized