[Bug][Json] Refactor the json load logic to fix some bugs

1. Add `json_root` for nested json data.
2. Remove `_jmap` to simplify the logic.
This commit is contained in:
worker24h
2020-07-30 10:36:34 +08:00
committed by GitHub
parent 594e53ec92
commit fdcc223ad2
17 changed files with 317 additions and 186 deletions

View File

@ -134,16 +134,20 @@ Status JsonScanner::open_next_reader() {
}
}
std::string json_root = "";
std::string jsonpath = "";
bool strip_outer_array = false;
if (range.__isset.jsonpaths) {
jsonpath = range.jsonpaths;
}
if (range.__isset.json_root) {
json_root = range.json_root;
}
if (range.__isset.strip_outer_array) {
strip_outer_array = range.strip_outer_array;
}
_cur_file_reader = new JsonReader(_state, _counter, _profile, file, jsonpath, strip_outer_array);
RETURN_IF_ERROR(_cur_file_reader->init());
_cur_file_reader = new JsonReader(_state, _counter, _profile, file, strip_outer_array);
RETURN_IF_ERROR(_cur_file_reader->init(jsonpath, json_root));
return Status::OK();
}
@ -178,9 +182,8 @@ JsonReader::JsonReader(
RuntimeState* state, ScannerCounter* counter,
RuntimeProfile* profile,
FileReader* file_reader,
std::string& jsonpath,
bool strip_outer_array) :
_jsonpath(jsonpath),
_handle_json_callback(nullptr),
_next_line(0),
_total_lines(0),
_state(state),
@ -188,7 +191,8 @@ JsonReader::JsonReader(
_profile(profile),
_file_reader(file_reader),
_closed(false),
_strip_outer_array(strip_outer_array) {
_strip_outer_array(strip_outer_array),
_json_doc(nullptr) {
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "FileReadTime");
}
@ -197,32 +201,52 @@ JsonReader::~JsonReader() {
_close();
}
Status JsonReader::init() {
Status JsonReader::init(const std::string& jsonpath, const std::string& json_root) {
// parse jsonpath
rapidjson::Document jsonpaths_doc;
if (!_jsonpath.empty()) {
if (!jsonpaths_doc.Parse(_jsonpath.c_str()).HasParseError()) {
if (!jsonpaths_doc.IsArray()) {
return Status::InvalidArgument("Invalid json path: " + _jsonpath);
} else {
for (int i = 0; i < jsonpaths_doc.Size(); i++) {
const rapidjson::Value& path = jsonpaths_doc[i];
if (!path.IsString()) {
return Status::InvalidArgument("Invalid json path: " + _jsonpath);
}
path.GetString();
std::vector<JsonPath> parsed_paths;
JsonFunctions::parse_json_paths(path.GetString(), &parsed_paths);
_parsed_jsonpaths.push_back(parsed_paths);
}
}
if (!jsonpath.empty()) {
Status st = _generate_json_paths(jsonpath, &_parsed_jsonpaths);
RETURN_IF_ERROR(st);
}
if (!json_root.empty()) {
JsonFunctions::parse_json_paths(json_root, &_parsed_json_root);
}
//improve performance
if (_parsed_jsonpaths.empty()) { // input is a simple json-string
_handle_json_callback = &JsonReader::_handle_simple_json;
} else { // input is a complex json-string and a json-path
if (_strip_outer_array) {
_handle_json_callback = &JsonReader::_handle_flat_array_complex_json;
} else {
return Status::InvalidArgument("Invalid json path: " + _jsonpath);
_handle_json_callback = &JsonReader::_handle_nested_complex_json;
}
}
return Status::OK();
}
// Parse the user-specified `jsonpath` string, which must be a JSON array of
// jsonpath strings (e.g. ["$.k1", "$.k2"]), and append one parsed path vector
// per entry to `vect`.
// Returns InvalidArgument if `jsonpath` is not valid JSON, is not an array,
// or contains a non-string element.
Status JsonReader::_generate_json_paths(const std::string& jsonpath, std::vector<std::vector<JsonPath>>* vect) {
    rapidjson::Document jsonpaths_doc;
    // Early-return on each failure case instead of nesting else branches.
    if (jsonpaths_doc.Parse(jsonpath.c_str()).HasParseError()) {
        return Status::InvalidArgument("Invalid json path: " + jsonpath);
    }
    if (!jsonpaths_doc.IsArray()) {
        return Status::InvalidArgument("Invalid json path: " + jsonpath);
    }
    // Size() returns rapidjson::SizeType (unsigned); use the same type for the
    // index to avoid a signed/unsigned comparison.
    for (rapidjson::SizeType i = 0; i < jsonpaths_doc.Size(); i++) {
        const rapidjson::Value& path = jsonpaths_doc[i];
        if (!path.IsString()) {
            return Status::InvalidArgument("Invalid json path: " + jsonpath);
        }
        std::vector<JsonPath> parsed_paths;
        JsonFunctions::parse_json_paths(path.GetString(), &parsed_paths);
        vect->push_back(std::move(parsed_paths));
    }
    return Status::OK();
}
void JsonReader::_close() {
if (_closed) {
return;
@ -248,35 +272,47 @@ Status JsonReader::_parse_json_doc(bool* eof) {
return Status::OK();
}
// parse jsondata to JsonDoc
if (_json_doc.Parse((char*)json_str, length).HasParseError()) {
if (_origin_json_doc.Parse((char*)json_str, length).HasParseError()) {
std::stringstream str_error;
str_error << "Parse json data for JsonDoc failed. code = " << _json_doc.GetParseError()
<< ", error-info:" << rapidjson::GetParseError_En(_json_doc.GetParseError());
str_error << "Parse json data for JsonDoc failed. code = " << _origin_json_doc.GetParseError()
<< ", error-info:" << rapidjson::GetParseError_En(_origin_json_doc.GetParseError());
_state->append_error_msg_to_file(std::string((char*) json_str, length), str_error.str());
_counter->num_rows_filtered++;
delete[] json_str;
return Status::DataQualityError(str_error.str());
}
delete[] json_str;
if (_json_doc.IsArray() && !_strip_outer_array) {
delete[] json_str;
// set json root
if (_parsed_json_root.size() != 0) {
_json_doc = JsonFunctions::get_json_object_from_parsed_json(_parsed_json_root, &_origin_json_doc, _origin_json_doc.GetAllocator());
if (_json_doc == nullptr) {
std::stringstream str_error;
str_error << "JSON Root not found.";
_state->append_error_msg_to_file(_print_json_value(_origin_json_doc), str_error.str());
_counter->num_rows_filtered++;
return Status::DataQualityError(str_error.str());
}
} else {
_json_doc = &_origin_json_doc;
}
if (_json_doc->IsArray() && !_strip_outer_array) {
std::stringstream str_error;
str_error << "JSON data is array-object, `strip_outer_array` must be TRUE.";
_state->append_error_msg_to_file(_print_json_value(_json_doc), str_error.str());
_state->append_error_msg_to_file(_print_json_value(_origin_json_doc), str_error.str());
_counter->num_rows_filtered++;
return Status::DataQualityError(str_error.str());
}
if (!_json_doc.IsArray() && _strip_outer_array) {
delete[] json_str;
if (!_json_doc->IsArray() && _strip_outer_array) {
std::stringstream str_error;
str_error << "JSON data is not an array-object, `strip_outer_array` must be FALSE.";
_state->append_error_msg_to_file(_print_json_value(_json_doc), str_error.str());
_state->append_error_msg_to_file(_print_json_value(_origin_json_doc), str_error.str());
_counter->num_rows_filtered++;
return Status::DataQualityError(str_error.str());
}
delete[] json_str;
return Status::OK();
}
@ -288,27 +324,6 @@ std::string JsonReader::_print_json_value(const rapidjson::Value& value) {
return std::string(buffer.GetString());
}
void JsonReader::_assemble_jmap(
const std::vector<SlotDescriptor*>& slot_descs) {
// iterator jsonpath to find object and save it to Map
_jmap.clear();
for (int i = 0; i < _parsed_jsonpaths.size(); i++) {
rapidjson::Value* json_value = JsonFunctions::get_json_array_from_parsed_json(_parsed_jsonpaths[i], &_json_doc, _json_doc.GetAllocator());
if (json_value == nullptr) {
// null means failed to match, we also put it in _jmap, it will be handled later.
_jmap.emplace(slot_descs[i]->col_name(), nullptr);
} else {
if (json_value->Size() == 1) {
// see NOTICE1
json_value = &((*json_value)[0]);
}
_jmap.emplace(slot_descs[i]->col_name(), json_value);
}
}
return;
}
std::string JsonReader::_print_jsonpath(const std::vector<JsonPath>& path) {
std::stringstream ss;
for (auto& p : path) {
@ -421,7 +436,8 @@ void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, c
* handle input a simple json.
* A json is a simple json only when user not specifying the json path.
* For example:
* case 1. [{"colunm1":"value1", "colunm2":10}, {"colunm1":"value2", "colunm2":30}]
* case 1. [{"colunm1":"value1", "colunm2":10}, {"colunm1":"value2", "colunm2":30}]
* case 2. {"colunm1":"value1", "colunm2":10}
*/
Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof) {
@ -436,19 +452,19 @@ Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector<SlotDescr
if (*eof) {// read all data, then return
return Status::OK();
}
if (_json_doc.IsArray() ) {
_total_lines = _json_doc.Size();
if (_json_doc->IsArray()) {
_total_lines = _json_doc->Size();
} else {
_total_lines = 1; // only one row
}
_next_line = 0;
}
if (_json_doc.IsArray()) { // handle case 1
rapidjson::Value& objectValue = _json_doc[_next_line];// json object
if (_json_doc->IsArray()) { // handle case 1
rapidjson::Value& objectValue = (*_json_doc)[_next_line];// json object
_set_tuple_value(objectValue, tuple, slot_descs, tuple_pool, &valid);
} else { // handle case 2
_set_tuple_value(_json_doc, tuple, slot_descs, tuple_pool, &valid);
_set_tuple_value(*_json_doc, tuple, slot_descs, tuple_pool, &valid);
}
_next_line++;
if (!valid) {
@ -459,64 +475,68 @@ Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector<SlotDescr
return Status::OK();
}
// for complex format json with strip_outer_array = false
Status JsonReader::_set_tuple_value_from_jmap(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid) {
std::unordered_map<std::string, JsonDataInternal>::iterator it_map;
for (auto v : slot_descs) {
it_map = _jmap.find(v->col_name());
if (it_map == _jmap.end()) {
return Status::RuntimeError("The column name of table is not foud in jsonpath: " + v->col_name());
}
rapidjson::Value* value = it_map->second.get_value();
if (value == nullptr) {
if (v->is_nullable()) {
tuple->set_null(v->null_indicator_offset());
bool JsonReader::_write_values_by_jsonpath(rapidjson::Value& objectValue, MemPool* tuple_pool, Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs) {
int nullcount = 0;
bool valid = true;
size_t column_num = std::min(slot_descs.size(), _parsed_jsonpaths.size());
for (size_t i = 0; i < column_num; i++) {
rapidjson::Value* json_values = JsonFunctions::get_json_array_from_parsed_json(_parsed_jsonpaths[i], &objectValue, _origin_json_doc.GetAllocator());
if (json_values == nullptr) {
// not match in jsondata.
if (slot_descs[i]->is_nullable()) {
tuple->set_null(slot_descs[i]->null_indicator_offset());
nullcount++;
} else {
std::stringstream str_error;
str_error << "The column `" << it_map->first << "` is not nullable, but it's not found in jsondata.";
_state->append_error_msg_to_file(_print_json_value(*value), str_error.str());
str_error << "The column `" << slot_descs[i]->col_name() << "` is not nullable, but it's not found in jsondata.";
_state->append_error_msg_to_file(_print_json_value(objectValue), str_error.str());
_counter->num_rows_filtered++;
*valid = false; // current row is invalid
valid = false; // current row is invalid
break;
}
} else {
_write_data_to_tuple(value, v, tuple, tuple_pool, valid);
if (!(*valid)) {
return Status::OK();
CHECK(json_values->IsArray());
CHECK(json_values->Size() >= 1);
if (json_values->Size() == 1) {
// NOTICE1: JsonFunctions::get_json_array_from_parsed_json() will wrap the single json object with an array.
// so here we unwrap the array to get the real element.
// if json_values' size > 1, it means we just match an array, not a wrapped one, so no need to unwrap.
json_values = &((*json_values)[0]);
}
_write_data_to_tuple(json_values, slot_descs[i], tuple, tuple_pool, &valid);
if (!valid) {
break;
}
}
}
*valid = true;
return Status::OK();
if (nullcount == column_num) {
_state->append_error_msg_to_file(_print_json_value(objectValue), "All fields is null or not matched, this is a invalid row.");
_counter->num_rows_filtered++;
}
return valid;
}
// _json_doc should be an object
/**
* for example:
* {
* "data": {"a":"a1", "b":"b1", "c":"c1"}
* }
* In this scene, generate only one row
*/
Status JsonReader::_handle_nested_complex_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof) {
do {
bool valid = false;
if (_next_line >= _total_lines) {
Status st = _parse_json_doc(eof);
if (st.is_data_quality_error()) {
continue; // continue to read next
}
RETURN_IF_ERROR(st); // terminate if encounter other errors
if (*eof) {
return Status::OK();
}
_assemble_jmap(slot_descs);
// for json with strip_outer_array is false, we treat it as a single row.
// so _total_lines is always 1.
_total_lines = 1;
_next_line = 0;
while(true) {
Status st = _parse_json_doc(eof);
if (st.is_data_quality_error()) {
continue; // continue to read next
}
RETURN_IF_ERROR(_set_tuple_value_from_jmap(tuple, slot_descs, tuple_pool, &valid));
_next_line++;
if (!valid) { // read a invalid row, then read next one
continue;
RETURN_IF_ERROR(st);
if (*eof) {
return Status::OK();// read over,then return
}
break; // read a valid row, then break
} while (_next_line <= _total_lines);
break; //read a valid row
}
_write_values_by_jsonpath(*_json_doc, tuple_pool, tuple, slot_descs);
return Status::OK();
}
@ -541,73 +561,20 @@ Status JsonReader::_handle_flat_array_complex_json(Tuple* tuple, const std::vect
if (*eof) {// read all data, then return
return Status::OK();
}
_total_lines = _json_doc.Size();
_total_lines = _json_doc->Size();
_next_line = 0;
}
int nullcount = 0;
bool valid = true;
size_t column_num = std::min(slot_descs.size(), _parsed_jsonpaths.size());
rapidjson::Value& objectValue = _json_doc[_next_line];
for (size_t i = 0; i < column_num; i++) {
rapidjson::Value* json_values = JsonFunctions::get_json_array_from_parsed_json(_parsed_jsonpaths[i], &objectValue, _json_doc.GetAllocator());
if (json_values == nullptr) {
// not match in jsondata.
if (slot_descs[i]->is_nullable()) {
tuple->set_null(slot_descs[i]->null_indicator_offset());
nullcount++;
} else {
std::stringstream str_error;
str_error << "The column `" << slot_descs[i]->col_name() << "` is not nullable, but it's not found in jsondata.";
_state->append_error_msg_to_file(_print_json_value(objectValue), str_error.str());
_counter->num_rows_filtered++;
valid = false; // current row is invalid
break;
}
} else {
CHECK(json_values->IsArray());
CHECK(json_values->Size() >= 1);
if (json_values->Size() == 1) {
// NOTICE1: JsonFunctions::get_json_array_from_parsed_json() will wrap the single json object with an array.
// so here we unwrap the array to get the real element.
// if json_values' size > 1, it means we just match an array, not a wrapped one, so no need to unwrap.
json_values = &((*json_values)[0]);
}
_write_data_to_tuple(json_values, slot_descs[i], tuple, tuple_pool, &valid);
if (!valid) {
break;
}
}
}
_next_line++;
if (!valid) {
continue;
}
if (nullcount == column_num) {
_state->append_error_msg_to_file(_print_json_value(objectValue), "All fields is null or not matched, this is a invalid row.");
_counter->num_rows_filtered++;
continue;
rapidjson::Value& objectValue = (*_json_doc)[_next_line++];
if (!_write_values_by_jsonpath(objectValue, tuple_pool, tuple, slot_descs)) {
continue; // process next line
}
break; // get a valid row, then break
} while (_next_line <= _total_lines);
return Status::OK();
}
// handle json with specified json path
Status JsonReader::_handle_complex_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof) {
if (_strip_outer_array) {
return _handle_flat_array_complex_json(tuple, slot_descs, tuple_pool, eof);
} else {
return _handle_nested_complex_json(tuple, slot_descs, tuple_pool, eof);
}
}
Status JsonReader::read(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof) {
if (_parsed_jsonpaths.empty()) { // input is a simple json-string
return _handle_simple_json(tuple, slot_descs, tuple_pool, eof);
} else { // input is a complex json-string and a json-path
return _handle_complex_json(tuple, slot_descs, tuple_pool, eof);
}
return (this->*_handle_json_callback)(tuple, slot_descs, tuple_pool, eof);
}

View File

@ -90,7 +90,6 @@ public:
JsonDataInternal(rapidjson::Value* v);
~JsonDataInternal() {}
rapidjson::Value::ConstValueIterator get_next();
rapidjson::Value* get_value() { return _json_values; }
bool is_null() const { return _json_values == nullptr; }
private:
@ -106,33 +105,31 @@ struct JsonPath;
class JsonReader {
public:
JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, FileReader* file_reader,
std::string& jsonpath, bool strip_outer_array);
bool strip_outer_array);
~JsonReader();
Status init(); // must call before use
Status init(const std::string& jsonpath, const std::string& json_root); // must call before use
Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof);
private:
Status (JsonReader::*_handle_json_callback)(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof);
Status _handle_simple_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof);
Status _handle_complex_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof);
Status _handle_flat_array_complex_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof);
Status _handle_nested_complex_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof);
void _fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len);
void _assemble_jmap(const std::vector<SlotDescriptor*>& slot_descs);
Status _parse_json_doc(bool* eof);
void _set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid);
Status _set_tuple_value_from_jmap(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid);
void _write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool, bool* valid);
bool _write_values_by_jsonpath(rapidjson::Value& objectValue, MemPool* tuple_pool, Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs);
std::string _print_json_value(const rapidjson::Value& value);
std::string _print_jsonpath(const std::vector<JsonPath>& path);
void _close();
Status _generate_json_paths(const std::string& jsonpath, std::vector<std::vector<JsonPath>>* vect);
private:
// origin json path
std::string _jsonpath;
int _next_line;
int _total_lines;
RuntimeState* _state;
@ -143,10 +140,12 @@ private:
bool _strip_outer_array;
RuntimeProfile::Counter* _bytes_read_counter;
RuntimeProfile::Counter* _read_timer;
std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
rapidjson::Document _json_doc;
//key: column name
std::unordered_map<std::string, JsonDataInternal> _jmap;
std::vector<JsonPath> _parsed_json_root;
rapidjson::Document _origin_json_doc; // origin json document object from parsed json string
rapidjson::Value *_json_doc; // _json_doc equals _origin_json_doc iff `json_root` is not set
};

View File

@ -299,6 +299,23 @@ rapidjson::Value* JsonFunctions::get_json_array_from_parsed_json(
return root;
}
// Resolve `parsed_paths` (a parsed `json_root` jsonpath) against `document`
// and return the matched sub-object.
// Returns nullptr when the path is empty or invalid, or when nothing inside
// the document matches it.
rapidjson::Value* JsonFunctions::get_json_object_from_parsed_json(
        const std::vector<JsonPath>& parsed_paths,
        rapidjson::Value* document,
        rapidjson::Document::AllocatorType& mem_allocator) {
    // Guard against an empty path vector before touching parsed_paths[0].
    if (parsed_paths.empty() || !parsed_paths[0].is_valid) {
        return nullptr;
    }
    rapidjson::Value* root = match_value(parsed_paths, document, mem_allocator, true);
    // match_value() hands back the document itself (or nullptr) when the path
    // matched nothing; treat both cases as "root not found".
    if (root == nullptr || root == document) {
        return nullptr;
    }
    return root;
}
void JsonFunctions::json_path_prepare(
doris_udf::FunctionContext* context,
doris_udf::FunctionContext::FunctionStateScope scope) {

View File

@ -95,6 +95,11 @@ public:
rapidjson::Value* document,
rapidjson::Document::AllocatorType& mem_allocator);
static rapidjson::Value* get_json_object_from_parsed_json(
const std::vector<JsonPath>& parsed_paths,
rapidjson::Value* document,
rapidjson::Document::AllocatorType& mem_allocator);
static void json_path_prepare(
doris_udf::FunctionContext*,
doris_udf::FunctionContext::FunctionStateScope);

View File

@ -371,11 +371,14 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
return Status::InvalidArgument("Invalid mem limit format");
}
}
if (!http_req->header(HTTP_EXEC_JSONPATHS).empty()) {
request.__set_jsonpaths(http_req->header(HTTP_EXEC_JSONPATHS));
if (!http_req->header(HTTP_JSONPATHS).empty()) {
request.__set_jsonpaths(http_req->header(HTTP_JSONPATHS));
}
if (!http_req->header(HTTP_EXEC_STRIP_OUTER_ARRAY).empty()) {
if (boost::iequals(http_req->header(HTTP_EXEC_STRIP_OUTER_ARRAY), "true")) {
if (!http_req->header(HTTP_JSONROOT).empty()) {
request.__set_json_root(http_req->header(HTTP_JSONROOT));
}
if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
if (boost::iequals(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
request.__set_strip_outer_array(true);
} else {
request.__set_strip_outer_array(false);

View File

@ -36,8 +36,9 @@ static const std::string HTTP_NEGATIVE = "negative";
static const std::string HTTP_STRICT_MODE = "strict_mode";
static const std::string HTTP_TIMEZONE = "timezone";
static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit";
static const std::string HTTP_EXEC_JSONPATHS = "jsonpaths";
static const std::string HTTP_EXEC_STRIP_OUTER_ARRAY = "strip_outer_array";
static const std::string HTTP_JSONPATHS = "jsonpaths";
static const std::string HTTP_JSONROOT = "json_root";
static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array";
static const std::string HTTP_100_CONTINUE = "100-continue";

View File

@ -401,6 +401,8 @@ TEST_F(JsonSannerTest, normal_simple_arrayjson) {
range.start_offset = 0;
range.size = -1;
range.format_type = TFileFormatType::FORMAT_JSON;
range.strip_outer_array = true;
range.__isset.strip_outer_array = true;
range.splittable = true;
range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
range.file_type = TFileType::FILE_LOCAL;

View File

@ -178,6 +178,9 @@ FROM data_source
8. `strip_outer_array`
Boolean type, true to indicate that json data starts with an array object and flattens objects in the array object, default value is false.
9. `json_root`
json_root is a valid JSONPATH string that specifies the root node of the JSON Document. The default value is "".
5. data_source
The type of data source. Current support:
@ -461,6 +464,36 @@ FROM data_source
1)If the json data starts as an array and each object in the array is a record, you need to set the strip_outer_array to true to represent the flat array.
2)If the json data starts with an array, and each object in the array is a record, our ROOT node is actually an object in the array when we set jsonpath.
6. User specifies the json_root node
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"strip_outer_array" = "true",
"json_root" = "$.RECORDS"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
For example json data:
{
"RECORDS":[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
}
## keyword
CREATE, ROUTINE, LOAD

View File

@ -120,6 +120,9 @@ There are two ways to import json: simple mode and matched mode. If jsonpath is
`strip_outer_array`
Boolean type, true to indicate that json data starts with an array object and flattens objects in the array object, default value is false.
`json_root`
json_root is a valid JSONPATH string that specifies the root node of the JSON Document. The default value is "".
RETURN VALUES
After the load is completed, the related content of this load will be returned in Json format. Current field included
@ -213,7 +216,7 @@ Where url is the url given by ErrorURL.
{"category":"Linux","author":"avc","title":"Linux kernel","price":195}
]
11. Matched load json by jsonpaths
json data
For example json data:
[
{"category":"xuxb111","author":"1avc","title":"SayingsoftheCentury","price":895},
{"category":"xuxb222","author":"2avc","title":"SayingsoftheCentury","price":895},
@ -225,6 +228,18 @@ Where url is the url given by ErrorURL.
1)If the json data starts as an array and each object in the array is a record, you need to set the strip_outer_array to true to represent the flat array.
2)If the json data starts with an array, and each object in the array is a record, our ROOT node is actually an object in the array when we set jsonpath.
12. User specifies the json_root node
For example json data:
{
"RECORDS":[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
}
Matched imports are made by specifying jsonpath parameter, such as `category`, `author`, and `price`, for example:
curl --location-trusted -u root -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/testDb/testTbl/_stream_load
## keyword
STREAM, LOAD

View File

@ -158,6 +158,10 @@ under the License.
布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。
9. json_root
json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。
5. data_source
数据源的类型。当前支持:
@ -398,6 +402,36 @@ under the License.
1)如果json数据是以数组开始,并且数组中每个对象是一条记录,则需要将strip_outer_array设置成true,表示展平数组。
2)如果json数据是以数组开始,并且数组中每个对象是一条记录,在设置jsonpath时,我们的ROOT节点实际上是数组中对象。
6. 用户指定根节点json_root
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"strip_outer_array" = "true",
"json_root" = "$.RECORDS"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
json数据格式:
{
"RECORDS":[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
}
## keyword
CREATE,ROUTINE,LOAD

View File

@ -89,6 +89,8 @@ under the License.
]
当strip_outer_array为true,最后导入到doris中会生成两行数据。
json_root: json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。
RETURN VALUES
导入完成后,会以Json格式返回这次导入的相关内容。当前包括一下字段
Status: 导入最后的状态。
@ -172,6 +174,18 @@ under the License.
1)如果json数据是以数组开始,并且数组中每个对象是一条记录,则需要将strip_outer_array设置成true,表示展平数组。
2)如果json数据是以数组开始,并且数组中每个对象是一条记录,在设置jsonpath时,我们的ROOT节点实际上是数组中对象。
12. 用户指定json根节点
json数据格式:
{
"RECORDS":[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
}
通过指定jsonpath进行精准导入,例如只导入category、author、price三个属性
curl --location-trusted -u root -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/testDb/testTbl/_stream_load
## keyword
STREAM,LOAD

View File

@ -92,6 +92,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
public static final String FORMAT = "format";// the value is csv or json, default is csv
public static final String STRIP_OUTER_ARRAY = "strip_outer_array";
public static final String JSONPATHS = "jsonpaths";
public static final String JSONROOT = "json_root";
// kafka type properties
public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list";
@ -113,6 +114,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.add(FORMAT)
.add(JSONPATHS)
.add(STRIP_OUTER_ARRAY)
.add(JSONROOT)
.add(LoadStmt.STRICT_MODE)
.add(LoadStmt.TIMEZONE)
.build();
@ -149,11 +151,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
* 1) dataFormat = "json"
* 2) jsonPaths = "$.XXX.xxx"
*/
private String format = ""; //default is csv.
private String jsonPaths = "";
private String format = ""; //default is csv.
private String jsonPaths = "";
private String jsonRoot = ""; // MUST be a jsonpath string
private boolean stripOuterArray = false;
// kafka related properties
private String kafkaBrokerList;
private String kafkaTopic;
@ -240,6 +242,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return jsonPaths;
}
public String getJsonRoot() {
return jsonRoot;
}
public String getKafkaBrokerList() {
return kafkaBrokerList;
}
@ -364,6 +370,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
} else if (format.equalsIgnoreCase("json")) {
format = "json";
jsonPaths = jobProperties.get(JSONPATHS);
jsonRoot = jobProperties.get(JSONROOT);
stripOuterArray = Boolean.valueOf(jobProperties.get(STRIP_OUTER_ARRAY));
} else {
throw new UserException("Format type is invalid. format=`" + format + "`");

View File

@ -61,7 +61,6 @@ import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@ -70,7 +69,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -184,6 +182,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
private static final String PROPS_FORMAT = "format";
private static final String PROPS_STRIP_OUTER_ARRAY = "strip_outer_array";
private static final String PROPS_JSONPATHS = "jsonpaths";
private static final String PROPS_JSONROOT = "json_root";
protected int currentTaskConcurrentNum;
protected RoutineLoadProgress progress;
@ -285,6 +284,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
jobProperties.put(PROPS_FORMAT, "csv");
jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false");
jobProperties.put(PROPS_JSONPATHS, "");
jobProperties.put(PROPS_JSONROOT, "");
} else if (stmt.getFormat().equals("json")) {
jobProperties.put(PROPS_FORMAT, "json");
if (!Strings.isNullOrEmpty(stmt.getJsonPaths())) {
@ -292,6 +292,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
} else {
jobProperties.put(PROPS_JSONPATHS, "");
}
if (!Strings.isNullOrEmpty(stmt.getJsonRoot())) {
jobProperties.put(PROPS_JSONROOT, stmt.getJsonRoot());
} else {
jobProperties.put(PROPS_JSONROOT, "");
}
if (stmt.isStripOuterArray()) {
jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "true");
} else {
@ -472,6 +477,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return value;
}
public String getJsonRoot() {
String value = jobProperties.get(PROPS_JSONROOT);
if (value == null) {
return "";
}
return value;
}
public int getSizeOfRoutineLoadTaskInfoList() {
readLock();
try {

View File

@ -103,6 +103,9 @@ public class StreamLoadScanNode extends LoadScanNode {
if (!streamLoadTask.getJsonPaths().isEmpty()) {
rangeDesc.setJsonpaths(streamLoadTask.getJsonPaths());
}
if (!streamLoadTask.getJsonRoot().isEmpty()) {
rangeDesc.setJson_root(streamLoadTask.getJsonRoot());
}
rangeDesc.setStrip_outer_array(streamLoadTask.isStripOuterArray());
}
rangeDesc.splittable = false;

View File

@ -53,6 +53,7 @@ public class StreamLoadTask {
private TFileFormatType formatType;
private boolean stripOuterArray;
private String jsonPaths;
private String jsonRoot;
// optional
private List<ImportColumnDesc> columnExprDescs = Lists.newArrayList();
@ -72,6 +73,7 @@ public class StreamLoadTask {
this.fileType = fileType;
this.formatType = formatType;
this.jsonPaths = "";
this.jsonRoot = "";
this.stripOuterArray = false;
}
@ -143,6 +145,14 @@ public class StreamLoadTask {
this.jsonPaths = jsonPaths;
}
public String getJsonRoot() {
return jsonRoot;
}
public void setJsonRoot(String jsonRoot) {
this.jsonRoot = jsonRoot;
}
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request, Database db) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType());
@ -194,6 +204,9 @@ public class StreamLoadTask {
if (request.getJsonpaths() != null) {
jsonPaths = request.getJsonpaths();
}
if (request.getJson_root() != null) {
jsonRoot = request.getJson_root();
}
stripOuterArray = request.isStrip_outer_array();
}
}
@ -225,6 +238,9 @@ public class StreamLoadTask {
if (!routineLoadJob.getJsonPaths().isEmpty()) {
jsonPaths = routineLoadJob.getJsonPaths();
}
if (!routineLoadJob.getJsonRoot().isEmpty()) {
jsonRoot = routineLoadJob.getJsonRoot();
}
stripOuterArray = routineLoadJob.isStripOuterArray();
}

View File

@ -551,6 +551,7 @@ struct TStreamLoadPutRequest {
23: optional bool strip_outer_array
24: optional string jsonpaths
25: optional i64 thrift_rpc_timeout_ms
26: optional string json_root
}
struct TStreamLoadPutResult {

View File

@ -129,6 +129,7 @@ struct TBrokerRangeDesc {
// it's usefull when format_type == FORMAT_JSON
11: optional bool strip_outer_array;
12: optional string jsonpaths;
13: optional string json_root;
}
struct TBrokerScanRangeParams {