From 66bfd18601d7e8ca9ceb3a852f9bffde09a702b3 Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Tue, 4 Apr 2023 19:05:22 +0800 Subject: [PATCH] [opt](file_reader) add prefetch buffer to read csv&json file (#18301) Co-authored-by: ByteYue <[yj976240184@gmail.com](mailto:yj976240184@gmail.com)> This PR is an optimization for https://github.com/apache/doris/pull/17478: 1. Change the buffer size of `LineReader` to 4MB to align with the size of prefetch buffer. 2. Lazily prefetch data in the first read to prevent wasted reading. 3. S3 block size is 32MB only, which is too small for a file split. Set 128MB as default file split size. 4. Add `_end_offset` for prefetch buffer to prevent wasted reading. The query performance of reading data on object storage is improved by more than 3x+. --- be/src/io/fs/buffered_reader.cpp | 155 ++++++++++++++++++ be/src/io/fs/buffered_reader.h | 110 +++++++++++++ be/src/runtime/exec_env.h | 5 + be/src/runtime/exec_env_init.cpp | 5 + be/src/vec/exec/format/csv/csv_reader.cpp | 16 +- .../new_plain_text_line_reader.cpp | 3 +- .../vec/exec/format/json/new_json_reader.cpp | 16 +- .../java/org/apache/doris/common/Config.java | 5 +- 8 files changed, 309 insertions(+), 6 deletions(-) diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 552b9b1a83..4f47ff5671 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -28,6 +28,161 @@ namespace doris { namespace io { +// there exists occasions where the buffer is already closed but +// some prior tasks are still queued in thread pool, so we have to check whether +// the buffer is closed each time the condition variable is notified. +void PrefetchBuffer::reset_offset(size_t offset) { + if (UNLIKELY(offset >= _end_offset)) { + return; + } + { + std::unique_lock lck {_lock}; + _prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; }); + if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) { + _prefetched.notify_all(); + return; + } + _buffer_status = BufferStatus::RESET; + _offset = offset; + _prefetched.notify_all(); + } + ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( + [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); }); +} + +// only this function would run concurrently in another thread +void PrefetchBuffer::prefetch_buffer() { + { + std::unique_lock lck {_lock}; + _prefetched.wait(lck, [this]() { + return _buffer_status == BufferStatus::RESET || _buffer_status == BufferStatus::CLOSED; + }); + // in case buffer is already closed + if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) { + _prefetched.notify_all(); + return; + } + _buffer_status = BufferStatus::PENDING; + _prefetched.notify_all(); + } + _len = 0; + Status s; + + size_t buf_size = _end_offset - _offset > _size ? _size : _end_offset - _offset; + s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len); + std::unique_lock lck {_lock}; + _prefetched.wait(lck, [this]() { return _buffer_status == BufferStatus::PENDING; }); + if (!s.ok() && _offset < _reader->size()) { + _prefetch_status = std::move(s); + } + _buffer_status = BufferStatus::PREFETCHED; + _prefetched.notify_all(); + // eof would come up with len == 0, it would be handled by read_buffer +} + +Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, + size_t* bytes_read) { + if (UNLIKELY(off >= _end_offset)) { + // Reader can read out of [_start_offset, _end_offset) by synchronous method. + return _reader->read_at(off, Slice {out, buf_len}, bytes_read); + } + { + std::unique_lock lck {_lock}; + // buffer must be prefetched or it's closed + _prefetched.wait(lck, [this]() { + return _buffer_status == BufferStatus::PREFETCHED || + _buffer_status == BufferStatus::CLOSED; + }); + if (UNLIKELY(BufferStatus::CLOSED == _buffer_status)) { + return Status::OK(); + } + } + RETURN_IF_ERROR(_prefetch_status); + // there is only parquet would do not sequence read + // it would read the end of the file first + if (UNLIKELY(!contains(off))) { + reset_offset((off / _size) * _size); + return read_buffer(off, out, buf_len, bytes_read); + } + if (UNLIKELY(0 == _len || _offset + _len < off)) { + return Status::OK(); + } + // [0]: maximum len trying to read, [1] maximum length buffer can provide, [2] actual len buffer has + size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off}); + memcpy((void*)out, _buf.data() + (off - _offset), read_len); + *bytes_read = read_len; + if (off + *bytes_read == _offset + _len) { + reset_offset(_offset + _whole_buffer_size); + } + return Status::OK(); +} + +void PrefetchBuffer::close() { + std::unique_lock lck {_lock}; + // in case _reader still tries to write to the buf after we close the buffer + _prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; }); + _buffer_status = BufferStatus::CLOSED; + _prefetched.notify_all(); +} + +// buffered reader +PrefetchBufferedReader::PrefetchBufferedReader(io::FileReaderSPtr reader, int64_t offset, + int64_t length, int64_t buffer_size) + : _reader(std::move(reader)), _start_offset(offset), _end_offset(offset + length) { + if (buffer_size == -1L) { + buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024; + } + _size = _reader->size(); + _whole_pre_buffer_size = buffer_size; + int buffer_num = buffer_size > s_max_pre_buffer_size ? buffer_size / s_max_pre_buffer_size : 1; + // set the _cur_offset of this reader as same as the inner reader's, + // to make sure the buffer reader will start to read at right position. + for (int i = 0; i < buffer_num; i++) { + _pre_buffers.emplace_back( + std::make_shared(_start_offset, _end_offset, s_max_pre_buffer_size, + _whole_pre_buffer_size, _reader.get())); + } +} + +PrefetchBufferedReader::~PrefetchBufferedReader() { + close(); + _closed = true; +} + +Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* io_ctx) { + if (!_initialized) { + reset_all_buffer(offset); + _initialized = true; + } + if (UNLIKELY(result.get_size() == 0 || offset >= size())) { + *bytes_read = 0; + return Status::OK(); + } + size_t nbytes = result.get_size(); + int actual_bytes_read = 0; + while (actual_bytes_read < nbytes && offset < size()) { + size_t read_num = 0; + auto buffer_pos = get_buffer_pos(offset); + RETURN_IF_ERROR( + _pre_buffers[buffer_pos]->read_buffer(offset, result.get_data() + actual_bytes_read, + nbytes - actual_bytes_read, &read_num)); + actual_bytes_read += read_num; + offset += read_num; + } + *bytes_read = actual_bytes_read; + return Status::OK(); +} + +Status PrefetchBufferedReader::close() { + std::for_each(_pre_buffers.begin(), _pre_buffers.end(), + [](std::shared_ptr& buffer) { buffer->close(); }); + _reader->close(); + _closed = true; + + return Status::OK(); +} + BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, uint64_t length, size_t max_buf_size) : _file(file), diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 8b06721c1d..2b3b6054b9 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -19,8 +19,10 @@ #include +#include #include +#include "common/config.h" #include "common/status.h" #include "io/fs/file_reader.h" #include "olap/olap_define.h" @@ -29,6 +31,114 @@ namespace doris { namespace io { +class PrefetchBufferedReader; +struct PrefetchBuffer : std::enable_shared_from_this { + enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED }; + PrefetchBuffer() = default; + PrefetchBuffer(size_t start_offset, size_t end_offset, size_t buffer_size, + size_t whole_buffer_size, io::FileReader* reader) + : _start_offset(start_offset), + _end_offset(end_offset), + _size(buffer_size), + _whole_buffer_size(whole_buffer_size), + _reader(reader), + _buf(buffer_size, '0') {} + PrefetchBuffer(PrefetchBuffer&& other) + : _offset(other._offset), + _start_offset(other._start_offset), + _end_offset(other._end_offset), + _size(other._size), + _whole_buffer_size(other._whole_buffer_size), + _reader(other._reader), + _buf(std::move(other._buf)) {} + ~PrefetchBuffer() = default; + size_t _offset; + // [_start_offset, _end_offset) is the range that can be prefetched. + // Notice that the reader can read out of [_start_offset, _end_offset), because FE does not align the file + // according to the format when splitting it. + size_t _start_offset; + size_t _end_offset; + size_t _size; + size_t _len {0}; + size_t _whole_buffer_size; + io::FileReader* _reader; + std::string _buf; + BufferStatus _buffer_status {BufferStatus::RESET}; + std::mutex _lock; + std::condition_variable _prefetched; + Status _prefetch_status {Status::OK()}; + // @brief: reset the start offset of this buffer to offset + // @param: the new start offset for this buffer + void reset_offset(size_t offset); + // @brief: start to fetch the content between [_offset, _offset + _size) + void prefetch_buffer(); + // @brief: used by BufferedReader to read the prefetched data + // @param[off] read start address + // @param[buf] buffer to put the actual content + // @param[buf_len] maximum len trying to read + // @param[bytes_read] actual bytes read + Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* bytes_read); + // @brief: shut down the buffer until the prior prefetching task is done + void close(); + // @brief: to detect whether this buffer contains off + // @param[off] detect offset + bool inline contains(size_t off) const { return _offset <= off && off < _offset + _size; } +}; + +/** + * A buffered reader that prefetch data in the daemon thread pool. + * The data is prefetched sequentially until the underlying buffers(4 * 4M as default) are full. + * When a buffer is read out, it will fetch data backward in daemon, so the underlying reader should be + * thread-safe, and the access mode of data needs to be sequential. + * Therefore, PrefetchBufferedReader now only support csv&json format when reading s3&broker file. + */ +class PrefetchBufferedReader : public io::FileReader { +public: + PrefetchBufferedReader(io::FileReaderSPtr reader, int64_t offset, int64_t length, + int64_t buffer_size = -1L); + ~PrefetchBufferedReader() override; + + Status close() override; + + const io::Path& path() const override { return _reader->path(); } + + size_t size() const override { return _size; } + + bool closed() const override { return _closed; } + + std::shared_ptr fs() const override { return _reader->fs(); } + +protected: + Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* io_ctx) override; + +private: + size_t get_buffer_pos(int64_t position) const { + return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size; + } + size_t get_buffer_offset(int64_t position) const { + return (position / s_max_pre_buffer_size) * s_max_pre_buffer_size; + } + void reset_all_buffer(size_t position) { + for (int64_t i = 0; i < _pre_buffers.size(); i++) { + int64_t cur_pos = position + i * s_max_pre_buffer_size; + int cur_buf_pos = get_buffer_pos(cur_pos); + // reset would do all the prefetch work + _pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos)); + } + } + + io::FileReaderSPtr _reader; + int64_t _start_offset; + int64_t _end_offset; + int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB + std::vector> _pre_buffers; + int64_t _whole_pre_buffer_size; + bool _initialized = false; + bool _closed = false; + size_t _size; +}; + /** * Load all the needed data in underlying buffer, so the caller does not need to prepare the data container. */ diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4ce64ff153..5483f5496c 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -125,6 +125,9 @@ public: MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); } ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); } ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); } + ThreadPool* buffered_reader_prefetch_thread_pool() { + return _buffered_reader_prefetch_thread_pool.get(); + } ThreadPool* send_report_thread_pool() { return _send_report_thread_pool.get(); } ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); } @@ -215,6 +218,8 @@ private: // Threadpool used to download cache from remote storage std::unique_ptr _download_cache_thread_pool; + // Threadpool used to prefetch remote file for buffered reader + std::unique_ptr _buffered_reader_prefetch_thread_pool; // A token used to submit download cache task serially std::unique_ptr _serial_download_cache_thread_token; // Pool used by fragment manager to send profile or status to FE coordinator diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 4287ed8034..b7066bbdbe 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -100,6 +100,11 @@ Status ExecEnv::_init(const std::vector& store_paths) { init_download_cache_required_components(); + ThreadPoolBuilder("BufferedReaderPrefetchThreadPool") + .set_min_threads(16) + .set_max_threads(64) + .build(&_buffered_reader_prefetch_thread_pool); + // min num equal to fragment pool's min num // max num is useless because it will start as many as requested in the past // queue size is useless because the max thread num is very large diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 65d1fb355b..f11a5c2273 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -26,6 +26,9 @@ #include "exec/text_converter.h" #include "exec/text_converter.hpp" #include "io/file_factory.h" +#include "io/fs/broker_file_reader.h" +#include "io/fs/buffered_reader.h" +#include "io/fs/s3_file_reader.h" #include "olap/iterators.h" #include "olap/olap_common.h" #include "util/string_util.h" @@ -135,11 +138,20 @@ Status CsvReader::init_reader(bool is_load) { _file_description.start_offset = start_offset; + io::FileReaderSPtr csv_file_reader; if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &csv_file_reader)); } else { RETURN_IF_ERROR(FileFactory::create_file_reader( - _profile, _system_properties, _file_description, &_file_system, &_file_reader)); + _profile, _system_properties, _file_description, &_file_system, &csv_file_reader)); + } + if (typeid_cast(csv_file_reader.get()) != nullptr || + typeid_cast(csv_file_reader.get()) != nullptr) { + // PrefetchBufferedReader now only support csv&json format when reading s3&broker file + _file_reader.reset( + new io::PrefetchBufferedReader(csv_file_reader, _range.start_offset, _range.size)); + } else { + _file_reader = std::move(csv_file_reader); } if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && _params.file_type != TFileType::FILE_BROKER) { diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp index d34b9ee5c8..3984e68f33 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -27,7 +27,8 @@ // larger than 300B for correct lzo header decompressing #define INPUT_CHUNK (2 * 1024 * 1024) // #define INPUT_CHUNK (34) -#define OUTPUT_CHUNK (8 * 1024 * 1024) +// align with prefetch buffer size +#define OUTPUT_CHUNK (4 * 1024 * 1024) // #define OUTPUT_CHUNK (32) // leave these 2 size small for debugging diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index b28a97cfdf..6ae0c74e85 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -20,6 +20,9 @@ #include "common/compiler_util.h" #include "exprs/json_functions.h" #include "io/file_factory.h" +#include "io/fs/broker_file_reader.h" +#include "io/fs/buffered_reader.h" +#include "io/fs/s3_file_reader.h" #include "io/fs/stream_load_pipe.h" #include "olap/iterators.h" #include "runtime/descriptors.h" @@ -332,11 +335,20 @@ Status NewJsonReader::_open_file_reader() { _current_offset = start_offset; _file_description.start_offset = start_offset; + io::FileReaderSPtr json_file_reader; if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &json_file_reader)); } else { RETURN_IF_ERROR(FileFactory::create_file_reader( - _profile, _system_properties, _file_description, &_file_system, &_file_reader)); + _profile, _system_properties, _file_description, &_file_system, &json_file_reader)); + } + if (typeid_cast(json_file_reader.get()) != nullptr || + typeid_cast(json_file_reader.get()) != nullptr) { + // PrefetchBufferedReader now only support csv&json format when reading s3&broker file + _file_reader.reset( + new io::PrefetchBufferedReader(json_file_reader, _range.start_offset, _range.size)); + } else { + _file_reader = std::move(json_file_reader); } return Status::OK(); } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 1960f8a97c..f8aa2c310e 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1742,8 +1742,11 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = false) public static long file_scan_node_split_num = 128; + // 0 means use the block size in HDFS/S3 as split size. + // HDFS block size is 128MB, while S3 block size is 32MB. + // 32MB is too small for a S3 file split, so set 128MB as default split size. @ConfField(mutable = true, masterOnly = false) - public static long file_split_size = 0; // 0 means use the block size in HDFS/S3 as split size + public static long file_split_size = 134217728; /** * If set to TRUE, FE will: