[feature-wip][multi-catalog]Support prefetch for orc file format (#11292)

Refactor the prefetch code in parquet and support prefetch for orc file format
This commit is contained in:
huangzhaowei
2022-08-02 11:01:15 +08:00
committed by GitHub
parent bd6e3cf132
commit 0ac5228c05
6 changed files with 120 additions and 109 deletions

View File

@ -48,6 +48,11 @@ ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
ArrowReaderWrap::~ArrowReaderWrap() {
close();
_closed = true;
_queue_writer_cond.notify_one();
if (_thread.joinable()) {
_thread.join();
}
}
void ArrowReaderWrap::close() {
@ -76,6 +81,62 @@ Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple
return Status::OK();
}
Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.empty()) {
if (_batch_eof) {
_include_column_ids.clear();
*eof = true;
_batch_eof = false;
return Status::OK();
}
_queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
}
if (UNLIKELY(_closed)) {
return Status::InternalError(_status.message());
}
*batch = _queue.front();
_queue.pop_front();
_queue_writer_cond.notify_one();
return Status::OK();
}
void ArrowReaderWrap::prefetch_batch() {
auto insert_batch = [this](const auto& batch) {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.size() == _max_queue_size) {
_queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
}
if (UNLIKELY(_closed)) {
return;
}
_queue.push_back(batch);
_queue_reader_cond.notify_one();
};
int current_group = _current_group;
int total_groups = _total_groups;
while (true) {
if (_closed || current_group >= total_groups) {
_batch_eof = true;
_queue_reader_cond.notify_one();
return;
}
if (filter_row_group(current_group)) {
current_group++;
continue;
}
arrow::RecordBatchVector batches;
read_batches(batches, current_group);
if (!_status.ok()) {
_closed = true;
return;
}
std::for_each(batches.begin(), batches.end(), insert_batch);
current_group++;
}
}
ArrowFile::ArrowFile(FileReader* file) : _file(file) {}
ArrowFile::~ArrowFile() {

View File

@ -92,13 +92,17 @@ public:
return Status::NotSupported("Not Implemented read");
}
// for vec
virtual Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) = 0;
Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof);
std::shared_ptr<Statistics>& statistics() { return _statistics; }
void close();
virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); }
void prefetch_batch();
protected:
virtual Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
virtual void read_batches(arrow::RecordBatchVector& batches, int current_group) = 0;
virtual bool filter_row_group(int current_group) = 0;
protected:
const int64_t _batch_size;
@ -110,6 +114,16 @@ protected:
std::map<std::string, int> _map_column; // column-name <---> column-index
std::vector<int> _include_column_ids; // columns that need to get from file
std::shared_ptr<Statistics> _statistics;
std::atomic<bool> _closed = false;
std::atomic<bool> _batch_eof = false;
arrow::Status _status;
std::mutex _mtx;
std::condition_variable _queue_reader_cond;
std::condition_variable _queue_writer_cond;
std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
std::thread _thread;
};
} // namespace doris

View File

@ -71,11 +71,7 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
}
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
bool eof = false;
RETURN_IF_ERROR(_next_stripe_reader(&eof));
if (eof) {
return Status::EndOfFile("end of file");
}
_thread = std::thread(&ArrowReaderWrap::prefetch_batch, this);
return Status::OK();
}
@ -143,23 +139,23 @@ Status ORCReaderWrap::_next_stripe_reader(bool* eof) {
return Status::OK();
}
Status ORCReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
*eof = false;
do {
auto st = _rb_reader->ReadNext(batch);
if (!st.ok()) {
LOG(WARNING) << "failed to get next batch, errmsg=" << st;
return Status::InternalError(st.ToString());
}
if (*batch == nullptr) {
// try next stripe
RETURN_IF_ERROR(_next_stripe_reader(eof));
if (*eof) {
break;
}
}
} while (*batch == nullptr);
return Status::OK();
void ORCReaderWrap::read_batches(arrow::RecordBatchVector& batches, int current_group) {
bool eof = false;
Status status = _next_stripe_reader(&eof);
if (!status.ok()) {
_closed = true;
return;
}
if (eof) {
_closed = true;
return;
}
_status = _rb_reader->ReadAll(&batches);
}
bool ORCReaderWrap::filter_row_group(int current_group) {
return false;
}
} // namespace doris

View File

