[enhancement]improve performance of json load (#5055)
* imporve performance of json load
This commit is contained in:
@ -152,7 +152,8 @@ Status JsonScanner::open_next_reader() {
|
||||
if (range.__isset.num_as_string) {
|
||||
num_as_string = range.num_as_string;
|
||||
}
|
||||
_cur_file_reader = new JsonReader(_state, _counter, _profile, file, strip_outer_array, num_as_string);
|
||||
_cur_file_reader =
|
||||
new JsonReader(_state, _counter, _profile, file, strip_outer_array, num_as_string);
|
||||
RETURN_IF_ERROR(_cur_file_reader->init(jsonpath, json_root));
|
||||
|
||||
return Status::OK();
|
||||
@ -183,25 +184,22 @@ rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() {
|
||||
}
|
||||
|
||||
////// class JsonReader
|
||||
JsonReader::JsonReader(
|
||||
RuntimeState* state, ScannerCounter* counter,
|
||||
RuntimeProfile* profile,
|
||||
FileReader* file_reader,
|
||||
bool strip_outer_array,
|
||||
bool num_as_string) :
|
||||
_handle_json_callback(nullptr),
|
||||
_next_line(0),
|
||||
_total_lines(0),
|
||||
_state(state),
|
||||
_counter(counter),
|
||||
_profile(profile),
|
||||
_file_reader(file_reader),
|
||||
_closed(false),
|
||||
_strip_outer_array(strip_outer_array),
|
||||
_num_as_string(num_as_string),
|
||||
_json_doc(nullptr) {
|
||||
JsonReader::JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile,
|
||||
FileReader* file_reader, bool strip_outer_array, bool num_as_string)
|
||||
: _handle_json_callback(nullptr),
|
||||
_next_line(0),
|
||||
_total_lines(0),
|
||||
_state(state),
|
||||
_counter(counter),
|
||||
_profile(profile),
|
||||
_file_reader(file_reader),
|
||||
_closed(false),
|
||||
_strip_outer_array(strip_outer_array),
|
||||
_num_as_string(num_as_string),
|
||||
_json_doc(nullptr) {
|
||||
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
|
||||
_read_timer = ADD_TIMER(_profile, "FileReadTime");
|
||||
_read_timer = ADD_TIMER(_profile, "ReadTime");
|
||||
_file_read_timer = ADD_TIMER(_profile, "FileReadTime");
|
||||
}
|
||||
|
||||
JsonReader::~JsonReader() {
|
||||
@ -273,9 +271,11 @@ void JsonReader::_close() {
|
||||
// return Status::OK() if parse succeed or reach EOF.
|
||||
Status JsonReader::_parse_json_doc(bool* eof) {
|
||||
// read a whole message, must be delete json_str by `delete[]`
|
||||
SCOPED_TIMER(_file_read_timer);
|
||||
uint8_t* json_str = nullptr;
|
||||
size_t length = 0;
|
||||
RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length));
|
||||
_bytes_read_counter += length;
|
||||
if (length == 0) {
|
||||
*eof = true;
|
||||
return Status::OK();
|
||||
@ -286,7 +286,10 @@ Status JsonReader::_parse_json_doc(bool* eof) {
|
||||
// As the issue: https://github.com/Tencent/rapidjson/issues/1458
|
||||
// Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag.
|
||||
if (_num_as_string) {
|
||||
has_parse_error = _origin_json_doc.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, length).HasParseError();
|
||||
has_parse_error =
|
||||
_origin_json_doc
|
||||
.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, length)
|
||||
.HasParseError();
|
||||
} else {
|
||||
has_parse_error = _origin_json_doc.Parse((char*)json_str, length).HasParseError();
|
||||
}
|
||||
@ -425,7 +428,9 @@ void JsonReader::_write_data_to_tuple(rapidjson::Value::ConstValueIterator value
|
||||
// for simple format json
|
||||
void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple,
|
||||
const std::vector<SlotDescriptor*>& slot_descs,
|
||||
const std::vector<rapidjson::Value>& value_key,
|
||||
MemPool* tuple_pool, bool* valid) {
|
||||
DCHECK(slot_descs.size() == value_key.size());
|
||||
if (!objectValue.IsObject()) {
|
||||
// Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"},
|
||||
// not other type of Json format.
|
||||
@ -437,20 +442,21 @@ void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple,
|
||||
}
|
||||
|
||||
int nullcount = 0;
|
||||
for (auto v : slot_descs) {
|
||||
if (objectValue.HasMember(v->col_name().c_str())) {
|
||||
rapidjson::Value& value = objectValue[v->col_name().c_str()];
|
||||
_write_data_to_tuple(&value, v, tuple, tuple_pool, valid);
|
||||
for (int i = 0; i < slot_descs.size(); ++i) {
|
||||
rapidjson::Value::ConstMemberIterator it = objectValue.FindMember(value_key[i]);
|
||||
if (it != objectValue.MemberEnd()) {
|
||||
const rapidjson::Value& value = it->value;
|
||||
_write_data_to_tuple(&value, slot_descs[i], tuple, tuple_pool, valid);
|
||||
if (!(*valid)) {
|
||||
return;
|
||||
}
|
||||
} else { // not found
|
||||
if (v->is_nullable()) {
|
||||
tuple->set_null(v->null_indicator_offset());
|
||||
if (slot_descs[i]->is_nullable()) {
|
||||
tuple->set_null(slot_descs[i]->null_indicator_offset());
|
||||
nullcount++;
|
||||
} else {
|
||||
std::stringstream str_error;
|
||||
str_error << "The column `" << v->col_name()
|
||||
str_error << "The column `" << slot_descs[i]->col_name()
|
||||
<< "` is not nullable, but it's not found in jsondata.";
|
||||
_state->append_error_msg_to_file(_print_json_value(objectValue), str_error.str());
|
||||
_counter->num_rows_filtered++;
|
||||
@ -481,6 +487,11 @@ void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple,
|
||||
*/
|
||||
Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs,
|
||||
MemPool* tuple_pool, bool* eof) {
|
||||
// If you use a string as the key to find the json object, strlen will be called every time, so the key is constructed in advance
|
||||
std::vector<rapidjson::Value> value_key;
|
||||
for (auto v : slot_descs) {
|
||||
value_key.emplace_back(v->col_name().c_str(), v->col_name().size());
|
||||
}
|
||||
do {
|
||||
bool valid = false;
|
||||
if (_next_line >= _total_lines) { // parse json and generic document
|
||||
@ -512,9 +523,9 @@ Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector<SlotDescr
|
||||
|
||||
if (_json_doc->IsArray()) { // handle case 1
|
||||
rapidjson::Value& objectValue = (*_json_doc)[_next_line]; // json object
|
||||
_set_tuple_value(objectValue, tuple, slot_descs, tuple_pool, &valid);
|
||||
_set_tuple_value(objectValue, tuple, slot_descs, value_key, tuple_pool, &valid);
|
||||
} else { // handle case 2
|
||||
_set_tuple_value(*_json_doc, tuple, slot_descs, tuple_pool, &valid);
|
||||
_set_tuple_value(*_json_doc, tuple, slot_descs, value_key, tuple_pool, &valid);
|
||||
}
|
||||
_next_line++;
|
||||
if (!valid) {
|
||||
|
||||
@ -103,8 +103,8 @@ struct JsonPath;
|
||||
// return other error Status if encounter other errors.
|
||||
class JsonReader {
|
||||
public:
|
||||
JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, FileReader* file_reader,
|
||||
bool strip_outer_array, bool num_as_string);
|
||||
JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile,
|
||||
FileReader* file_reader, bool strip_outer_array, bool num_as_string);
|
||||
|
||||
~JsonReader();
|
||||
|
||||
@ -129,7 +129,8 @@ private:
|
||||
const uint8_t* value, int32_t len);
|
||||
Status _parse_json_doc(bool* eof);
|
||||
void _set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple,
|
||||
const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool,
|
||||
const std::vector<SlotDescriptor*>& slot_descs,
|
||||
const std::vector<rapidjson::Value>& value_key, MemPool* tuple_pool,
|
||||
bool* valid);
|
||||
void _write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc,
|
||||
Tuple* tuple, MemPool* tuple_pool, bool* valid);
|
||||
@ -154,6 +155,7 @@ private:
|
||||
bool _num_as_string;
|
||||
RuntimeProfile::Counter* _bytes_read_counter;
|
||||
RuntimeProfile::Counter* _read_timer;
|
||||
RuntimeProfile::Counter* _file_read_timer;
|
||||
|
||||
std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
|
||||
std::vector<JsonPath> _parsed_json_root;
|
||||
|
||||
Reference in New Issue
Block a user