From 6afa14cda717f7776566bb7504c9852409f7caaa Mon Sep 17 00:00:00 2001 From: stdpain <34912776+stdpain@users.noreply.github.com> Date: Tue, 15 Dec 2020 22:55:47 +0800 Subject: [PATCH] [Bug] Fix Memory Leak in Json Load (#5073) fix json load memory leak #5069 --- be/src/exec/broker_reader.cpp | 2 +- be/src/exec/broker_reader.h | 2 +- be/src/exec/buffered_reader.cpp | 3 ++- be/src/exec/buffered_reader.h | 2 +- be/src/exec/file_reader.h | 7 ++----- be/src/exec/json_scanner.cpp | 11 +++++------ be/src/exec/local_file_reader.cpp | 12 ++++-------- be/src/exec/local_file_reader.h | 2 +- be/src/runtime/stream_load/stream_load_pipe.h | 14 +++++++------- 9 files changed, 24 insertions(+), 31 deletions(-) diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp index e5f67b618c..64e587e65e 100644 --- a/be/src/exec/broker_reader.cpp +++ b/be/src/exec/broker_reader.cpp @@ -122,7 +122,7 @@ Status BrokerReader::open() { } //not support -Status BrokerReader::read_one_message(uint8_t** buf, size_t* length) { +Status BrokerReader::read_one_message(std::unique_ptr* buf, size_t* length) { return Status::NotSupported("Not support"); } diff --git a/be/src/exec/broker_reader.h b/be/src/exec/broker_reader.h index 4730260588..d6efc8e984 100644 --- a/be/src/exec/broker_reader.h +++ b/be/src/exec/broker_reader.h @@ -50,7 +50,7 @@ public: virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; - virtual Status read_one_message(uint8_t** buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp index 96640b8a07..e9eb0e9647 100644 --- a/be/src/exec/buffered_reader.cpp +++ b/be/src/exec/buffered_reader.cpp @@ -50,8 +50,9 @@ Status BufferedReader::open() { } //not support -Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) { +Status BufferedReader::read_one_message(std::unique_ptr* buf, size_t* length) { return Status::NotSupported("Not support"); + } Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) { diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h index c347ba4ce5..1eb185c13c 100644 --- a/be/src/exec/buffered_reader.h +++ b/be/src/exec/buffered_reader.h @@ -41,7 +41,7 @@ public: virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; - virtual Status read_one_message(uint8_t** buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; diff --git a/be/src/exec/file_reader.h b/be/src/exec/file_reader.h index 19b4660a58..6c7cb32394 100644 --- a/be/src/exec/file_reader.h +++ b/be/src/exec/file_reader.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include "common/status.h" @@ -39,12 +40,8 @@ public: * * if read eof then return Status::OK and length is set 0 and buf is set NULL, * other return readed bytes. - * - * !! Important !! - * the buf must be deleted by user, otherwise leak memory - * !! Important !! */ - virtual Status read_one_message(uint8_t** buf, size_t* length) = 0; + virtual Status read_one_message(std::unique_ptr* buf, size_t* length) = 0; virtual int64_t size() = 0; virtual Status seek(int64_t position) = 0; virtual Status tell(int64_t* position) = 0; diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index bf09b09dde..a07bf8ac99 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -272,7 +272,7 @@ void JsonReader::_close() { Status JsonReader::_parse_json_doc(bool* eof) { // read a whole message, must be delete json_str by `delete[]` SCOPED_TIMER(_file_read_timer); - uint8_t* json_str = nullptr; + std::unique_ptr json_str; size_t length = 0; RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length)); _bytes_read_counter += length; @@ -283,15 +283,16 @@ Status JsonReader::_parse_json_doc(bool* eof) { bool has_parse_error = false; // parse jsondata to JsonDoc + // As the issue: https://github.com/Tencent/rapidjson/issues/1458 // Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag. if (_num_as_string) { has_parse_error = _origin_json_doc - .Parse((char*)json_str, length) + .Parse((char*)json_str.get(), length) .HasParseError(); } else { - has_parse_error = _origin_json_doc.Parse((char*)json_str, length).HasParseError(); + has_parse_error = _origin_json_doc.Parse((char*)json_str.get(), length).HasParseError(); } if (has_parse_error) { @@ -299,12 +300,10 @@ Status JsonReader::_parse_json_doc(bool* eof) { str_error << "Parse json data for JsonDoc failed. code = " << _origin_json_doc.GetParseError() << ", error-info:" << rapidjson::GetParseError_En(_origin_json_doc.GetParseError()); - _state->append_error_msg_to_file(std::string((char*)json_str, length), str_error.str()); + _state->append_error_msg_to_file(std::string((char*)json_str.get(), length), str_error.str()); _counter->num_rows_filtered++; - delete[] json_str; return Status::DataQualityError(str_error.str()); } - delete[] json_str; // set json root if (_parsed_json_root.size() != 0) { diff --git a/be/src/exec/local_file_reader.cpp b/be/src/exec/local_file_reader.cpp index 7024557377..04a795ac5c 100644 --- a/be/src/exec/local_file_reader.cpp +++ b/be/src/exec/local_file_reader.cpp @@ -53,21 +53,17 @@ bool LocalFileReader::closed() { } // Read all bytes -Status LocalFileReader::read_one_message(uint8_t** buf, size_t* length) { +Status LocalFileReader::read_one_message(std::unique_ptr* buf, size_t* length) { bool eof; int64_t file_size = size() - _current_offset; if (file_size <= 0) { - *buf = nullptr; + buf->reset(); *length = 0; return Status::OK(); } *length = file_size; - *buf = new uint8_t[file_size]; - read(*buf, length, &eof); - if (*length == 0) { - delete *buf; - *buf = nullptr; - } + buf->reset(new uint8_t[file_size]); + read(buf->get(), length, &eof); return Status::OK(); } diff --git a/be/src/exec/local_file_reader.h b/be/src/exec/local_file_reader.h index 07c1d191d2..4a302cca2d 100644 --- a/be/src/exec/local_file_reader.h +++ b/be/src/exec/local_file_reader.h @@ -38,7 +38,7 @@ public: virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; - virtual Status read_one_message(uint8_t** buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index 6b36847d65..0861da5181 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -86,7 +86,7 @@ public: // If _total_length == -1, this should be a Kafka routine load task, // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data. // Otherwise, this should be a stream load task that needs to read the specified amount of data. - Status read_one_message(uint8_t** data, size_t* length) override { + Status read_one_message(std::unique_ptr* data, size_t* length) override { if (_total_length < -1) { std::stringstream ss; ss << "invalid, _total_length is: " << _total_length; @@ -102,10 +102,10 @@ public: } // _total_length > 0, read the entire data - *data = new uint8_t[_total_length]; + data->reset(new uint8_t[_total_length]); *length = _total_length; bool eof = false; - Status st = read(*data, length, &eof); + Status st = read(data->get(), length, &eof); if (eof) { *length = 0; } @@ -188,7 +188,7 @@ public: private: // read the next buffer from _buf_queue - Status _read_next_buffer(uint8_t** data, size_t* length) { + Status _read_next_buffer(std::unique_ptr* data, size_t* length) { std::unique_lock l(_lock); while (!_cancelled && !_finished && _buf_queue.empty()) { _get_cond.wait(l); @@ -200,14 +200,14 @@ private: // finished if (_buf_queue.empty()) { DCHECK(_finished); - *data = nullptr; + data->reset(); *length = 0; return Status::OK(); } auto buf = _buf_queue.front(); *length = buf->remaining(); - *data = new uint8_t[*length]; - buf->get_bytes((char*)(*data), *length); + data->reset(new uint8_t[*length]); + buf->get_bytes((char*)(data->get()), *length); _buf_queue.pop_front(); _buffered_bytes -= buf->limit;