[refactor](reader) refactor the interface of file reader (#12574)

Currently, Doris has a variety of readers for different file formats,
such as parquet reader, orc reader, csv reader, json reader and so on.

The interfaces of these readers are not unified, which makes it impossible to call them through a unified method.

In this PR, I added a `GenericReader` interface class, and other Readers will implement this interface class
to use the `get_next_block()` method.

This PR currently only modifies `arrow_reader` and `parquet reader`.
Other readers will be modified one by one in subsequent PRs.
This commit is contained in:
Mingyu Chen
2022-09-14 22:31:11 +08:00
committed by GitHub
parent be0a0200cf
commit c5ad989065
23 changed files with 199 additions and 121 deletions

View File

@ -29,17 +29,21 @@
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/mem_pool.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "util/string_util.h"
#include "util/thrift_util.h"
#include "vec/core/block.h"
#include "vec/utils/arrow_column_to_doris_column.h"
namespace doris {
// Broker
ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file, bool case_sensitive)
: _batch_size(batch_size),
ArrowReaderWrap::ArrowReaderWrap(RuntimeState* state,
const std::vector<SlotDescriptor*>& file_slot_descs,
FileReader* file_reader, int32_t num_of_columns_from_file,
bool case_sensitive)
: _state(state),
_file_slot_descs(file_slot_descs),
_num_of_columns_from_file(num_of_columns_from_file),
_case_sensitive(case_sensitive) {
_arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader));
@ -65,11 +69,11 @@ void ArrowReaderWrap::close() {
}
}
Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs) {
DCHECK(_num_of_columns_from_file <= tuple_slot_descs.size());
Status ArrowReaderWrap::column_indices() {
DCHECK(_num_of_columns_from_file <= _file_slot_descs.size());
_include_column_ids.clear();
for (int i = 0; i < _num_of_columns_from_file; i++) {
auto slot_desc = tuple_slot_descs.at(i);
auto slot_desc = _file_slot_descs.at(i);
// Get the Column Reader for the boolean column
auto iter = _map_column.find(slot_desc->col_name());
if (iter != _map_column.end()) {
@ -84,7 +88,7 @@ Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple
return Status::OK();
}
int ArrowReaderWrap::get_cloumn_index(std::string column_name) {
int ArrowReaderWrap::get_column_index(std::string column_name) {
std::string real_column_name = _case_sensitive ? column_name : to_lower(column_name);
auto iter = _map_column.find(real_column_name);
if (iter != _map_column.end()) {
@ -97,6 +101,37 @@ int ArrowReaderWrap::get_cloumn_index(std::string column_name) {
}
}
Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) {
size_t rows = 0;
do {
if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
RETURN_IF_ERROR(next_batch(&_batch, eof));
if (*eof) {
return Status::OK();
}
}
size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()),
(_batch->num_rows() - _arrow_batch_cur_idx));
for (auto i = 0; i < _file_slot_descs.size(); ++i) {
SlotDescriptor* slot_desc = _file_slot_descs[i];
if (slot_desc == nullptr) {
continue;
}
std::string real_column_name =
is_case_sensitive() ? slot_desc->col_name() : slot_desc->col_name_lower_case();
auto* array = _batch->GetColumnByName(real_column_name).get();
auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
RETURN_IF_ERROR(arrow_column_to_doris_column(
array, _arrow_batch_cur_idx, column_with_type_and_name.column,
column_with_type_and_name.type, num_elements, _state->timezone_obj()));
}
rows += num_elements;
_arrow_batch_cur_idx += num_elements;
} while (!(*eof) && rows < _state->batch_size());
return Status::OK();
}
Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.empty()) {
@ -114,6 +149,7 @@ Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, b
*batch = _queue.front();
_queue.pop_front();
_queue_writer_cond.notify_one();
_arrow_batch_cur_idx = 0;
return Status::OK();
}