[fix](load) PrefetchBufferedReader Crashing caused updating counter with an invalid runtime profile (#22464)

This commit is contained in:
Jerry Hu
2023-08-02 18:19:48 +08:00
committed by GitHub
parent 751a7680c5
commit 4bc65aa921
9 changed files with 47 additions and 6 deletions

View File

@ -624,8 +624,11 @@ PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
}
PrefetchBufferedReader::~PrefetchBufferedReader() {
close();
_closed = true;
/// set `_sync_profile` to nullptr to avoid updating counter after the runtime profile has been released.
std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
[](std::shared_ptr<PrefetchBuffer>& buffer) { buffer->_sync_profile = nullptr; });
/// Better not to call virtual functions in a destructor.
_close_internal();
}
Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
@ -654,6 +657,10 @@ Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t*
}
Status PrefetchBufferedReader::close() {
return _close_internal();
}
Status PrefetchBufferedReader::_close_internal() {
if (!_closed) {
_closed = true;
std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
@ -669,10 +676,14 @@ InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr reader) : _reader(std:
}
InMemoryFileReader::~InMemoryFileReader() {
close();
_close_internal();
}
Status InMemoryFileReader::close() {
return _close_internal();
}
Status InMemoryFileReader::_close_internal() {
if (!_closed) {
_closed = true;
return _reader->close();

View File

@ -383,6 +383,7 @@ protected:
const IOContext* io_ctx) override;
private:
Status _close_internal();
size_t get_buffer_pos(int64_t position) const {
return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size;
}
@ -436,6 +437,7 @@ protected:
const IOContext* io_ctx) override;
private:
Status _close_internal();
io::FileReaderSPtr _reader;
std::unique_ptr<char[]> _data = nullptr;
size_t _size;

View File

@ -60,7 +60,9 @@ public:
// called when consumer finished
Status close() override {
cancel("closed");
if (!(_finished || _cancelled)) {
cancel("closed");
}
return Status::OK();
}

View File

@ -753,4 +753,14 @@ Status CsvReader::_parse_col_types(size_t col_nums, std::vector<TypeDescriptor>*
return Status::OK();
}
void CsvReader::close() {
if (_line_reader) {
_line_reader->close();
}
if (_file_reader) {
_file_reader->close();
}
}
} // namespace doris::vectorized

View File

@ -80,6 +80,8 @@ public:
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;
void close() override;
private:
// used for stream/broker load of csv file.
Status _create_decompressor();

View File

@ -67,6 +67,8 @@ public:
return Status::OK();
}
virtual void close() {}
protected:
const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding)

View File

@ -100,7 +100,7 @@ ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRang
}
ParquetReader::~ParquetReader() {
close();
_close_internal();
}
void ParquetReader::_init_profile() {
@ -162,6 +162,10 @@ void ParquetReader::_init_profile() {
}
void ParquetReader::close() {
_close_internal();
}
void ParquetReader::_close_internal() {
if (!_closed) {
if (_profile != nullptr) {
COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _statistics.filtered_row_groups);

View File

@ -119,7 +119,7 @@ public:
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
void close();
void close() override;
RowRange get_whole_range() { return _whole_range; }
@ -182,6 +182,7 @@ private:
Status _open_file();
void _init_profile();
void _close_internal();
Status _next_row_group_reader();
RowGroupReader::PositionDeleteContext _get_position_delete_ctx(
const tparquet::RowGroup& row_group,

View File

@ -558,6 +558,9 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
Status VFileScanner::_get_next_reader() {
while (true) {
if (_cur_reader) {
_cur_reader->close();
}
_cur_reader.reset(nullptr);
_src_block_init = false;
if (_next_range >= _ranges.size()) {
@ -936,6 +939,10 @@ Status VFileScanner::close(RuntimeState* state) {
cache_profile.update(_file_cache_statistics.get());
}
if (_cur_reader) {
_cur_reader->close();
}
RETURN_IF_ERROR(VScanner::close(state));
return Status::OK();
}