[Bug] Fix Memory Leak in Json Load (#5073)
fix json load memory leak #5069
This commit is contained in:
@ -122,7 +122,7 @@ Status BrokerReader::open() {
|
||||
}
|
||||
|
||||
//not support
|
||||
Status BrokerReader::read_one_message(uint8_t** buf, size_t* length) {
|
||||
Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
|
||||
return Status::NotSupported("Not support");
|
||||
}
|
||||
|
||||
|
||||
@ -50,7 +50,7 @@ public:
|
||||
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
|
||||
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
|
||||
void* out) override;
|
||||
virtual Status read_one_message(uint8_t** buf, size_t* length) override;
|
||||
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
|
||||
virtual int64_t size() override;
|
||||
virtual Status seek(int64_t position) override;
|
||||
virtual Status tell(int64_t* position) override;
|
||||
|
||||
@ -50,8 +50,9 @@ Status BufferedReader::open() {
|
||||
}
|
||||
|
||||
//not support
|
||||
Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
|
||||
Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
|
||||
return Status::NotSupported("Not support");
|
||||
|
||||
}
|
||||
|
||||
Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
|
||||
|
||||
@ -41,7 +41,7 @@ public:
|
||||
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
|
||||
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
|
||||
void* out) override;
|
||||
virtual Status read_one_message(uint8_t** buf, size_t* length) override;
|
||||
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
|
||||
virtual int64_t size() override;
|
||||
virtual Status seek(int64_t position) override;
|
||||
virtual Status tell(int64_t* position) override;
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
#include <memory>
|
||||
|
||||
#include "common/status.h"
|
||||
|
||||
@ -39,12 +40,8 @@ public:
|
||||
*
|
||||
* if read eof then return Status::OK and length is set 0 and buf is set NULL,
|
||||
* other return readed bytes.
|
||||
*
|
||||
* !! Important !!
|
||||
* the buf must be deleted by user, otherwise leak memory
|
||||
* !! Important !!
|
||||
*/
|
||||
virtual Status read_one_message(uint8_t** buf, size_t* length) = 0;
|
||||
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) = 0;
|
||||
virtual int64_t size() = 0;
|
||||
virtual Status seek(int64_t position) = 0;
|
||||
virtual Status tell(int64_t* position) = 0;
|
||||
|
||||
@ -272,7 +272,7 @@ void JsonReader::_close() {
|
||||
Status JsonReader::_parse_json_doc(bool* eof) {
|
||||
// read a whole message, must be delete json_str by `delete[]`
|
||||
SCOPED_TIMER(_file_read_timer);
|
||||
uint8_t* json_str = nullptr;
|
||||
std::unique_ptr<uint8_t[]> json_str;
|
||||
size_t length = 0;
|
||||
RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length));
|
||||
_bytes_read_counter += length;
|
||||
@ -283,15 +283,16 @@ Status JsonReader::_parse_json_doc(bool* eof) {
|
||||
|
||||
bool has_parse_error = false;
|
||||
// parse jsondata to JsonDoc
|
||||
|
||||
// As the issue: https://github.com/Tencent/rapidjson/issues/1458
|
||||
// Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag.
|
||||
if (_num_as_string) {
|
||||
has_parse_error =
|
||||
_origin_json_doc
|
||||
.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, length)
|
||||
.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str.get(), length)
|
||||
.HasParseError();
|
||||
} else {
|
||||
has_parse_error = _origin_json_doc.Parse((char*)json_str, length).HasParseError();
|
||||
has_parse_error = _origin_json_doc.Parse((char*)json_str.get(), length).HasParseError();
|
||||
}
|
||||
|
||||
if (has_parse_error) {
|
||||
@ -299,12 +300,10 @@ Status JsonReader::_parse_json_doc(bool* eof) {
|
||||
str_error << "Parse json data for JsonDoc failed. code = "
|
||||
<< _origin_json_doc.GetParseError() << ", error-info:"
|
||||
<< rapidjson::GetParseError_En(_origin_json_doc.GetParseError());
|
||||
_state->append_error_msg_to_file(std::string((char*)json_str, length), str_error.str());
|
||||
_state->append_error_msg_to_file(std::string((char*)json_str.get(), length), str_error.str());
|
||||
_counter->num_rows_filtered++;
|
||||
delete[] json_str;
|
||||
return Status::DataQualityError(str_error.str());
|
||||
}
|
||||
delete[] json_str;
|
||||
|
||||
// set json root
|
||||
if (_parsed_json_root.size() != 0) {
|
||||
|
||||
@ -53,21 +53,17 @@ bool LocalFileReader::closed() {
|
||||
}
|
||||
|
||||
// Read all bytes
|
||||
Status LocalFileReader::read_one_message(uint8_t** buf, size_t* length) {
|
||||
Status LocalFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
|
||||
bool eof;
|
||||
int64_t file_size = size() - _current_offset;
|
||||
if (file_size <= 0) {
|
||||
*buf = nullptr;
|
||||
buf->reset();
|
||||
*length = 0;
|
||||
return Status::OK();
|
||||
}
|
||||
*length = file_size;
|
||||
*buf = new uint8_t[file_size];
|
||||
read(*buf, length, &eof);
|
||||
if (*length == 0) {
|
||||
delete *buf;
|
||||
*buf = nullptr;
|
||||
}
|
||||
buf->reset(new uint8_t[file_size]);
|
||||
read(buf->get(), length, &eof);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
|
||||
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
|
||||
void* out) override;
|
||||
virtual Status read_one_message(uint8_t** buf, size_t* length) override;
|
||||
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
|
||||
virtual int64_t size() override;
|
||||
virtual Status seek(int64_t position) override;
|
||||
virtual Status tell(int64_t* position) override;
|
||||
|
||||
@ -86,7 +86,7 @@ public:
|
||||
// If _total_length == -1, this should be a Kafka routine load task,
|
||||
// just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
|
||||
// Otherwise, this should be a stream load task that needs to read the specified amount of data.
|
||||
Status read_one_message(uint8_t** data, size_t* length) override {
|
||||
Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) override {
|
||||
if (_total_length < -1) {
|
||||
std::stringstream ss;
|
||||
ss << "invalid, _total_length is: " << _total_length;
|
||||
@ -102,10 +102,10 @@ public:
|
||||
}
|
||||
|
||||
// _total_length > 0, read the entire data
|
||||
*data = new uint8_t[_total_length];
|
||||
data->reset(new uint8_t[_total_length]);
|
||||
*length = _total_length;
|
||||
bool eof = false;
|
||||
Status st = read(*data, length, &eof);
|
||||
Status st = read(data->get(), length, &eof);
|
||||
if (eof) {
|
||||
*length = 0;
|
||||
}
|
||||
@ -188,7 +188,7 @@ public:
|
||||
|
||||
private:
|
||||
// read the next buffer from _buf_queue
|
||||
Status _read_next_buffer(uint8_t** data, size_t* length) {
|
||||
Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length) {
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
while (!_cancelled && !_finished && _buf_queue.empty()) {
|
||||
_get_cond.wait(l);
|
||||
@ -200,14 +200,14 @@ private:
|
||||
// finished
|
||||
if (_buf_queue.empty()) {
|
||||
DCHECK(_finished);
|
||||
*data = nullptr;
|
||||
data->reset();
|
||||
*length = 0;
|
||||
return Status::OK();
|
||||
}
|
||||
auto buf = _buf_queue.front();
|
||||
*length = buf->remaining();
|
||||
*data = new uint8_t[*length];
|
||||
buf->get_bytes((char*)(*data), *length);
|
||||
data->reset(new uint8_t[*length]);
|
||||
buf->get_bytes((char*)(data->get()), *length);
|
||||
|
||||
_buf_queue.pop_front();
|
||||
_buffered_bytes -= buf->limit;
|
||||
|
||||
Reference in New Issue
Block a user