@ -40,11 +40,12 @@ public:
const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) override;
Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
private:
Status _next_stripe_reader(bool* eof);
Status _seek_start_stripe();
void read_batches(arrow::RecordBatchVector& batches, int current_group) override;
bool filter_row_group(int current_group) override;
private:
// orc file reader object

View File

@ -46,14 +46,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size
_range_start_offset(range_start_offset),
_range_size(range_size) {}
ParquetReaderWrap::~ParquetReaderWrap() {
_closed = true;
_queue_writer_cond.notify_one();
if (_thread.joinable()) {
_thread.join();
}
}
Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::vector<ExprContext*>& conjunct_ctxs,
@ -111,7 +103,7 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
_row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids,
file_size);
}
_thread = std::thread(&ParquetReaderWrap::prefetch_batch, this);
_thread = std::thread(&ArrowReaderWrap::prefetch_batch, this);
return Status::OK();
} catch (parquet::ParquetException& e) {
std::stringstream str_error;
@ -184,26 +176,6 @@ Status ParquetReaderWrap::read_record_batch(bool* eof) {
return Status::OK();
}
Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.empty()) {
if (_batch_eof) {
_include_column_ids.clear();
*eof = true;
_batch_eof = false;
return Status::OK();
}
_queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
}
if (UNLIKELY(_closed)) {
return Status::InternalError(_status.message());
}
*batch = _queue.front();
_queue.pop_front();
_queue_writer_cond.notify_one();
return Status::OK();
}
Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array,
uint8_t* buf, int32_t* wbytes) {
const auto type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
@ -546,50 +518,6 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
return read_record_batch(eof);
}
void ParquetReaderWrap::prefetch_batch() {
auto insert_batch = [this](const auto& batch) {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.size() == _max_queue_size) {
_queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
}
if (UNLIKELY(_closed)) {
return;
}
_queue.push_back(batch);
_queue_reader_cond.notify_one();
};
int current_group = 0;
int total_groups = _total_groups;
while (true) {
if (_closed || current_group >= total_groups) {
_batch_eof = true;
_queue_reader_cond.notify_one();
return;
}
if (config::parquet_predicate_push_down) {
auto filter_group_set = _row_group_reader->filter_groups();
if (filter_group_set.end() != filter_group_set.find(current_group)) {
// find filter group, skip
current_group++;
continue;
}
}
_status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader);
if (!_status.ok()) {
_closed = true;
return;
}
arrow::RecordBatchVector batches;
_status = _rb_reader->ReadAll(&batches);
if (!_status.ok()) {
_closed = true;
return;
}
std::for_each(batches.begin(), batches.end(), insert_batch);
current_group++;
}
}
Status ParquetReaderWrap::read_next_batch() {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.empty()) {
@ -609,4 +537,24 @@ Status ParquetReaderWrap::read_next_batch() {
return Status::OK();
}
void ParquetReaderWrap::read_batches(arrow::RecordBatchVector& batches, int current_group) {
_status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader);
if (!_status.ok()) {
_closed = true;
return;
}
_status = _rb_reader->ReadAll(&batches);
}
bool ParquetReaderWrap::filter_row_group(int current_group) {
if (config::parquet_predicate_push_down) {
auto filter_group_set = _row_group_reader->filter_groups();
if (filter_group_set.end() != filter_group_set.find(current_group)) {
// find filter group, skip
return true;
}
}
return false;
}
} // namespace doris

View File

@ -64,7 +64,7 @@ public:
// batch_size is not use here
ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file,
int64_t range_start_offset, int64_t range_size);
~ParquetReaderWrap() override;
~ParquetReaderWrap() override = default;
// Read
Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
@ -75,7 +75,6 @@ public:
const std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) override;
Status init_parquet_type();
Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
private:
void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value,
@ -86,8 +85,9 @@ private:
int32_t* wbtyes);
private:
void prefetch_batch();
Status read_next_batch();
void read_batches(arrow::RecordBatchVector& batches, int current_group) override;
bool filter_row_group(int current_group) override;
private:
// parquet file reader object
@ -104,16 +104,7 @@ private:
int64_t _range_size;
private:
std::atomic<bool> _closed = false;
std::atomic<bool> _batch_eof = false;
arrow::Status _status;
std::mutex _mtx;
std::condition_variable _queue_reader_cond;
std::condition_variable _queue_writer_cond;
std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
std::unique_ptr<doris::RowGroupReader> _row_group_reader;
const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
std::thread _thread;
};
} // namespace doris