From cb4a57f44fdfd3d78657792721422a30ba834ca1 Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Fri, 26 May 2023 21:06:12 +0800 Subject: [PATCH] [Opt](orc-reader) Support merge small IO facility in orc reader. (#20092) #18976 introduced the merge-small-IO facility to optimize performance, and it is already used by the parquet reader. This PR adds support for this facility to the ORC reader. The current ORC reader implementation needs to reposition the parent present stream when reading lazy columns during lazy materialization. To make that work, this PR removes `DCHECK_GE(offset, cached_data.end_offset)`. --- be/src/apache-orc | 2 +- be/src/io/fs/buffered_reader.cpp | 3 +-- be/src/vec/exec/format/orc/vorc_reader.cpp | 28 +++++++++++++++++++--- be/src/vec/exec/format/orc/vorc_reader.h | 10 ++++++-- 4 files changed, 35 insertions(+), 8 deletions(-) diff --git a/be/src/apache-orc b/be/src/apache-orc index 0e53506146..380df03331 160000 --- a/be/src/apache-orc +++ b/be/src/apache-orc @@ -1 +1 @@ -Subproject commit 0e53506146c965a5a71f0582691ab2ea148dae7c +Subproject commit 380df03331c12fa4095dd2613eb5f08ad541eb3e diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 847489f5fc..1dec7d0073 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -74,8 +74,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b return Status::OK(); } } else if (!cached_data.empty()) { - // the data in range may be skipped - DCHECK_GE(offset, cached_data.end_offset); + // the data in range may be skipped or ignored for (int16 box_index : cached_data.ref_box) { _dec_box_ref(box_index); } diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 990dfc0477..f6f9d136e2 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -222,8 +222,8 @@ Status OrcReader::_create_file_reader() { RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, 
_system_properties, _file_description, &_file_system, &inner_reader, io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx)); - _file_input_stream.reset( - new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics, _io_ctx)); + _file_input_stream.reset(new ORCFileInputStream(_scan_range.path, inner_reader, + &_statistics, _io_ctx, _profile)); } if (_file_input_stream->getLength() == 0) { return Status::EndOfFile("empty orc file: " + _scan_range.path); @@ -1322,4 +1322,26 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s return Status::OK(); } -} // namespace doris::vectorized \ No newline at end of file +void ORCFileInputStream::beforeReadStripe( + std::unique_ptr current_strip_information, + std::vector selected_columns) { + // Generate prefetch ranges, build stripe file reader. + uint64_t offset = current_strip_information->getOffset(); + std::vector prefetch_ranges; + for (uint64_t stream_id = 0; stream_id < current_strip_information->getNumberOfStreams(); + ++stream_id) { + std::unique_ptr stream = + current_strip_information->getStreamInformation(stream_id); + uint32_t columnId = stream->getColumnId(); + uint64_t length = stream->getLength(); + if (selected_columns[columnId]) { + doris::io::PrefetchRange prefetch_range = {offset, offset + length}; + prefetch_ranges.emplace_back(std::move(prefetch_range)); + } + offset += length; + } + // The underlying page reader will prefetch data in column. 
+ _file_reader.reset(new io::MergeRangeFileReader(_profile, _file_reader, prefetch_ranges)); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 659bc61595..4af091981e 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -466,11 +466,13 @@ private: class ORCFileInputStream : public orc::InputStream { public: ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr file_reader, - OrcReader::Statistics* statistics, const io::IOContext* io_ctx) + OrcReader::Statistics* statistics, const io::IOContext* io_ctx, + RuntimeProfile* profile) : _file_name(file_name), _file_reader(file_reader), _statistics(statistics), - _io_ctx(io_ctx) {} + _io_ctx(io_ctx), + _profile(profile) {} ~ORCFileInputStream() override = default; @@ -482,12 +484,16 @@ public: const std::string& getName() const override { return _file_name; } + void beforeReadStripe(std::unique_ptr current_strip_information, + std::vector selected_columns) override; + private: const std::string& _file_name; io::FileReaderSPtr _file_reader; // Owned by OrcReader OrcReader::Statistics* _statistics; const io::IOContext* _io_ctx; + RuntimeProfile* _profile; }; } // namespace doris::vectorized