diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 552b9b1a83..4f47ff5671 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -28,6 +28,161 @@ namespace doris {
 namespace io {
 
+// There are occasions where the buffer is already closed but some prior tasks
+// are still queued in the thread pool, so we have to check whether the buffer
+// is closed each time the condition variable is notified.
+void PrefetchBuffer::reset_offset(size_t offset) {
+    if (UNLIKELY(offset >= _end_offset)) {
+        return;
+    }
+    {
+        std::unique_lock lck {_lock};
+        _prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; });
+        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
+            _prefetched.notify_all();
+            return;
+        }
+        _buffer_status = BufferStatus::RESET;
+        _offset = offset;
+        _prefetched.notify_all();
+    }
+    ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
+            [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); });
+}
+
+// only this function runs concurrently in another thread
+void PrefetchBuffer::prefetch_buffer() {
+    {
+        std::unique_lock lck {_lock};
+        _prefetched.wait(lck, [this]() {
+            return _buffer_status == BufferStatus::RESET || _buffer_status == BufferStatus::CLOSED;
+        });
+        // in case the buffer is already closed
+        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
+            _prefetched.notify_all();
+            return;
+        }
+        _buffer_status = BufferStatus::PENDING;
+        _prefetched.notify_all();
+    }
+    _len = 0;
+    Status s;
+
+    size_t buf_size = _end_offset - _offset > _size ? _size : _end_offset - _offset;
+    s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len);
+    std::unique_lock lck {_lock};
+    _prefetched.wait(lck, [this]() { return _buffer_status == BufferStatus::PENDING; });
+    if (!s.ok() && _offset < _reader->size()) {
+        _prefetch_status = std::move(s);
+    }
+    _buffer_status = BufferStatus::PREFETCHED;
+    _prefetched.notify_all();
+    // EOF shows up as len == 0 and is handled by read_buffer
+}
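
The four BufferStatus values form a small producer/consumer handshake: reset_offset arms the buffer (RESET), prefetch_buffer claims it (PENDING) and publishes the data (PREFETCHED), and close retires it (CLOSED). A minimal self-contained sketch of that handshake, with hypothetical names and a plain std::thread standing in for the thread pool:

// Simplified sketch of the state handshake above; not patch code.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };

int main() {
    std::mutex lock;
    std::condition_variable cv;
    BufferStatus status = BufferStatus::RESET;

    std::thread worker([&] {
        std::unique_lock lck {lock};
        cv.wait(lck, [&] {
            return status == BufferStatus::RESET || status == BufferStatus::CLOSED;
        });
        if (status == BufferStatus::CLOSED) return; // buffer retired before we ran
        status = BufferStatus::PENDING;
        lck.unlock();
        // ... the blocking remote read happens here, outside the lock ...
        lck.lock();
        status = BufferStatus::PREFETCHED; // publish the data
        cv.notify_all();
    });

    {
        std::unique_lock lck {lock};
        cv.wait(lck, [&] { return status == BufferStatus::PREFETCHED; });
        std::cout << "buffer ready\n"; // consumer may now copy out of the buffer
    }
    worker.join();
    return 0;
}
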
+
+Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
+                                   size_t* bytes_read) {
+    if (UNLIKELY(off >= _end_offset)) {
+        // The reader is allowed to read outside [_start_offset, _end_offset) via a synchronous read.
+        return _reader->read_at(off, Slice {out, buf_len}, bytes_read);
+    }
+    {
+        std::unique_lock lck {_lock};
+        // the buffer must be prefetched, or it is closed
+        _prefetched.wait(lck, [this]() {
+            return _buffer_status == BufferStatus::PREFETCHED ||
+                   _buffer_status == BufferStatus::CLOSED;
+        });
+        if (UNLIKELY(BufferStatus::CLOSED == _buffer_status)) {
+            return Status::OK();
+        }
+    }
+    RETURN_IF_ERROR(_prefetch_status);
+    // Only Parquet reads non-sequentially: it reads the end of the file first.
+    if (UNLIKELY(!contains(off))) {
+        reset_offset((off / _size) * _size);
+        return read_buffer(off, out, buf_len, bytes_read);
+    }
+    if (UNLIKELY(0 == _len || _offset + _len < off)) {
+        return Status::OK();
+    }
+    // take the minimum of: [0] the maximum len the caller asks for,
+    // [1] the maximum len this buffer can provide, [2] the actual len this buffer holds
+    size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off});
+    memcpy((void*)out, _buf.data() + (off - _offset), read_len);
+    *bytes_read = read_len;
+    if (off + *bytes_read == _offset + _len) {
+        reset_offset(_offset + _whole_buffer_size);
+    }
+    return Status::OK();
+}
+
+void PrefetchBuffer::close() {
+    std::unique_lock lck {_lock};
+    // in case _reader still tries to write to the buf after we close the buffer
+    _prefetched.wait(lck, [this]() { return _buffer_status != BufferStatus::PENDING; });
+    _buffer_status = BufferStatus::CLOSED;
+    _prefetched.notify_all();
+}
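
The three-way std::min in read_buffer caps the copy at whichever runs out first: the caller's request, this buffer's 4MB window, or the bytes actually prefetched so far. A worked example with hypothetical numbers:

// Standalone illustration of the three-way min in read_buffer; values are made up.
#include <algorithm>
#include <cassert>
#include <cstddef>

int main() {
    const size_t MB = 1024 * 1024;
    size_t _offset = 8 * MB, _size = 4 * MB, _len = 4 * MB; // buffer covers [8MB, 12MB)
    size_t off = 11 * MB, buf_len = 2 * MB;                 // caller wants [11MB, 13MB)
    size_t read_len = std::min({buf_len,               // [0] what the caller asked for
                                _offset + _size - off, // [1] what this window can provide
                                _offset + _len - off}); // [2] what was actually prefetched
    assert(read_len == 1 * MB); // only [11MB, 12MB); the rest comes from the next buffer
    return 0;
}
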
+
+// buffered reader
+PrefetchBufferedReader::PrefetchBufferedReader(io::FileReaderSPtr reader, int64_t offset,
+                                               int64_t length, int64_t buffer_size)
+        : _reader(std::move(reader)), _start_offset(offset), _end_offset(offset + length) {
+    if (buffer_size == -1L) {
+        buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
+    }
+    _size = _reader->size();
+    _whole_pre_buffer_size = buffer_size;
+    int buffer_num = buffer_size > s_max_pre_buffer_size ? buffer_size / s_max_pre_buffer_size : 1;
+    // set the _cur_offset of this reader to the same value as the inner reader's,
+    // to make sure the buffered reader starts reading at the right position
+    for (int i = 0; i < buffer_num; i++) {
+        _pre_buffers.emplace_back(
+                std::make_shared<PrefetchBuffer>(_start_offset, _end_offset, s_max_pre_buffer_size,
+                                                 _whole_pre_buffer_size, _reader.get()));
+    }
+}
+
+PrefetchBufferedReader::~PrefetchBufferedReader() {
+    close();
+    _closed = true;
+}
+
+Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                                            const IOContext* io_ctx) {
+    if (!_initialized) {
+        reset_all_buffer(offset);
+        _initialized = true;
+    }
+    if (UNLIKELY(result.get_size() == 0 || offset >= size())) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    size_t nbytes = result.get_size();
+    int actual_bytes_read = 0;
+    while (actual_bytes_read < nbytes && offset < size()) {
+        size_t read_num = 0;
+        auto buffer_pos = get_buffer_pos(offset);
+        RETURN_IF_ERROR(
+                _pre_buffers[buffer_pos]->read_buffer(offset, result.get_data() + actual_bytes_read,
+                                                      nbytes - actual_bytes_read, &read_num));
+        actual_bytes_read += read_num;
+        offset += read_num;
+    }
+    *bytes_read = actual_bytes_read;
+    return Status::OK();
+}
+
+Status PrefetchBufferedReader::close() {
+    std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
+                  [](std::shared_ptr<PrefetchBuffer>& buffer) { buffer->close(); });
+    _reader->close();
+    _closed = true;
+
+    return Status::OK();
+}
+
 BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset,
                                                    uint64_t length, size_t max_buf_size)
         : _file(file),
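
Taken together, a caller wraps a remote reader once and then issues plain sequential read_at calls. The sketch below is illustrative only: make_s3_reader is a hypothetical stand-in for whatever produces the inner reader (in this patch, FileFactory), and error handling is elided.

// Hypothetical usage sketch; not part of the patch.
Status scan_sequentially(const io::Path& path) {
    io::FileReaderSPtr inner = make_s3_reader(path); // assumption: S3/broker-backed reader
    auto reader = std::make_shared<io::PrefetchBufferedReader>(
            inner, /*offset=*/0, /*length=*/inner->size());
    char buf[4096];
    size_t offset = 0;
    while (offset < reader->size()) {
        size_t bytes_read = 0;
        // sequential reads are served out of the prefetched 4MB windows
        RETURN_IF_ERROR(reader->read_at(offset, Slice {buf, sizeof(buf)}, &bytes_read, nullptr));
        if (bytes_read == 0) break; // EOF guard
        offset += bytes_read;
    }
    return reader->close();
}
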
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 8b06721c1d..2b3b6054b9 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -19,8 +19,10 @@
 #include <stdint.h>
 
+#include <condition_variable>
 #include <memory>
 
+#include "common/config.h"
 #include "common/status.h"
 #include "io/fs/file_reader.h"
 #include "olap/olap_define.h"
@@ -29,6 +31,114 @@ namespace doris {
 namespace io {
 
+class PrefetchBufferedReader;
+struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> {
+    enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
+    PrefetchBuffer() = default;
+    PrefetchBuffer(size_t start_offset, size_t end_offset, size_t buffer_size,
+                   size_t whole_buffer_size, io::FileReader* reader)
+            : _start_offset(start_offset),
+              _end_offset(end_offset),
+              _size(buffer_size),
+              _whole_buffer_size(whole_buffer_size),
+              _reader(reader),
+              _buf(buffer_size, '0') {}
+    PrefetchBuffer(PrefetchBuffer&& other)
+            : _offset(other._offset),
+              _start_offset(other._start_offset),
+              _end_offset(other._end_offset),
+              _size(other._size),
+              _whole_buffer_size(other._whole_buffer_size),
+              _reader(other._reader),
+              _buf(std::move(other._buf)) {}
+    ~PrefetchBuffer() = default;
+    size_t _offset;
+    // [_start_offset, _end_offset) is the range that can be prefetched.
+    // Note that the reader may still read outside [_start_offset, _end_offset),
+    // because FE does not align file splits with the file format when splitting.
+    size_t _start_offset;
+    size_t _end_offset;
+    size_t _size;
+    size_t _len {0};
+    size_t _whole_buffer_size;
+    io::FileReader* _reader;
+    std::string _buf;
+    BufferStatus _buffer_status {BufferStatus::RESET};
+    std::mutex _lock;
+    std::condition_variable _prefetched;
+    Status _prefetch_status {Status::OK()};
+    // @brief: reset the start offset of this buffer to offset
+    // @param[offset] the new start offset for this buffer
+    void reset_offset(size_t offset);
+    // @brief: start to fetch the content between [_offset, _offset + _size)
+    void prefetch_buffer();
+    // @brief: used by PrefetchBufferedReader to read the prefetched data
+    // @param[off] read start offset
+    // @param[buf] buffer to put the actual content
+    // @param[buf_len] maximum len trying to read
+    // @param[bytes_read] actual bytes read
+    Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* bytes_read);
+    // @brief: wait until the in-flight prefetch task is done, then shut the buffer down
+    void close();
+    // @brief: detect whether this buffer contains off
+    // @param[off] the offset to check
+    bool inline contains(size_t off) const { return _offset <= off && off < _offset + _size; }
+};
+
+/**
+ * A buffered reader that prefetches data in a daemon thread pool.
+ * Data is prefetched sequentially until the underlying buffers (4 * 4MB by default) are full.
+ * Whenever a buffer is read out, the next range is fetched in the background, so the underlying
+ * reader must be thread-safe and the data must be accessed sequentially.
+ * Therefore, PrefetchBufferedReader currently supports only the csv & json formats when reading
+ * s3 & broker files.
+ */
+class PrefetchBufferedReader : public io::FileReader {
+public:
+    PrefetchBufferedReader(io::FileReaderSPtr reader, int64_t offset, int64_t length,
+                           int64_t buffer_size = -1L);
+    ~PrefetchBufferedReader() override;
+
+    Status close() override;
+
+    const io::Path& path() const override { return _reader->path(); }
+
+    size_t size() const override { return _size; }
+
+    bool closed() const override { return _closed; }
+
+    std::shared_ptr<io::FileSystem> fs() const override { return _reader->fs(); }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const IOContext* io_ctx) override;
+
+private:
+    size_t get_buffer_pos(int64_t position) const {
+        return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size;
+    }
+    size_t get_buffer_offset(int64_t position) const {
+        return (position / s_max_pre_buffer_size) * s_max_pre_buffer_size;
+    }
+    void reset_all_buffer(size_t position) {
+        for (int64_t i = 0; i < _pre_buffers.size(); i++) {
+            int64_t cur_pos = position + i * s_max_pre_buffer_size;
+            int cur_buf_pos = get_buffer_pos(cur_pos);
+            // reset does all the prefetch work
+            _pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos));
+        }
+    }
+
+    io::FileReaderSPtr _reader;
+    int64_t _start_offset;
+    int64_t _end_offset;
+    int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB
+    std::vector<std::shared_ptr<PrefetchBuffer>> _pre_buffers;
+    int64_t _whole_pre_buffer_size;
+    bool _initialized = false;
+    bool _closed = false;
+    size_t _size;
+};
+
 /**
  * Load all the needed data in underlying buffer, so the caller does not need to prepare the data container.
  */
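
With the defaults the reader cycles through 4 buffers of 4MB, so a file offset maps to a buffer slot by modular arithmetic. A standalone check of the index math in get_buffer_pos and get_buffer_offset (values are hypothetical):

// Standalone check of the ring-buffer arithmetic; not patch code.
#include <cassert>
#include <cstdint>

int main() {
    const int64_t buf = 4LL * 1024 * 1024; // s_max_pre_buffer_size
    const int64_t whole = 4 * buf;         // _whole_pre_buffer_size (16MB)
    auto pos = [&](int64_t p) { return (p % whole) / buf; };
    auto off = [&](int64_t p) { return (p / buf) * buf; };
    assert(pos(0) == 0 && pos(5 * 1024 * 1024) == 1); // offset 5MB lands in buffer #1
    assert(pos(17 * 1024 * 1024) == 0);               // 17MB wraps around to buffer #0
    assert(off(5 * 1024 * 1024) == 4 * 1024 * 1024);  // aligned start of that buffer's window
    return 0;
}
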
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4ce64ff153..5483f5496c 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -125,6 +125,9 @@
     MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); }
     ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
     ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); }
+    ThreadPool* buffered_reader_prefetch_thread_pool() {
+        return _buffered_reader_prefetch_thread_pool.get();
+    }
     ThreadPool* send_report_thread_pool() { return _send_report_thread_pool.get(); }
     ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); }
 
@@ -215,6 +218,8 @@
     // Threadpool used to download cache from remote storage
     std::unique_ptr<ThreadPool> _download_cache_thread_pool;
+    // Threadpool used to prefetch remote files for the buffered reader
+    std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool;
     // A token used to submit download cache task serially
     std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
     // Pool used by fragment manager to send profile or status to FE coordinator
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 4287ed8034..b7066bbdbe 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -100,6 +100,11 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
 
     init_download_cache_required_components();
 
+    ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+            .set_min_threads(16)
+            .set_max_threads(64)
+            .build(&_buffered_reader_prefetch_thread_pool);
+
     // min num equal to fragment pool's min num
     // max num is useless because it will start as many as requested in the past
     // queue size is useless because the max thread num is very large
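
Because prefetch tasks are queued into this shared pool, a task may still be pending when the reader that owns the buffer goes away; that is why reset_offset captures shared_from_this() rather than a raw this. A self-contained sketch of the idiom, with a std::vector of std::function standing in for the pool:

// Illustrative sketch of the shared_from_this lifetime idiom; not patch code.
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct Buffer : std::enable_shared_from_this<Buffer> {
    void prefetch() { std::cout << "prefetching\n"; }
    std::function<void()> make_task() {
        // the queued task shares ownership of the buffer
        return [self = shared_from_this()]() { self->prefetch(); };
    }
    ~Buffer() { std::cout << "buffer destroyed\n"; }
};

int main() {
    std::vector<std::function<void()>> queue; // stand-in for the thread pool
    {
        auto buffer = std::make_shared<Buffer>();
        queue.push_back(buffer->make_task()); // the owning handle goes out of scope here
    }
    queue.front()(); // the task still runs safely; the buffer dies with the queue
    return 0;
}
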
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 65d1fb355b..f11a5c2273 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -26,6 +26,9 @@
 #include "exec/text_converter.h"
 #include "exec/text_converter.hpp"
 #include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
+#include "io/fs/buffered_reader.h"
+#include "io/fs/s3_file_reader.h"
 #include "olap/iterators.h"
 #include "olap/olap_common.h"
 #include "util/string_util.h"
@@ -135,11 +138,20 @@ Status CsvReader::init_reader(bool is_load) {
 
     _file_description.start_offset = start_offset;
 
+    io::FileReaderSPtr csv_file_reader;
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader));
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &csv_file_reader));
     } else {
         RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _system_properties, _file_description, &_file_system, &_file_reader));
+                _profile, _system_properties, _file_description, &_file_system, &csv_file_reader));
+    }
+    if (typeid_cast<io::S3FileReader*>(csv_file_reader.get()) != nullptr ||
+        typeid_cast<io::BrokerFileReader*>(csv_file_reader.get()) != nullptr) {
+        // PrefetchBufferedReader currently supports only csv & json when reading s3 & broker files
+        _file_reader.reset(
+                new io::PrefetchBufferedReader(csv_file_reader, _range.start_offset, _range.size));
+    } else {
+        _file_reader = std::move(csv_file_reader);
     }
     if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
         _params.file_type != TFileType::FILE_BROKER) {
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index d34b9ee5c8..3984e68f33 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -27,7 +27,8 @@
 // larger than 300B for correct lzo header decompressing
 #define INPUT_CHUNK (2 * 1024 * 1024)
 // #define INPUT_CHUNK (34)
-#define OUTPUT_CHUNK (8 * 1024 * 1024)
+// align with the prefetch buffer size
+#define OUTPUT_CHUNK (4 * 1024 * 1024)
 // #define OUTPUT_CHUNK (32)
 // leave these 2 size small for debugging
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index b28a97cfdf..6ae0c74e85 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -20,6 +20,9 @@
 #include "common/compiler_util.h"
 #include "exprs/json_functions.h"
 #include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
+#include "io/fs/buffered_reader.h"
+#include "io/fs/s3_file_reader.h"
 #include "io/fs/stream_load_pipe.h"
 #include "olap/iterators.h"
 #include "runtime/descriptors.h"
@@ -332,11 +335,20 @@ Status NewJsonReader::_open_file_reader() {
 
     _current_offset = start_offset;
     _file_description.start_offset = start_offset;
 
+    io::FileReaderSPtr json_file_reader;
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader));
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &json_file_reader));
     } else {
         RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _system_properties, _file_description, &_file_system, &_file_reader));
+                _profile, _system_properties, _file_description, &_file_system, &json_file_reader));
+    }
+    if (typeid_cast<io::S3FileReader*>(json_file_reader.get()) != nullptr ||
+        typeid_cast<io::BrokerFileReader*>(json_file_reader.get()) != nullptr) {
+        // PrefetchBufferedReader currently supports only csv & json when reading s3 & broker files
+        _file_reader.reset(
+                new io::PrefetchBufferedReader(json_file_reader, _range.start_offset, _range.size));
+    } else {
+        _file_reader = std::move(json_file_reader);
     }
     return Status::OK();
 }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1960f8a97c..f8aa2c310e 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1742,8 +1742,11 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = false)
     public static long file_scan_node_split_num = 128;
 
+    // 0 means use the block size in HDFS/S3 as the split size.
+    // The HDFS block size is 128MB, while the S3 block size is 32MB.
+    // 32MB is too small for an S3 file split, so use 128MB as the default split size.
     @ConfField(mutable = true, masterOnly = false)
-    public static long file_split_size = 0; // 0 means use the block size in HDFS/S3 as split size
+    public static long file_split_size = 134217728;
 
     /**
      * If set to TRUE, FE will:
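
For reference, the new default of 134217728 bytes is exactly 128MB, and raising the split size from the 32MB S3 block size cuts the number of splits per file by 4x. A small illustrative check (the 1GB file is a made-up example):

// Illustrative only: effect of the new file_split_size default on split counts.
#include <cstdint>
#include <iostream>

int main() {
    const int64_t file = 1024LL * 1024 * 1024;   // a hypothetical 1GB S3 file
    const int64_t s3_block = 32LL * 1024 * 1024; // old behavior: S3 block size
    const int64_t new_default = 134217728;       // 128MB == 128 * 1024 * 1024
    std::cout << "splits before: " << file / s3_block << "\n";    // 32
    std::cout << "splits after:  " << file / new_default << "\n"; // 8
    return 0;
}
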