[Opt](orc-reader) Support merge small IO facility in orc reader. (#20092)

#18976 introduced merge small IO facility to optimize performance, and used by parquet reader. 
This PR support this facility in orc reader.  Current ORC reader implementation need to reposition parent present stream when reading lazy columns in lazy materialization facility. So let it works by removing `DCHECK_GE(offset, cached_data.end_offset)`.
This commit is contained in:
Qi Chen
2023-05-26 21:06:12 +08:00
committed by GitHub
parent 346c51faa2
commit cb4a57f44f
4 changed files with 35 additions and 8 deletions

View File

@ -74,8 +74,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
return Status::OK();
}
} else if (!cached_data.empty()) {
// the data in range may be skipped
DCHECK_GE(offset, cached_data.end_offset);
// the data in range may be skipped or ignored
for (int16 box_index : cached_data.ref_box) {
_dec_box_ref(box_index);
}

View File

@ -222,8 +222,8 @@ Status OrcReader::_create_file_reader() {
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, &_file_system, &inner_reader,
io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx));
_file_input_stream.reset(
new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics, _io_ctx));
_file_input_stream.reset(new ORCFileInputStream(_scan_range.path, inner_reader,
&_statistics, _io_ctx, _profile));
}
if (_file_input_stream->getLength() == 0) {
return Status::EndOfFile("empty orc file: " + _scan_range.path);
@ -1322,4 +1322,26 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
return Status::OK();
}
} // namespace doris::vectorized
void ORCFileInputStream::beforeReadStripe(
std::unique_ptr<orc::StripeInformation> current_strip_information,
std::vector<bool> selected_columns) {
// Generate prefetch ranges, build stripe file reader.
uint64_t offset = current_strip_information->getOffset();
std::vector<io::PrefetchRange> prefetch_ranges;
for (uint64_t stream_id = 0; stream_id < current_strip_information->getNumberOfStreams();
++stream_id) {
std::unique_ptr<orc::StreamInformation> stream =
current_strip_information->getStreamInformation(stream_id);
uint32_t columnId = stream->getColumnId();
uint64_t length = stream->getLength();
if (selected_columns[columnId]) {
doris::io::PrefetchRange prefetch_range = {offset, offset + length};
prefetch_ranges.emplace_back(std::move(prefetch_range));
}
offset += length;
}
// The underlying page reader will prefetch data in column.
_file_reader.reset(new io::MergeRangeFileReader(_profile, _file_reader, prefetch_ranges));
}
} // namespace doris::vectorized

View File

@ -466,11 +466,13 @@ private:
class ORCFileInputStream : public orc::InputStream {
public:
ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr file_reader,
OrcReader::Statistics* statistics, const io::IOContext* io_ctx)
OrcReader::Statistics* statistics, const io::IOContext* io_ctx,
RuntimeProfile* profile)
: _file_name(file_name),
_file_reader(file_reader),
_statistics(statistics),
_io_ctx(io_ctx) {}
_io_ctx(io_ctx),
_profile(profile) {}
~ORCFileInputStream() override = default;
@ -482,12 +484,16 @@ public:
const std::string& getName() const override { return _file_name; }
void beforeReadStripe(std::unique_ptr<orc::StripeInformation> current_strip_information,
std::vector<bool> selected_columns) override;
private:
const std::string& _file_name;
io::FileReaderSPtr _file_reader;
// Owned by OrcReader
OrcReader::Statistics* _statistics;
const io::IOContext* _io_ctx;
RuntimeProfile* _profile;
};
} // namespace doris::vectorized