diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 6143a9e19c..ca6cd5622c 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -134,16 +134,20 @@ Status JsonScanner::open_next_reader() { } } + std::string json_root = ""; std::string jsonpath = ""; bool strip_outer_array = false; if (range.__isset.jsonpaths) { jsonpath = range.jsonpaths; } + if (range.__isset.json_root) { + json_root = range.json_root; + } if (range.__isset.strip_outer_array) { strip_outer_array = range.strip_outer_array; } - _cur_file_reader = new JsonReader(_state, _counter, _profile, file, jsonpath, strip_outer_array); - RETURN_IF_ERROR(_cur_file_reader->init()); + _cur_file_reader = new JsonReader(_state, _counter, _profile, file, strip_outer_array); + RETURN_IF_ERROR(_cur_file_reader->init(jsonpath, json_root)); return Status::OK(); } @@ -178,9 +182,8 @@ JsonReader::JsonReader( RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, FileReader* file_reader, - std::string& jsonpath, bool strip_outer_array) : - _jsonpath(jsonpath), + _handle_json_callback(nullptr), _next_line(0), _total_lines(0), _state(state), @@ -188,7 +191,8 @@ JsonReader::JsonReader( _profile(profile), _file_reader(file_reader), _closed(false), - _strip_outer_array(strip_outer_array) { + _strip_outer_array(strip_outer_array), + _json_doc(nullptr) { _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); _read_timer = ADD_TIMER(_profile, "FileReadTime"); } @@ -197,32 +201,52 @@ JsonReader::~JsonReader() { _close(); } -Status JsonReader::init() { +Status JsonReader::init(const std::string& jsonpath, const std::string& json_root) { // parse jsonpath - rapidjson::Document jsonpaths_doc; - if (!_jsonpath.empty()) { - if (!jsonpaths_doc.Parse(_jsonpath.c_str()).HasParseError()) { - if (!jsonpaths_doc.IsArray()) { - return Status::InvalidArgument("Invalid json path: " + _jsonpath); - } else { - for (int i = 0; i < jsonpaths_doc.Size(); i++) { - const rapidjson::Value& path = jsonpaths_doc[i]; - if (!path.IsString()) { - return Status::InvalidArgument("Invalid json path: " + _jsonpath); - } - path.GetString(); - std::vector parsed_paths; - JsonFunctions::parse_json_paths(path.GetString(), &parsed_paths); - _parsed_jsonpaths.push_back(parsed_paths); - } - } + if (!jsonpath.empty()) { + Status st = _generate_json_paths(jsonpath, &_parsed_jsonpaths); + RETURN_IF_ERROR(st); + } + if (!json_root.empty()) { + JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); + } + + //improve performance + if (_parsed_jsonpaths.empty()) { // input is a simple json-string + _handle_json_callback = &JsonReader::_handle_simple_json; + } else { // input is a complex json-string and a json-path + if (_strip_outer_array) { + _handle_json_callback = &JsonReader::_handle_flat_array_complex_json; } else { - return Status::InvalidArgument("Invalid json path: " + _jsonpath); + _handle_json_callback = &JsonReader::_handle_nested_complex_json; } } return Status::OK(); } +Status JsonReader::_generate_json_paths(const std::string& jsonpath, std::vector>* vect) { + rapidjson::Document jsonpaths_doc; + if (!jsonpaths_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!jsonpaths_doc.IsArray()) { + return Status::InvalidArgument("Invalid json path: " + jsonpath); + } else { + for (int i = 0; i < jsonpaths_doc.Size(); i++) { + const rapidjson::Value& path = jsonpaths_doc[i]; + if (!path.IsString()) { + return Status::InvalidArgument("Invalid json path: " + jsonpath); + } + path.GetString(); + std::vector parsed_paths; + JsonFunctions::parse_json_paths(path.GetString(), &parsed_paths); + vect->push_back(parsed_paths); + } + return Status::OK(); + } + } else { + return Status::InvalidArgument("Invalid json path: " + jsonpath); + } +} + void JsonReader::_close() { if (_closed) { return; @@ -248,35 +272,47 @@ Status JsonReader::_parse_json_doc(bool* eof) { return Status::OK(); } // parse jsondata to JsonDoc - if (_json_doc.Parse((char*)json_str, length).HasParseError()) { + if (_origin_json_doc.Parse((char*)json_str, length).HasParseError()) { std::stringstream str_error; - str_error << "Parse json data for JsonDoc failed. code = " << _json_doc.GetParseError() - << ", error-info:" << rapidjson::GetParseError_En(_json_doc.GetParseError()); + str_error << "Parse json data for JsonDoc failed. code = " << _origin_json_doc.GetParseError() + << ", error-info:" << rapidjson::GetParseError_En(_origin_json_doc.GetParseError()); _state->append_error_msg_to_file(std::string((char*) json_str, length), str_error.str()); _counter->num_rows_filtered++; delete[] json_str; return Status::DataQualityError(str_error.str()); } + delete[] json_str; - if (_json_doc.IsArray() && !_strip_outer_array) { - delete[] json_str; + // set json root + if (_parsed_json_root.size() != 0) { + _json_doc = JsonFunctions::get_json_object_from_parsed_json(_parsed_json_root, &_origin_json_doc, _origin_json_doc.GetAllocator()); + if (_json_doc == nullptr) { + std::stringstream str_error; + str_error << "JSON Root not found."; + _state->append_error_msg_to_file(_print_json_value(_origin_json_doc), str_error.str()); + _counter->num_rows_filtered++; + return Status::DataQualityError(str_error.str()); + } + } else { + _json_doc = &_origin_json_doc; + } + + if (_json_doc->IsArray() && !_strip_outer_array) { std::stringstream str_error; str_error << "JSON data is array-object, `strip_outer_array` must be TRUE."; - _state->append_error_msg_to_file(_print_json_value(_json_doc), str_error.str()); + _state->append_error_msg_to_file(_print_json_value(_origin_json_doc), str_error.str()); _counter->num_rows_filtered++; return Status::DataQualityError(str_error.str()); } - if (!_json_doc.IsArray() && _strip_outer_array) { - delete[] json_str; + if (!_json_doc->IsArray() && _strip_outer_array) { std::stringstream str_error; str_error << "JSON data is not an array-object, `strip_outer_array` must be FALSE."; - _state->append_error_msg_to_file(_print_json_value(_json_doc), str_error.str()); + _state->append_error_msg_to_file(_print_json_value(_origin_json_doc), str_error.str()); _counter->num_rows_filtered++; return Status::DataQualityError(str_error.str()); } - delete[] json_str; return Status::OK(); } @@ -288,27 +324,6 @@ std::string JsonReader::_print_json_value(const rapidjson::Value& value) { return std::string(buffer.GetString()); } -void JsonReader::_assemble_jmap( - const std::vector& slot_descs) { - // iterator jsonpath to find object and save it to Map - _jmap.clear(); - - for (int i = 0; i < _parsed_jsonpaths.size(); i++) { - rapidjson::Value* json_value = JsonFunctions::get_json_array_from_parsed_json(_parsed_jsonpaths[i], &_json_doc, _json_doc.GetAllocator()); - if (json_value == nullptr) { - // null means failed to match, we also put it in _jmap, it will be handled later. - _jmap.emplace(slot_descs[i]->col_name(), nullptr); - } else { - if (json_value->Size() == 1) { - // see NOTICE1 - json_value = &((*json_value)[0]); - } - _jmap.emplace(slot_descs[i]->col_name(), json_value); - } - } - return; -} - std::string JsonReader::_print_jsonpath(const std::vector& path) { std::stringstream ss; for (auto& p : path) { @@ -421,7 +436,8 @@ void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, c * handle input a simple json. * A json is a simple json only when user not specifying the json path. * For example: - * case 1. [{"colunm1":"value1", "colunm2":10}, {"colunm1":"value2", "colunm2":30}] + * case 1. [{"colunm1":"value1", "colunm2":10}, {"colunm1":" +", "colunm2":30}] * case 2. {"colunm1":"value1", "colunm2":10} */ Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof) { @@ -436,19 +452,19 @@ Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vectorIsArray()) { + _total_lines = _json_doc->Size(); } else { _total_lines = 1; // only one row } _next_line = 0; } - if (_json_doc.IsArray()) { // handle case 1 - rapidjson::Value& objectValue = _json_doc[_next_line];// json object + if (_json_doc->IsArray()) { // handle case 1 + rapidjson::Value& objectValue = (*_json_doc)[_next_line];// json object _set_tuple_value(objectValue, tuple, slot_descs, tuple_pool, &valid); } else { // handle case 2 - _set_tuple_value(_json_doc, tuple, slot_descs, tuple_pool, &valid); + _set_tuple_value(*_json_doc, tuple, slot_descs, tuple_pool, &valid); } _next_line++; if (!valid) { @@ -459,64 +475,68 @@ Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool *valid) { - std::unordered_map::iterator it_map; - for (auto v : slot_descs) { - it_map = _jmap.find(v->col_name()); - if (it_map == _jmap.end()) { - return Status::RuntimeError("The column name of table is not foud in jsonpath: " + v->col_name()); - } - rapidjson::Value* value = it_map->second.get_value(); - if (value == nullptr) { - if (v->is_nullable()) { - tuple->set_null(v->null_indicator_offset()); +bool JsonReader::_write_values_by_jsonpath(rapidjson::Value& objectValue, MemPool* tuple_pool, Tuple* tuple, const std::vector& slot_descs) { + int nullcount = 0; + bool valid = true; + size_t column_num = std::min(slot_descs.size(), _parsed_jsonpaths.size()); + + for (size_t i = 0; i < column_num; i++) { + rapidjson::Value* json_values = JsonFunctions::get_json_array_from_parsed_json(_parsed_jsonpaths[i], &objectValue, _origin_json_doc.GetAllocator()); + if (json_values == nullptr) { + // not match in jsondata. + if (slot_descs[i]->is_nullable()) { + tuple->set_null(slot_descs[i]->null_indicator_offset()); + nullcount++; } else { std::stringstream str_error; - str_error << "The column `" << it_map->first << "` is not nullable, but it's not found in jsondata."; - _state->append_error_msg_to_file(_print_json_value(*value), str_error.str()); + str_error << "The column `" << slot_descs[i]->col_name() << "` is not nullable, but it's not found in jsondata."; + _state->append_error_msg_to_file(_print_json_value(objectValue), str_error.str()); _counter->num_rows_filtered++; - *valid = false; // current row is invalid + valid = false; // current row is invalid break; } } else { - _write_data_to_tuple(value, v, tuple, tuple_pool, valid); - if (!(*valid)) { - return Status::OK(); + CHECK(json_values->IsArray()); + CHECK(json_values->Size() >= 1); + if (json_values->Size() == 1) { + // NOTICE1: JsonFunctions::get_json_array_from_parsed_json() will wrap the single json object with an array. + // so here we unwrap the array to get the real element. + // if json_values' size > 1, it means we just match an array, not a wrapped one, so no need to unwrap. + json_values = &((*json_values)[0]); + } + _write_data_to_tuple(json_values, slot_descs[i], tuple, tuple_pool, &valid); + if (!valid) { + break; } } } - *valid = true; - return Status::OK(); + if (nullcount == column_num) { + _state->append_error_msg_to_file(_print_json_value(objectValue), "All fields is null or not matched, this is a invalid row."); + _counter->num_rows_filtered++; + } + return valid; } -// _json_doc should be an object +/** + * for example: + * { + * "data": {"a":"a1", "b":"b1", "c":"c1"} + * } + * In this scene, generate only one row + */ Status JsonReader::_handle_nested_complex_json(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof) { - do { - bool valid = false; - if (_next_line >= _total_lines) { - Status st = _parse_json_doc(eof); - if (st.is_data_quality_error()) { - continue; // continue to read next - } - RETURN_IF_ERROR(st); // terminate if encounter other errors - if (*eof) { - return Status::OK(); - } - _assemble_jmap(slot_descs); - // for json with strip_outer_array is false, we treat it as a single row. - // so _total_lines is always 1. - _total_lines = 1; - _next_line = 0; + while(true) { + Status st = _parse_json_doc(eof); + if (st.is_data_quality_error()) { + continue; // continue to read next } - - RETURN_IF_ERROR(_set_tuple_value_from_jmap(tuple, slot_descs, tuple_pool, &valid)); - _next_line++; - if (!valid) { // read a invalid row, then read next one - continue; + RETURN_IF_ERROR(st); + if (*eof) { + return Status::OK();// read over,then return } - break; // read a valid row, then break - } while (_next_line <= _total_lines); + break; //read a valid row + } + _write_values_by_jsonpath(*_json_doc, tuple_pool, tuple, slot_descs); return Status::OK(); } @@ -541,73 +561,20 @@ Status JsonReader::_handle_flat_array_complex_json(Tuple* tuple, const std::vect if (*eof) {// read all data, then return return Status::OK(); } - _total_lines = _json_doc.Size(); + _total_lines = _json_doc->Size(); _next_line = 0; } - int nullcount = 0; - bool valid = true; - size_t column_num = std::min(slot_descs.size(), _parsed_jsonpaths.size()); - rapidjson::Value& objectValue = _json_doc[_next_line]; - - for (size_t i = 0; i < column_num; i++) { - rapidjson::Value* json_values = JsonFunctions::get_json_array_from_parsed_json(_parsed_jsonpaths[i], &objectValue, _json_doc.GetAllocator()); - if (json_values == nullptr) { - // not match in jsondata. - if (slot_descs[i]->is_nullable()) { - tuple->set_null(slot_descs[i]->null_indicator_offset()); - nullcount++; - } else { - std::stringstream str_error; - str_error << "The column `" << slot_descs[i]->col_name() << "` is not nullable, but it's not found in jsondata."; - _state->append_error_msg_to_file(_print_json_value(objectValue), str_error.str()); - _counter->num_rows_filtered++; - valid = false; // current row is invalid - break; - } - } else { - CHECK(json_values->IsArray()); - CHECK(json_values->Size() >= 1); - if (json_values->Size() == 1) { - // NOTICE1: JsonFunctions::get_json_array_from_parsed_json() will wrap the single json object with an array. - // so here we unwrap the array to get the real element. - // if json_values' size > 1, it means we just match an array, not a wrapped one, so no need to unwrap. - json_values = &((*json_values)[0]); - } - _write_data_to_tuple(json_values, slot_descs[i], tuple, tuple_pool, &valid); - if (!valid) { - break; - } - } - } - _next_line++; - if (!valid) { - continue; - } - if (nullcount == column_num) { - _state->append_error_msg_to_file(_print_json_value(objectValue), "All fields is null or not matched, this is a invalid row."); - _counter->num_rows_filtered++; - continue; + rapidjson::Value& objectValue = (*_json_doc)[_next_line++]; + if (!_write_values_by_jsonpath(objectValue, tuple_pool, tuple, slot_descs)) { + continue; // process next line } break; // get a valid row, then break } while (_next_line <= _total_lines); return Status::OK(); } -// handle json with specified json path -Status JsonReader::_handle_complex_json(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof) { - if (_strip_outer_array) { - return _handle_flat_array_complex_json(tuple, slot_descs, tuple_pool, eof); - } else { - return _handle_nested_complex_json(tuple, slot_descs, tuple_pool, eof); - } -} - Status JsonReader::read(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof) { - if (_parsed_jsonpaths.empty()) { // input is a simple json-string - return _handle_simple_json(tuple, slot_descs, tuple_pool, eof); - } else { // input is a complex json-string and a json-path - return _handle_complex_json(tuple, slot_descs, tuple_pool, eof); - } + return (this->*_handle_json_callback)(tuple, slot_descs, tuple_pool, eof); } diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index fcb1186f31..6dae93159d 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -90,7 +90,6 @@ public: JsonDataInternal(rapidjson::Value* v); ~JsonDataInternal() {} rapidjson::Value::ConstValueIterator get_next(); - rapidjson::Value* get_value() { return _json_values; } bool is_null() const { return _json_values == nullptr; } private: @@ -106,33 +105,31 @@ struct JsonPath; class JsonReader { public: JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, FileReader* file_reader, - std::string& jsonpath, bool strip_outer_array); + bool strip_outer_array); ~JsonReader(); - Status init(); // must call before use + Status init(const std::string& jsonpath, const std::string& json_root); // must call before use Status read(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof); private: + Status (JsonReader::*_handle_json_callback)(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof); Status _handle_simple_json(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof); - Status _handle_complex_json(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof); Status _handle_flat_array_complex_json(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof); Status _handle_nested_complex_json(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof); void _fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len); - void _assemble_jmap(const std::vector& slot_descs); Status _parse_json_doc(bool* eof); void _set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool *valid); - Status _set_tuple_value_from_jmap(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool *valid); void _write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool, bool* valid); + bool _write_values_by_jsonpath(rapidjson::Value& objectValue, MemPool* tuple_pool, Tuple* tuple, const std::vector& slot_descs); std::string _print_json_value(const rapidjson::Value& value); std::string _print_jsonpath(const std::vector& path); void _close(); + Status _generate_json_paths(const std::string& jsonpath, std::vector>* vect); private: - // origin json path - std::string _jsonpath; int _next_line; int _total_lines; RuntimeState* _state; @@ -143,10 +140,12 @@ private: bool _strip_outer_array; RuntimeProfile::Counter* _bytes_read_counter; RuntimeProfile::Counter* _read_timer; + std::vector> _parsed_jsonpaths; - rapidjson::Document _json_doc; - //key: column name - std::unordered_map _jmap; + std::vector _parsed_json_root; + + rapidjson::Document _origin_json_doc; // origin json document object from parsed json string + rapidjson::Value *_json_doc; // _json_doc equals _final_json_doc iff not set `json_root` }; diff --git a/be/src/exprs/json_functions.cpp b/be/src/exprs/json_functions.cpp index df2510d9e7..8deead4cbe 100644 --- a/be/src/exprs/json_functions.cpp +++ b/be/src/exprs/json_functions.cpp @@ -299,6 +299,23 @@ rapidjson::Value* JsonFunctions::get_json_array_from_parsed_json( return root; } + +rapidjson::Value* JsonFunctions::get_json_object_from_parsed_json( + const std::vector& parsed_paths, + rapidjson::Value* document, + rapidjson::Document::AllocatorType& mem_allocator) { + + if (!parsed_paths[0].is_valid) { + return nullptr; + } + + rapidjson::Value* root = match_value(parsed_paths, document, mem_allocator, true); + if (root == nullptr || root == document) { // not found + return nullptr; + } + return root; +} + void JsonFunctions::json_path_prepare( doris_udf::FunctionContext* context, doris_udf::FunctionContext::FunctionStateScope scope) { diff --git a/be/src/exprs/json_functions.h b/be/src/exprs/json_functions.h index d879e32a5f..657a9c75f3 100644 --- a/be/src/exprs/json_functions.h +++ b/be/src/exprs/json_functions.h @@ -95,6 +95,11 @@ public: rapidjson::Value* document, rapidjson::Document::AllocatorType& mem_allocator); + static rapidjson::Value* get_json_object_from_parsed_json( + const std::vector& parsed_paths, + rapidjson::Value* document, + rapidjson::Document::AllocatorType& mem_allocator); + static void json_path_prepare( doris_udf::FunctionContext*, doris_udf::FunctionContext::FunctionStateScope); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 6e9a616bd8..52e16a5ebc 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -371,11 +371,14 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* return Status::InvalidArgument("Invalid mem limit format"); } } - if (!http_req->header(HTTP_EXEC_JSONPATHS).empty()) { - request.__set_jsonpaths(http_req->header(HTTP_EXEC_JSONPATHS)); + if (!http_req->header(HTTP_JSONPATHS).empty()) { + request.__set_jsonpaths(http_req->header(HTTP_JSONPATHS)); } - if (!http_req->header(HTTP_EXEC_STRIP_OUTER_ARRAY).empty()) { - if (boost::iequals(http_req->header(HTTP_EXEC_STRIP_OUTER_ARRAY), "true")) { + if (!http_req->header(HTTP_JSONROOT).empty()) { + request.__set_json_root(http_req->header(HTTP_JSONROOT)); + } + if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) { + if (boost::iequals(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) { request.__set_strip_outer_array(true); } else { request.__set_strip_outer_array(false); diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index b1001f9eec..984aa102a3 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -36,8 +36,9 @@ static const std::string HTTP_NEGATIVE = "negative"; static const std::string HTTP_STRICT_MODE = "strict_mode"; static const std::string HTTP_TIMEZONE = "timezone"; static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit"; -static const std::string HTTP_EXEC_JSONPATHS = "jsonpaths"; -static const std::string HTTP_EXEC_STRIP_OUTER_ARRAY = "strip_outer_array"; +static const std::string HTTP_JSONPATHS = "jsonpaths"; +static const std::string HTTP_JSONROOT = "json_root"; +static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array"; static const std::string HTTP_100_CONTINUE = "100-continue"; diff --git a/be/test/exec/json_scanner_test.cpp b/be/test/exec/json_scanner_test.cpp index 1deaca2b26..c49282fc0e 100644 --- a/be/test/exec/json_scanner_test.cpp +++ b/be/test/exec/json_scanner_test.cpp @@ -401,6 +401,8 @@ TEST_F(JsonSannerTest, normal_simple_arrayjson) { range.start_offset = 0; range.size = -1; range.format_type = TFileFormatType::FORMAT_JSON; + range.strip_outer_array = true; + range.__isset.strip_outer_array = true; range.splittable = true; range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; range.file_type = TFileType::FILE_LOCAL; diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index 2faf51c7f7..9772f5b4f5 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -178,6 +178,9 @@ FROM data_source 8. `strip_outer_array` Boolean type, true to indicate that json data starts with an array object and flattens objects in the array object, default value is false. + 9. `json_root` + json_root is a valid JSONPATH string that specifies the root node of the JSON Document. The default value is "". + 5. data_source The type of data source. Current support: @@ -461,6 +464,36 @@ FROM data_source 1)If the json data starts as an array and each object in the array is a record, you need to set the strip_outer_array to true to represent the flat array. 2)If the json data starts with an array, and each object in the array is a record, our ROOT node is actually an object in the array when we set jsonpath. +6. User specifies the json_root node + CREATE ROUTINE LOAD example_db.test1 ON example_tbl + COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "20", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200", + "strict_mode" = "false", + "format" = "json", + "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", + "strip_outer_array" = "true", + "json_root" = "$.RECORDS" + ) + FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "kafka_partitions" = "0,1,2", + "kafka_offsets" = "0,0,0" + ); + For example json data: + { + "RECORDS":[ + {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587}, + {"category":"22","author":"2avc","price":895,"timestamp":1589191487}, + {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387} + ] + } ## keyword CREATE, ROUTINE, LOAD diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 17c88a4e4d..03ed5db392 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -120,6 +120,9 @@ There are two ways to import json: simple mode and matched mode. If jsonpath is `strip_outer_array` Boolean type, true to indicate that json data starts with an array object and flattens objects in the array object, default value is false. +`json_root` +json_root is a valid JSONPATH string that specifies the root node of the JSON Document. The default value is "". + RETURN VALUES After the load is completed, the related content of this load will be returned in Json format. Current field included @@ -213,7 +216,7 @@ Where url is the url given by ErrorURL. {"category":"Linux","author":"avc","title":"Linux kernel","price":195} ] 11. Matched load json by jsonpaths - json data: + For example json data: [ {"category":"xuxb111","author":"1avc","title":"SayingsoftheCentury","price":895}, {"category":"xuxb222","author":"2avc","title":"SayingsoftheCentury","price":895}, @@ -225,6 +228,18 @@ Where url is the url given by ErrorURL. 1)If the json data starts as an array and each object in the array is a record, you need to set the strip_outer_array to true to represent the flat array. 2)If the json data starts with an array, and each object in the array is a record, our ROOT node is actually an object in the array when we set jsonpath. +12. User specifies the json_root node + For example json data: + { + "RECORDS":[ + {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587}, + {"category":"22","author":"2avc","price":895,"timestamp":1589191487}, + {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387} + ] + } + Matched imports are made by specifying jsonpath parameter, such as `category`, `author`, and `price`, for example: + curl --location-trusted -u root -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/testDb/testTbl/_stream_load + ## keyword STREAM, LOAD diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index 3a870e57b6..a84ab43ffc 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -158,6 +158,10 @@ under the License. 布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。 + 9. json_root + + json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。 + 5. data_source 数据源的类型。当前支持: @@ -398,6 +402,36 @@ under the License. 1)如果json数据是以数组开始,并且数组中每个对象是一条记录,则需要将strip_outer_array设置成true,表示展平数组。 2)如果json数据是以数组开始,并且数组中每个对象是一条记录,在设置jsonpath时,我们的ROOT节点实际上是数组中对象。 + 6. 用户指定根节点json_root + CREATE ROUTINE LOAD example_db.test1 ON example_tbl + COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "20", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200", + "strict_mode" = "false", + "format" = "json", + "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", + "strip_outer_array" = "true", + "json_root" = "$.RECORDS" + ) + FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "kafka_partitions" = "0,1,2", + "kafka_offsets" = "0,0,0" + ); + json数据格式: + { + "RECORDS":[ + {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587}, + {"category":"22","author":"2avc","price":895,"timestamp":1589191487}, + {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387} + ] + } ## keyword CREATE,ROUTINE,LOAD diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 68f895f5e6..b95912d288 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -89,6 +89,8 @@ under the License. ] 当strip_outer_array为true,最后导入到doris中会生成两行数据。 + json_root: json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。 + RETURN VALUES 导入完成后,会以Json格式返回这次导入的相关内容。当前包括一下字段 Status: 导入最后的状态。 @@ -172,6 +174,18 @@ under the License. 1)如果json数据是以数组开始,并且数组中每个对象是一条记录,则需要将strip_outer_array设置成true,表示展平数组。 2)如果json数据是以数组开始,并且数组中每个对象是一条记录,在设置jsonpath时,我们的ROOT节点实际上是数组中对象。 + 12. 用户指定json根节点 + json数据格式: + { + "RECORDS":[ + {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587}, + {"category":"22","author":"2avc","price":895,"timestamp":1589191487}, + {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387} + ] + } + 通过指定jsonpath进行精准导入,例如只导入category、author、price三个属性 + curl --location-trusted -u root -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/testDb/testTbl/_stream_load + ## keyword STREAM,LOAD diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 6fe4d87da7..6861cafdc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -92,6 +92,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String FORMAT = "format";// the value is csv or json, default is csv public static final String STRIP_OUTER_ARRAY = "strip_outer_array"; public static final String JSONPATHS = "jsonpaths"; + public static final String JSONROOT = "json_root"; // kafka type properties public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list"; @@ -113,6 +114,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(FORMAT) .add(JSONPATHS) .add(STRIP_OUTER_ARRAY) + .add(JSONROOT) .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) .build(); @@ -149,11 +151,11 @@ public class CreateRoutineLoadStmt extends DdlStmt { * 1) dataFormat = "json" * 2) jsonPaths = "$.XXX.xxx" */ - private String format = ""; //default is csv. - private String jsonPaths = ""; + private String format = ""; //default is csv. + private String jsonPaths = ""; + private String jsonRoot = ""; // MUST be a jsonpath string private boolean stripOuterArray = false; - // kafka related properties private String kafkaBrokerList; private String kafkaTopic; @@ -240,6 +242,10 @@ public class CreateRoutineLoadStmt extends DdlStmt { return jsonPaths; } + public String getJsonRoot() { + return jsonRoot; + } + public String getKafkaBrokerList() { return kafkaBrokerList; } @@ -364,6 +370,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { } else if (format.equalsIgnoreCase("json")) { format = "json"; jsonPaths = jobProperties.get(JSONPATHS); + jsonRoot = jobProperties.get(JSONROOT); stripOuterArray = Boolean.valueOf(jobProperties.get(STRIP_OUTER_ARRAY)); } else { throw new UserException("Format type is invalid. format=`" + format + "`"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 5e236c8808..feaf50f171 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -61,7 +61,6 @@ import org.apache.doris.transaction.AbstractTxnStateChangeCallback; import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; - import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -70,7 +69,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; import com.google.gson.GsonBuilder; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -184,6 +182,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl private static final String PROPS_FORMAT = "format"; private static final String PROPS_STRIP_OUTER_ARRAY = "strip_outer_array"; private static final String PROPS_JSONPATHS = "jsonpaths"; + private static final String PROPS_JSONROOT = "json_root"; protected int currentTaskConcurrentNum; protected RoutineLoadProgress progress; @@ -285,6 +284,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl jobProperties.put(PROPS_FORMAT, "csv"); jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false"); jobProperties.put(PROPS_JSONPATHS, ""); + jobProperties.put(PROPS_JSONROOT, ""); } else if (stmt.getFormat().equals("json")) { jobProperties.put(PROPS_FORMAT, "json"); if (!Strings.isNullOrEmpty(stmt.getJsonPaths())) { @@ -292,6 +292,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } else { jobProperties.put(PROPS_JSONPATHS, ""); } + if (!Strings.isNullOrEmpty(stmt.getJsonRoot())) { + jobProperties.put(PROPS_JSONROOT, stmt.getJsonRoot()); + } else { + jobProperties.put(PROPS_JSONROOT, ""); + } if (stmt.isStripOuterArray()) { jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "true"); } else { @@ -472,6 +477,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl return value; } + public String getJsonRoot() { + String value = jobProperties.get(PROPS_JSONROOT); + if (value == null) { + return ""; + } + return value; + } + public int getSizeOfRoutineLoadTaskInfoList() { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index 2fa49190b0..5725af2a7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -103,6 +103,9 @@ public class StreamLoadScanNode extends LoadScanNode { if (!streamLoadTask.getJsonPaths().isEmpty()) { rangeDesc.setJsonpaths(streamLoadTask.getJsonPaths()); } + if (!streamLoadTask.getJsonRoot().isEmpty()) { + rangeDesc.setJson_root(streamLoadTask.getJsonRoot()); + } rangeDesc.setStrip_outer_array(streamLoadTask.isStripOuterArray()); } rangeDesc.splittable = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index b09b3478fe..d5ff24e137 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -53,6 +53,7 @@ public class StreamLoadTask { private TFileFormatType formatType; private boolean stripOuterArray; private String jsonPaths; + private String jsonRoot; // optional private List columnExprDescs = Lists.newArrayList(); @@ -72,6 +73,7 @@ public class StreamLoadTask { this.fileType = fileType; this.formatType = formatType; this.jsonPaths = ""; + this.jsonRoot = ""; this.stripOuterArray = false; } @@ -143,6 +145,14 @@ public class StreamLoadTask { this.jsonPaths = jsonPaths; } + public String getJsonRoot() { + return jsonRoot; + } + + public void setJsonRoot(String jsonRoot) { + this.jsonRoot = jsonRoot; + } + public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request, Database db) throws UserException { StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(), request.getFileType(), request.getFormatType()); @@ -194,6 +204,9 @@ public class StreamLoadTask { if (request.getJsonpaths() != null) { jsonPaths = request.getJsonpaths(); } + if (request.getJson_root() != null) { + jsonRoot = request.getJson_root(); + } stripOuterArray = request.isStrip_outer_array(); } } @@ -225,6 +238,9 @@ public class StreamLoadTask { if (!routineLoadJob.getJsonPaths().isEmpty()) { jsonPaths = routineLoadJob.getJsonPaths(); } + if (!routineLoadJob.getJsonRoot().isEmpty()) { + jsonRoot = routineLoadJob.getJsonRoot(); + } stripOuterArray = routineLoadJob.isStripOuterArray(); } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 28ccb28cf2..dd5b6ec892 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -551,6 +551,7 @@ struct TStreamLoadPutRequest { 23: optional bool strip_outer_array 24: optional string jsonpaths 25: optional i64 thrift_rpc_timeout_ms + 26: optional string json_root } struct TStreamLoadPutResult { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index eb64042c8b..3ebff28f1e 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -129,6 +129,7 @@ struct TBrokerRangeDesc { // it's usefull when format_type == FORMAT_JSON 11: optional bool strip_outer_array; 12: optional string jsonpaths; + 13: optional string json_root; } struct TBrokerScanRangeParams {