diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 5b2c036dca..847489f5fc 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -726,9 +726,22 @@ Status DelegateReader::create_file_reader(RuntimeProfile* profile, if (reader->size() < IN_MEMORY_FILE_SIZE) { *file_reader = std::make_shared(reader); } else if (access_mode == AccessMode::SEQUENTIAL) { - io::FileReaderSPtr safeReader = std::make_shared(reader); - *file_reader = std::make_shared(profile, safeReader, file_range, - io_ctx); + bool is_thread_safe = false; + if (typeid_cast(reader.get())) { + is_thread_safe = true; + } else if (io::CachedRemoteFileReader* cached_reader = + typeid_cast(reader.get())) { + if (typeid_cast(cached_reader->get_remote_reader())) { + is_thread_safe = true; + } + } + if (is_thread_safe) { + // PrefetchBufferedReader needs thread-safe reader to prefetch data concurrently. + *file_reader = std::make_shared(profile, reader, file_range, + io_ctx); + } else { + *file_reader = std::move(reader); + } } else { *file_reader = std::move(reader); } diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 990b0cb7a0..d0fb79a2d7 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -243,59 +243,6 @@ private: */ class DelegateReader { public: - class ThreadSafeReader : public io::FileReader { - public: - ThreadSafeReader(io::FileReaderSPtr reader) : _reader(std::move(reader)) { - _size = _reader->size(); - if (typeid_cast(_reader.get()) || - typeid_cast(_reader.get())) { - _is_thread_safe = true; - } else if (io::CachedRemoteFileReader* cached_reader = - typeid_cast(_reader.get())) { - if (typeid_cast(cached_reader->get_remote_reader()) || - typeid_cast(cached_reader->get_remote_reader())) { - _is_thread_safe = true; - } - } - } - - ~ThreadSafeReader() override { close(); } - - Status close() override { - if (!_closed) { - _closed = true; - return _reader->close(); - } - return Status::OK(); - } - - const io::Path& path() const override { return _reader->path(); } - - size_t size() const override { return _size; } - - bool closed() const override { return _closed; } - - std::shared_ptr fs() const override { return _reader->fs(); } - - protected: - Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* io_ctx) override { - if (_is_thread_safe) { - return _reader->read_at(offset, result, bytes_read, io_ctx); - } else { - std::lock_guard lock(_lock); - return _reader->read_at(offset, result, bytes_read, io_ctx); - } - } - - private: - io::FileReaderSPtr _reader; - size_t _size; - bool _is_thread_safe = false; - bool _closed = false; - std::mutex _lock; - }; - enum AccessMode { SEQUENTIAL, RANDOM }; static constexpr size_t IN_MEMORY_FILE_SIZE = 8 * 1024 * 1024;