[fix](orc) fix heap-use-after-free and potential memory leak of orc reader (#17431)
fix heap-use-after-free The OrcReader has a internal FileInputStream, If the file is empty, the memory of FileInputStream will leak. Besides, there is a Statistics instance in FileInputStream. FileInputStream maybe delete if the orc reader is inited failed, but Statistics maybe used when orc reader is closed, causing heap-use-after-free error. Potential memory leak When init file scanner in file scan node, the file scanner prepare failed, the memory of file scanner will leak.
This commit is contained in:
@ -300,7 +300,7 @@ void DumpSignalInfo(int signal_number, siginfo_t* siginfo) {
|
||||
if (reason != nullptr) {
|
||||
formatter.AppendString(reason);
|
||||
} else {
|
||||
formatter.AppendString("unkown detail explain");
|
||||
formatter.AppendString("unknown detail explain");
|
||||
}
|
||||
formatter.AppendString(" (@0x");
|
||||
formatter.AppendUint64(reinterpret_cast<uintptr_t>(siginfo->si_addr), 16);
|
||||
|
||||
@ -48,9 +48,9 @@ namespace doris::vectorized {
|
||||
M(TypeIndex::Float64, Float64, orc::DoubleVectorBatch)
|
||||
|
||||
void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
|
||||
_statistics.read_calls++;
|
||||
_statistics.read_bytes += length;
|
||||
SCOPED_RAW_TIMER(&_statistics.read_time);
|
||||
_statistics->fs_read_calls++;
|
||||
_statistics->fs_read_bytes += length;
|
||||
SCOPED_RAW_TIMER(&_statistics->fs_read_time);
|
||||
uint64_t has_read = 0;
|
||||
char* out = reinterpret_cast<char*>(buf);
|
||||
IOContext io_ctx;
|
||||
@ -113,23 +113,28 @@ OrcReader::~OrcReader() {
|
||||
|
||||
void OrcReader::close() {
|
||||
if (!_closed) {
|
||||
if (_profile != nullptr) {
|
||||
if (_file_reader != nullptr) {
|
||||
auto& fst = _file_reader->statistics();
|
||||
COUNTER_UPDATE(_orc_profile.read_time, fst.read_time);
|
||||
COUNTER_UPDATE(_orc_profile.read_calls, fst.read_calls);
|
||||
COUNTER_UPDATE(_orc_profile.read_bytes, fst.read_bytes);
|
||||
}
|
||||
COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time);
|
||||
COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time);
|
||||
COUNTER_UPDATE(_orc_profile.parse_meta_time, _statistics.parse_meta_time);
|
||||
COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
|
||||
COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
|
||||
}
|
||||
_collect_profile_on_close();
|
||||
_closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
void OrcReader::_collect_profile_on_close() {
|
||||
if (_profile != nullptr) {
|
||||
COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time);
|
||||
COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls);
|
||||
COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes);
|
||||
COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time);
|
||||
COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time);
|
||||
COUNTER_UPDATE(_orc_profile.parse_meta_time, _statistics.parse_meta_time);
|
||||
COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
|
||||
COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
|
||||
}
|
||||
}
|
||||
|
||||
int64_t OrcReader::size() const {
|
||||
return _file_input_stream->getLength();
|
||||
}
|
||||
|
||||
void OrcReader::_init_profile() {
|
||||
if (_profile != nullptr) {
|
||||
static const char* orc_profile = "OrcReader";
|
||||
@ -146,33 +151,33 @@ void OrcReader::_init_profile() {
|
||||
}
|
||||
}
|
||||
|
||||
Status OrcReader::init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
|
||||
if (_file_reader == nullptr) {
|
||||
Status OrcReader::_create_file_reader() {
|
||||
if (_file_input_stream == nullptr) {
|
||||
io::FileReaderSPtr inner_reader;
|
||||
|
||||
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
|
||||
_file_description, &_file_system,
|
||||
&inner_reader, _io_ctx));
|
||||
|
||||
_file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
|
||||
_file_input_stream.reset(
|
||||
new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics));
|
||||
}
|
||||
if (_file_reader->getLength() == 0) {
|
||||
return Status::EndOfFile("init reader failed, empty orc file: " + _scan_range.path);
|
||||
if (_file_input_stream->getLength() == 0) {
|
||||
return Status::EndOfFile("empty orc file: " + _scan_range.path);
|
||||
}
|
||||
|
||||
// create orc reader
|
||||
try {
|
||||
orc::ReaderOptions options;
|
||||
_reader = orc::createReader(std::unique_ptr<ORCFileInputStream>(_file_reader), options);
|
||||
_reader = orc::createReader(
|
||||
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
|
||||
} catch (std::exception& e) {
|
||||
return Status::InternalError("Init OrcReader failed. reason = {}", e.what());
|
||||
}
|
||||
if (_reader->getNumberOfRows() == 0) {
|
||||
return Status::EndOfFile("init reader failed, empty orc file with row num 0: " +
|
||||
_scan_range.path);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status OrcReader::init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
|
||||
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
|
||||
RETURN_IF_ERROR(_create_file_reader());
|
||||
// _init_bloom_filter(colname_to_value_range);
|
||||
|
||||
// create orc row reader
|
||||
@ -206,27 +211,7 @@ Status OrcReader::init_reader(
|
||||
|
||||
Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
|
||||
std::vector<TypeDescriptor>* col_types) {
|
||||
if (_file_reader == nullptr) {
|
||||
io::FileReaderSPtr inner_reader;
|
||||
|
||||
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
|
||||
_file_description, &_file_system,
|
||||
&inner_reader, _io_ctx));
|
||||
|
||||
_file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
|
||||
}
|
||||
if (_file_reader->getLength() == 0) {
|
||||
return Status::EndOfFile("get parsed schema fail, empty orc file: " + _scan_range.path);
|
||||
}
|
||||
|
||||
// create orc reader
|
||||
try {
|
||||
orc::ReaderOptions options;
|
||||
_reader = orc::createReader(std::unique_ptr<ORCFileInputStream>(_file_reader), options);
|
||||
} catch (std::exception& e) {
|
||||
return Status::InternalError("Init OrcReader failed. reason = {}", e.what());
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(_create_file_reader());
|
||||
auto& root_type = _reader->getType();
|
||||
for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
|
||||
col_names->emplace_back(_get_field_name_lower_case(&root_type, i));
|
||||
|
||||
@ -31,38 +31,13 @@
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
class ORCFileInputStream : public orc::InputStream {
|
||||
public:
|
||||
struct Statistics {
|
||||
int64_t read_time = 0;
|
||||
int64_t read_calls = 0;
|
||||
int64_t read_bytes = 0;
|
||||
};
|
||||
|
||||
ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr file_reader)
|
||||
: _file_name(file_name), _file_reader(file_reader) {}
|
||||
|
||||
~ORCFileInputStream() override = default;
|
||||
|
||||
uint64_t getLength() const override { return _file_reader->size(); }
|
||||
|
||||
uint64_t getNaturalReadSize() const override { return config::orc_natural_read_size_mb << 20; }
|
||||
|
||||
void read(void* buf, uint64_t length, uint64_t offset) override;
|
||||
|
||||
const std::string& getName() const override { return _file_name; }
|
||||
|
||||
Statistics& statistics() { return _statistics; }
|
||||
|
||||
private:
|
||||
Statistics _statistics;
|
||||
const std::string& _file_name;
|
||||
io::FileReaderSPtr _file_reader;
|
||||
};
|
||||
|
||||
class ORCFileInputStream;
|
||||
class OrcReader : public GenericReader {
|
||||
public:
|
||||
struct Statistics {
|
||||
int64_t fs_read_time = 0;
|
||||
int64_t fs_read_calls = 0;
|
||||
int64_t fs_read_bytes = 0;
|
||||
int64_t column_read_time = 0;
|
||||
int64_t get_batch_time = 0;
|
||||
int64_t parse_meta_time = 0;
|
||||
@ -79,10 +54,6 @@ public:
|
||||
IOContext* io_ctx);
|
||||
|
||||
~OrcReader() override;
|
||||
// for test
|
||||
void set_file_reader(const std::string& file_name, io::FileReaderSPtr file_reader) {
|
||||
_file_reader = new ORCFileInputStream(file_name, file_reader);
|
||||
}
|
||||
|
||||
Status init_reader(
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
|
||||
@ -91,7 +62,7 @@ public:
|
||||
|
||||
void close();
|
||||
|
||||
int64_t size() const { return _file_reader->getLength(); }
|
||||
int64_t size() const;
|
||||
|
||||
std::unordered_map<std::string, TypeDescriptor> get_name_to_type() override;
|
||||
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
|
||||
@ -111,6 +82,12 @@ private:
|
||||
RuntimeProfile::Counter* decode_value_time;
|
||||
RuntimeProfile::Counter* decode_null_map_time;
|
||||
};
|
||||
|
||||
// Create inner orc file,
|
||||
// return EOF if file is empty
|
||||
// return EROOR if encounter error.
|
||||
Status _create_file_reader();
|
||||
|
||||
void _init_profile();
|
||||
Status _init_read_columns();
|
||||
TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
|
||||
@ -261,6 +238,9 @@ private:
|
||||
|
||||
std::string _get_field_name_lower_case(const orc::Type* orc_type, int pos);
|
||||
|
||||
void _collect_profile_on_close();
|
||||
|
||||
private:
|
||||
RuntimeProfile* _profile;
|
||||
const TFileScanRangeParams& _scan_params;
|
||||
const TFileRangeDesc& _scan_range;
|
||||
@ -284,7 +264,7 @@ private:
|
||||
// Flag for hive engine. True if the external table engine is Hive.
|
||||
bool _is_hive = false;
|
||||
std::vector<const orc::Type*> _col_orc_type;
|
||||
ORCFileInputStream* _file_reader = nullptr;
|
||||
std::unique_ptr<ORCFileInputStream> _file_input_stream;
|
||||
Statistics _statistics;
|
||||
OrcProfile _orc_profile;
|
||||
bool _closed = false;
|
||||
@ -303,4 +283,27 @@ private:
|
||||
DecimalScaleParams _decimal_scale_params;
|
||||
};
|
||||
|
||||
class ORCFileInputStream : public orc::InputStream {
|
||||
public:
|
||||
ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr file_reader,
|
||||
OrcReader::Statistics* statistics)
|
||||
: _file_name(file_name), _file_reader(file_reader), _statistics(statistics) {}
|
||||
|
||||
~ORCFileInputStream() override = default;
|
||||
|
||||
uint64_t getLength() const override { return _file_reader->size(); }
|
||||
|
||||
uint64_t getNaturalReadSize() const override { return config::orc_natural_read_size_mb << 20; }
|
||||
|
||||
void read(void* buf, uint64_t length, uint64_t offset) override;
|
||||
|
||||
const std::string& getName() const override { return _file_name; }
|
||||
|
||||
private:
|
||||
const std::string& _file_name;
|
||||
io::FileReaderSPtr _file_reader;
|
||||
// Owned by OrcReader
|
||||
OrcReader::Statistics* _statistics;
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -88,20 +88,16 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
|
||||
}
|
||||
|
||||
for (auto& scan_range : _scan_ranges) {
|
||||
VScanner* scanner =
|
||||
(VScanner*)_create_scanner(scan_range.scan_range.ext_scan_range.file_scan_range);
|
||||
VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner,
|
||||
scan_range.scan_range.ext_scan_range.file_scan_range,
|
||||
runtime_profile(), _kv_cache);
|
||||
_scanner_pool.add(scanner);
|
||||
RETURN_IF_ERROR(((VFileScanner*)scanner)
|
||||
->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range));
|
||||
scanners->push_back(scanner);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) {
|
||||
VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range,
|
||||
runtime_profile(), _kv_cache);
|
||||
((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range);
|
||||
_scanner_pool.add(scanner);
|
||||
return scanner;
|
||||
}
|
||||
|
||||
}; // namespace doris::vectorized
|
||||
|
||||
@ -38,8 +38,6 @@ protected:
|
||||
Status _init_scanners(std::list<VScanner*>* scanners) override;
|
||||
|
||||
private:
|
||||
VScanner* _create_scanner(const TFileScanRange& scan_range);
|
||||
|
||||
std::vector<TScanRangeParams> _scan_ranges;
|
||||
KVCache<std::string> _kv_cache;
|
||||
};
|
||||
|
||||
@ -54,8 +54,8 @@ Status VMetaScanNode::_init_scanners(std::list<VScanner*>* scanners) {
|
||||
for (auto& scan_range : _scan_ranges) {
|
||||
VMetaScanner* scanner = new VMetaScanner(_state, this, _tuple_id, scan_range,
|
||||
_limit_per_scanner, runtime_profile());
|
||||
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
|
||||
_scanner_pool.add(scanner);
|
||||
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
|
||||
scanners->push_back(static_cast<VScanner*>(scanner));
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
Reference in New Issue
Block a user