[fix](FileReader) broker reader is not thread-safe and can't be prefetched (#19321)
Fix errors when using brokers to load csv/json files. Stack trace of the failure:
5# doris::ClientCacheHelper::reopen_client(std::function<doris::ThriftClientImpl* (doris::TNetworkAddress const&, void**)>&, void**, int) [clone .cold] at /root/doris/be/src/runtime/client_cache.cpp:84
6# doris::io::BrokerFileReader::read_at_impl(unsigned long, doris::Slice, unsigned long*, doris::io::IOContext const*) [clone .cold] at /root/doris/be/src/io/fs/broker_file_reader.cpp:104
7# doris::io::FileReader::read_at(unsigned long, doris::Slice, unsigned long*, doris::io::IOContext const*) at /root/doris/be/src/io/fs/file_reader.cpp:31
8# doris::io::PrefetchBuffer::prefetch_buffer() at /root/doris/be/src/io/fs/buffered_reader.cpp:71
This commit is contained in:
@ -726,9 +726,22 @@ Status DelegateReader::create_file_reader(RuntimeProfile* profile,
|
||||
if (reader->size() < IN_MEMORY_FILE_SIZE) {
|
||||
*file_reader = std::make_shared<InMemoryFileReader>(reader);
|
||||
} else if (access_mode == AccessMode::SEQUENTIAL) {
|
||||
io::FileReaderSPtr safeReader = std::make_shared<ThreadSafeReader>(reader);
|
||||
*file_reader = std::make_shared<io::PrefetchBufferedReader>(profile, safeReader, file_range,
|
||||
io_ctx);
|
||||
bool is_thread_safe = false;
|
||||
if (typeid_cast<io::S3FileReader*>(reader.get())) {
|
||||
is_thread_safe = true;
|
||||
} else if (io::CachedRemoteFileReader* cached_reader =
|
||||
typeid_cast<io::CachedRemoteFileReader*>(reader.get())) {
|
||||
if (typeid_cast<io::S3FileReader*>(cached_reader->get_remote_reader())) {
|
||||
is_thread_safe = true;
|
||||
}
|
||||
}
|
||||
if (is_thread_safe) {
|
||||
// PrefetchBufferedReader needs thread-safe reader to prefetch data concurrently.
|
||||
*file_reader = std::make_shared<io::PrefetchBufferedReader>(profile, reader, file_range,
|
||||
io_ctx);
|
||||
} else {
|
||||
*file_reader = std::move(reader);
|
||||
}
|
||||
} else {
|
||||
*file_reader = std::move(reader);
|
||||
}
|
||||
|
||||
@ -243,59 +243,6 @@ private:
|
||||
*/
|
||||
class DelegateReader {
|
||||
public:
|
||||
class ThreadSafeReader : public io::FileReader {
|
||||
public:
|
||||
ThreadSafeReader(io::FileReaderSPtr reader) : _reader(std::move(reader)) {
|
||||
_size = _reader->size();
|
||||
if (typeid_cast<io::S3FileReader*>(_reader.get()) ||
|
||||
typeid_cast<io::BrokerFileReader*>(_reader.get())) {
|
||||
_is_thread_safe = true;
|
||||
} else if (io::CachedRemoteFileReader* cached_reader =
|
||||
typeid_cast<io::CachedRemoteFileReader*>(_reader.get())) {
|
||||
if (typeid_cast<io::S3FileReader*>(cached_reader->get_remote_reader()) ||
|
||||
typeid_cast<io::BrokerFileReader*>(cached_reader->get_remote_reader())) {
|
||||
_is_thread_safe = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
~ThreadSafeReader() override { close(); }
|
||||
|
||||
Status close() override {
|
||||
if (!_closed) {
|
||||
_closed = true;
|
||||
return _reader->close();
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
const io::Path& path() const override { return _reader->path(); }
|
||||
|
||||
size_t size() const override { return _size; }
|
||||
|
||||
bool closed() const override { return _closed; }
|
||||
|
||||
std::shared_ptr<io::FileSystem> fs() const override { return _reader->fs(); }
|
||||
|
||||
protected:
|
||||
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
|
||||
const IOContext* io_ctx) override {
|
||||
if (_is_thread_safe) {
|
||||
return _reader->read_at(offset, result, bytes_read, io_ctx);
|
||||
} else {
|
||||
std::lock_guard<std::mutex> lock(_lock);
|
||||
return _reader->read_at(offset, result, bytes_read, io_ctx);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
io::FileReaderSPtr _reader;
|
||||
size_t _size;
|
||||
bool _is_thread_safe = false;
|
||||
bool _closed = false;
|
||||
std::mutex _lock;
|
||||
};
|
||||
|
||||
enum AccessMode { SEQUENTIAL, RANDOM };
|
||||
|
||||
static constexpr size_t IN_MEMORY_FILE_SIZE = 8 * 1024 * 1024;
|
||||
|
||||
Reference in New Issue
Block a user