[Enhancement] improve parquet reader via arrow's prefetch and multi thread (#9472)
* add ArrowReaderProperties to parquet::arrow::FileReader * support prefetch batch
This commit is contained in:
@ -736,6 +736,10 @@ CONF_Validator(string_type_length_soft_limit_bytes,
|
||||
// used for olap scanner to save memory, when the size of unused_object_pool
|
||||
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
|
||||
CONF_Int32(object_pool_buffer_size, "100");
|
||||
|
||||
// ParquetReaderWrap prefetch buffer size
|
||||
CONF_Int32(parquet_reader_max_buffer_size, "50");
|
||||
|
||||
} // namespace config
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -18,9 +18,15 @@
|
||||
|
||||
#include <arrow/array.h>
|
||||
#include <arrow/status.h>
|
||||
#include <arrow/type_fwd.h>
|
||||
#include <time.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "common/status.h"
|
||||
#include "exec/file_reader.h"
|
||||
#include "gen_cpp/PaloBrokerService_types.h"
|
||||
#include "gen_cpp/TPaloBrokerService.h"
|
||||
@ -44,9 +50,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int32_t num_of_col
|
||||
_current_line_of_group(0),
|
||||
_current_line_of_batch(0) {
|
||||
_parquet = std::shared_ptr<ParquetFile>(new ParquetFile(file_reader));
|
||||
_properties = parquet::ReaderProperties();
|
||||
_properties.enable_buffered_stream();
|
||||
_properties.set_buffer_size(65535);
|
||||
}
|
||||
|
||||
ParquetReaderWrap::~ParquetReaderWrap() {
|
||||
@ -55,10 +58,23 @@ ParquetReaderWrap::~ParquetReaderWrap() {
|
||||
Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
|
||||
const std::string& timezone) {
|
||||
try {
|
||||
// new file reader for parquet file
|
||||
auto st = parquet::arrow::FileReader::Make(
|
||||
arrow::default_memory_pool(),
|
||||
parquet::ParquetFileReader::Open(_parquet, _properties), &_reader);
|
||||
parquet::ArrowReaderProperties arrow_reader_properties =
|
||||
parquet::default_arrow_reader_properties();
|
||||
arrow_reader_properties.set_pre_buffer(true);
|
||||
arrow_reader_properties.set_use_threads(true);
|
||||
// Open Parquet file reader
|
||||
auto reader_builder = parquet::arrow::FileReaderBuilder();
|
||||
reader_builder.properties(arrow_reader_properties);
|
||||
|
||||
auto st = reader_builder.Open(_parquet);
|
||||
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
|
||||
return Status::InternalError("Failed to create file reader");
|
||||
}
|
||||
|
||||
st = reader_builder.Build(&_reader);
|
||||
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
|
||||
return Status::InternalError("Failed to create file reader");
|
||||
@ -85,31 +101,23 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
|
||||
|
||||
_timezone = timezone;
|
||||
|
||||
if (_current_line_of_group == 0) { // the first read
|
||||
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
|
||||
// read batch
|
||||
arrow::Status status = _reader->GetRecordBatchReader({_current_group},
|
||||
_parquet_column_ids, &_rb_batch);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "Get RecordBatch Failed. " << status.ToString();
|
||||
return Status::InternalError(status.ToString());
|
||||
}
|
||||
status = _rb_batch->ReadNext(&_batch);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "The first read record. " << status.ToString();
|
||||
return Status::InternalError(status.ToString());
|
||||
}
|
||||
_current_line_of_batch = 0;
|
||||
//save column type
|
||||
std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
|
||||
for (int i = 0; i < _parquet_column_ids.size(); i++) {
|
||||
std::shared_ptr<arrow::Field> field = field_schema->field(i);
|
||||
if (!field) {
|
||||
LOG(WARNING) << "Get field schema failed. Column order:" << i;
|
||||
return Status::InternalError(status.ToString());
|
||||
}
|
||||
_parquet_column_type.emplace_back(field->type()->id());
|
||||
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
|
||||
|
||||
std::thread thread(&ParquetReaderWrap::prefetch_batch, this);
|
||||
thread.detach();
|
||||
|
||||
// read batch
|
||||
RETURN_IF_ERROR(read_next_batch());
|
||||
_current_line_of_batch = 0;
|
||||
//save column type
|
||||
std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
|
||||
for (int i = 0; i < _parquet_column_ids.size(); i++) {
|
||||
std::shared_ptr<arrow::Field> field = field_schema->field(i);
|
||||
if (!field) {
|
||||
LOG(WARNING) << "Get field schema failed. Column order:" << i;
|
||||
return Status::InternalError(_status.ToString());
|
||||
}
|
||||
_parquet_column_type.emplace_back(field->type()->id());
|
||||
}
|
||||
return Status::OK();
|
||||
} catch (parquet::ParquetException& e) {
|
||||
@ -121,6 +129,8 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
|
||||
}
|
||||
|
||||
void ParquetReaderWrap::close() {
|
||||
_closed = true;
|
||||
_queue_writer_cond.notify_one();
|
||||
arrow::Status st = _parquet->Close();
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "close parquet file error: " << st.ToString();
|
||||
@ -195,25 +205,15 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
|
||||
_rows_of_group = _file_metadata->RowGroup(_current_group)
|
||||
->num_rows(); //get rows of the current row group
|
||||
// read batch
|
||||
arrow::Status status =
|
||||
_reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch);
|
||||
if (!status.ok()) {
|
||||
return Status::InternalError("Get RecordBatchReader Failed.");
|
||||
}
|
||||
status = _rb_batch->ReadNext(&_batch);
|
||||
if (!status.ok()) {
|
||||
return Status::InternalError("Read Batch Error With Libarrow.");
|
||||
}
|
||||
RETURN_IF_ERROR(read_next_batch());
|
||||
_current_line_of_batch = 0;
|
||||
} else if (_current_line_of_batch >= _batch->num_rows()) {
|
||||
VLOG_DEBUG << "read_record_batch, current group id:" << _current_group
|
||||
<< " current line of batch:" << _current_line_of_batch
|
||||
<< " is larger than batch size:" << _batch->num_rows()
|
||||
<< ". start to read next batch";
|
||||
arrow::Status status = _rb_batch->ReadNext(&_batch);
|
||||
if (!status.ok()) {
|
||||
return Status::InternalError("Read Batch Error With Libarrow.");
|
||||
}
|
||||
// read batch
|
||||
RETURN_IF_ERROR(read_next_batch());
|
||||
_current_line_of_batch = 0;
|
||||
}
|
||||
return Status::OK();
|
||||
@ -553,6 +553,55 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
|
||||
return read_record_batch(tuple_slot_descs, eof);
|
||||
}
|
||||
|
||||
void ParquetReaderWrap::prefetch_batch() {
|
||||
auto insert_batch = [this](const auto& batch) {
|
||||
std::unique_lock<std::mutex> lock(_mtx);
|
||||
while (!_closed && _queue.size() == _max_queue_size) {
|
||||
_queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
|
||||
}
|
||||
if (UNLIKELY(_closed)) {
|
||||
return;
|
||||
}
|
||||
_queue.push_back(batch);
|
||||
_queue_reader_cond.notify_one();
|
||||
};
|
||||
int current_group = 0;
|
||||
while (true) {
|
||||
if (_closed || current_group >= _total_groups) {
|
||||
return;
|
||||
}
|
||||
_status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch);
|
||||
if (!_status.ok()) {
|
||||
_closed = true;
|
||||
return;
|
||||
}
|
||||
arrow::RecordBatchVector batches;
|
||||
_status = _rb_batch->ReadAll(&batches);
|
||||
if (!_status.ok()) {
|
||||
_closed = true;
|
||||
return;
|
||||
}
|
||||
std::for_each(batches.begin(), batches.end(), insert_batch);
|
||||
current_group++;
|
||||
}
|
||||
}
|
||||
|
||||
// Consumer side of the prefetch pipeline: blocks until the worker
// thread (prefetch_batch) has queued at least one record batch, then
// pops it into _batch. Once the reader is closed — whether by close()
// or by a worker-side error — it returns an InternalError carrying the
// message of the worker's last arrow status.
Status ParquetReaderWrap::read_next_batch() {
    std::unique_lock<std::mutex> lock(_mtx);
    // Timed wait so a racing close() is noticed within a second even
    // if the matching notification was missed.
    while (_queue.empty() && !_closed) {
        _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
    }

    if (UNLIKELY(_closed)) {
        return Status::InternalError(_status.message());
    }

    _batch = _queue.front();
    _queue.pop_front();
    // Free a slot for the producer, which may be waiting on a full queue.
    _queue_writer_cond.notify_one();
    return Status::OK();
}
|
||||
|
||||
// Wraps a doris FileReader as an arrow::io::RandomAccessFile source.
// NOTE(review): presumably non-owning — the destructor is not visible
// here, so confirm who releases `file`.
ParquetFile::ParquetFile(FileReader* file) : _file(file) {}
|
||||
|
||||
ParquetFile::~ParquetFile() {
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
#include <arrow/io/api.h>
|
||||
#include <arrow/io/file.h>
|
||||
#include <arrow/io/interfaces.h>
|
||||
#include <arrow/status.h>
|
||||
#include <parquet/api/reader.h>
|
||||
#include <parquet/api/writer.h>
|
||||
#include <parquet/arrow/reader.h>
|
||||
@ -29,10 +30,16 @@
|
||||
#include <parquet/exception.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "common/config.h"
|
||||
#include "gen_cpp/PaloBrokerService_types.h"
|
||||
#include "gen_cpp/PlanNodes_types.h"
|
||||
#include "gen_cpp/Types_types.h"
|
||||
@ -51,7 +58,7 @@ class FileReader;
|
||||
class ParquetFile : public arrow::io::RandomAccessFile {
|
||||
public:
|
||||
ParquetFile(FileReader* file);
|
||||
virtual ~ParquetFile();
|
||||
~ParquetFile() override;
|
||||
arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
|
||||
arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
|
||||
arrow::Result<int64_t> GetSize() override;
|
||||
@ -92,9 +99,12 @@ private:
|
||||
Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
|
||||
int32_t* wbtyes);
|
||||
|
||||
private:
|
||||
void prefetch_batch();
|
||||
Status read_next_batch();
|
||||
|
||||
private:
|
||||
const int32_t _num_of_columns_from_file;
|
||||
parquet::ReaderProperties _properties;
|
||||
std::shared_ptr<ParquetFile> _parquet;
|
||||
|
||||
// parquet file reader object
|
||||
@ -113,6 +123,15 @@ private:
|
||||
int _current_line_of_batch;
|
||||
|
||||
std::string _timezone;
|
||||
|
||||
private:
|
||||
std::atomic<bool> _closed = false;
|
||||
arrow::Status _status;
|
||||
std::mutex _mtx;
|
||||
std::condition_variable _queue_reader_cond;
|
||||
std::condition_variable _queue_writer_cond;
|
||||
std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
|
||||
const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user