From 54f878b78106d2f484b134dfad706b65db43021f Mon Sep 17 00:00:00 2001
From: huangzhaowei
Date: Mon, 25 Jul 2022 11:41:46 +0800
Subject: [PATCH] [feature-wip](multi-catalog) Support orc format file split for file scan node (#11046)

---
 be/src/exec/arrow/orc_reader.cpp       | 56 ++++++++++++++++++++++++--
 be/src/exec/arrow/orc_reader.h         |  6 ++-
 be/src/vec/exec/file_arrow_scanner.cpp |  3 +-
 be/src/vec/exec/file_scan_node.cpp     |  2 +-
 be/src/vec/exec/vorc_scanner.cpp       |  3 +-
 5 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index f94d24610c..78d51d8376 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -28,8 +28,11 @@
 namespace doris {
 
 ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
-                             int32_t num_of_columns_from_file)
-        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file) {
+                             int32_t num_of_columns_from_file, int64_t range_start_offset,
+                             int64_t range_size)
+        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
+          _range_start_offset(range_start_offset),
+          _range_size(range_size) {
     _reader = nullptr;
     _cur_file_eof = false;
 }
@@ -51,6 +54,8 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
     if (_total_groups == 0) {
         return Status::EndOfFile("Empty Orc File");
     }
+    // seek file position after _reader created.
+    RETURN_IF_ERROR(_seek_start_stripe());
 
     // map
     arrow::Result<std::shared_ptr<arrow::Schema>> maybe_schema = _reader->ReadSchema();
@@ -61,8 +66,10 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
     }
     std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie();
     for (size_t i = 0; i < schema->num_fields(); ++i) {
-        _map_column.emplace(schema->field(i)->name(), i);
+        // orc index started from 1.
+        _map_column.emplace(schema->field(i)->name(), i + 1);
     }
+    RETURN_IF_ERROR(column_indices(tuple_slot_descs));
 
     bool eof = false;
     RETURN_IF_ERROR(_next_stripe_reader(&eof));
@@ -70,7 +77,48 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
         return Status::EndOfFile("end of file");
     }
 
-    RETURN_IF_ERROR(column_indices(tuple_slot_descs));
+    return Status::OK();
+}
+
+Status ORCReaderWrap::_seek_start_stripe() {
+    // If file was from Hms table, _range_start_offset is started from 3(magic word).
+    // And if file was from load, _range_start_offset is always set to zero.
+    // So now we only support file split for hms table.
+    // TODO: support file split for loading.
+    if (_range_size <= 0 || _range_start_offset == 0) {
+        return Status::OK();
+    }
+    int64_t row_number = 0;
+    int start_group = _current_group;
+    int end_group = _total_groups;
+    for (int i = 0; i < _total_groups; i++) {
+        int64_t _offset = _reader->GetRawORCReader()->getStripe(i)->getOffset();
+        int64_t row = _reader->GetRawORCReader()->getStripe(i)->getNumberOfRows();
+        if (_offset < _range_start_offset) {
+            row_number += row;
+        } else if (_offset == _range_start_offset) {
+            // If using the external file scan, _range_start_offset is always in the offset lists.
+            // If using broker load, _range_start_offset is always set to be 0.
+            start_group = i;
+        }
+        if (_range_start_offset + _range_size <= _offset) {
+            end_group = i;
+            break;
+        }
+    }
+
+    LOG(INFO) << "This reader read orc file from offset: " << _range_start_offset
+              << " with size: " << _range_size << ". Also mean that read from strip id from "
+              << start_group << " to " << end_group;
+
+    if (!_reader->Seek(row_number).ok()) {
+        LOG(WARNING) << "Failed to seek to the line number: " << row_number;
+        return Status::InternalError("Failed to seek to the line number");
+    }
+
+    _current_group = start_group;
+    _total_groups = end_group;
+
     return Status::OK();
 }
 
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index 3b46cc6073..fe8c5c54a4 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -32,7 +32,8 @@ namespace doris {
 // Reader of ORC file
 class ORCReaderWrap final : public ArrowReaderWrap {
 public:
-    ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
+    ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file,
+                  int64_t range_start_offset, int64_t range_size);
     ~ORCReaderWrap() override = default;
 
     Status init_reader(const TupleDescriptor* tuple_desc,
@@ -43,11 +44,14 @@ public:
 
 private:
     Status _next_stripe_reader(bool* eof);
+    Status _seek_start_stripe();
 
 private:
     // orc file reader object
     std::unique_ptr<arrow::adapter::orc::ORCFileReader> _reader;
     bool _cur_file_eof; // is read over?
+    int64_t _range_start_offset;
+    int64_t _range_size;
 };
 
 } // namespace doris
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 702dae9a6e..30511b2392 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -251,7 +251,8 @@ ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int
                                                     int32_t num_of_columns_from_file,
                                                     int64_t range_start_offset,
                                                     int64_t range_size) {
-    return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+    return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file, range_start_offset,
+                             range_size);
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index 87219f9326..c653c84a15 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -313,7 +313,7 @@ Status FileScanNode::get_next(RuntimeState* state, vectorized::Block* block, boo
         } else {
             if (_mutable_block->empty()) {
                 // directly use scanner_block
-                *block = *scanner_block;
+                *block = std::move(*scanner_block);
             } else {
                 // copy _mutable_block firstly, then merge scanner_block into _mutable_block for next.
                 *block = _mutable_block->to_block();
diff --git a/be/src/vec/exec/vorc_scanner.cpp b/be/src/vec/exec/vorc_scanner.cpp
index 9f71d01cfb..81db28f2e6 100644
--- a/be/src/vec/exec/vorc_scanner.cpp
+++ b/be/src/vec/exec/vorc_scanner.cpp
@@ -32,7 +32,8 @@ VORCScanner::VORCScanner(RuntimeState* state, RuntimeProfile* profile,
 ArrowReaderWrap* VORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
                                                 int32_t num_of_columns_from_file,
                                                 int64_t range_start_offset, int64_t range_size) {
-    return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+    return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file, range_start_offset,
+                             range_size);
 }
 
 } // namespace doris::vectorized