[feature-wip][multi-catalog] Support caseSensitive field name in file scan node (#11310)

* Implement case-sensitive field name matching in file scan node
Author: huangzhaowei
Date: 2022-08-05 08:03:16 +08:00
Committed by: GitHub
Parent: b5531c5caf
Commit: 6eb8ac0ebf
10 changed files with 57 additions and 18 deletions
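The whole change hinges on one technique: when the reader is case-insensitive, column names are folded to lower case both when the schema-to-index map is built and when it is probed, so lookups succeed regardless of the casing the query uses. Below is a minimal self-contained sketch of that pattern; the to_lower helper stands in for the one in util/string_util.h, and ColumnIndexMap is an illustrative name, not a class from this patch.

#include <algorithm>
#include <cctype>
#include <iostream>
#include <map>
#include <string>

// Stand-in for Doris's to_lower(); plain ASCII folding.
static std::string to_lower(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return std::tolower(c); });
    return s;
}

// Illustrative: fold keys at insert time, fold the probe the same way.
class ColumnIndexMap {
public:
    explicit ColumnIndexMap(bool case_sensitive) : _case_sensitive(case_sensitive) {}

    void add(const std::string& name, int index) {
        _map.emplace(_case_sensitive ? name : to_lower(name), index);
    }

    // -1 signals "column not found", the same sentinel get_cloumn_index() uses.
    int index_of(const std::string& name) const {
        auto it = _map.find(_case_sensitive ? name : to_lower(name));
        return it == _map.end() ? -1 : it->second;
    }

private:
    bool _case_sensitive;
    std::map<std::string, int> _map;
};

int main() {
    ColumnIndexMap columns(/*case_sensitive=*/false);
    columns.add("UserId", 0);
    std::cout << columns.index_of("USERID") << '\n';  // 0
    std::cout << columns.index_of("missing") << '\n'; // -1
}

Folding at insert time keeps each lookup to a single to_lower call, which is why the patch lowers names once while building _map_column rather than comparing case-insensitively on every find.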

View File

@@ -30,6 +30,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/mem_pool.h"
 #include "runtime/tuple.h"
+#include "util/string_util.h"
 #include "util/thrift_util.h"

 namespace doris {
@@ -37,8 +38,10 @@ namespace doris {
 // Broker
 ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
-                                 int32_t num_of_columns_from_file)
-        : _batch_size(batch_size), _num_of_columns_from_file(num_of_columns_from_file) {
+                                 int32_t num_of_columns_from_file, bool caseSensitive)
+        : _batch_size(batch_size),
+          _num_of_columns_from_file(num_of_columns_from_file),
+          _caseSensitive(caseSensitive) {
     _arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader));
     _rb_reader = nullptr;
     _total_groups = 0;
@@ -81,6 +84,19 @@ Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple
     return Status::OK();
 }

+int ArrowReaderWrap::get_cloumn_index(std::string column_name) {
+    std::string real_column_name = _caseSensitive ? column_name : to_lower(column_name);
+    auto iter = _map_column.find(real_column_name);
+    if (iter != _map_column.end()) {
+        return iter->second;
+    } else {
+        std::stringstream str_error;
+        str_error << "Invalid Column Name:" << real_column_name;
+        LOG(WARNING) << str_error.str();
+        return -1;
+    }
+}
+
 Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
     std::unique_lock<std::mutex> lock(_mtx);
     while (!_closed && _queue.empty()) {
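One caveat of folding at insert time, worth knowing when the flag is off: two physical columns whose names differ only by case collide on the same folded key, and std::map::emplace silently keeps the first insertion. A short demonstration; this is standard-library behavior, not code from the patch:

#include <iostream>
#include <map>
#include <string>

int main() {
    std::map<std::string, int> map_column;
    // Both "UserId" and "USERID" fold to "userid"; the second emplace is a no-op.
    map_column.emplace("userid", 0);
    auto result = map_column.emplace("userid", 5);
    std::cout << result.second << '\n';           // 0: not inserted
    std::cout << map_column.at("userid") << '\n'; // 0: first mapping wins
}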

View File

@@ -79,7 +79,8 @@ private:
 // base of arrow reader
 class ArrowReaderWrap {
 public:
-    ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
+    ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file,
+                    bool caseSensitive);
     virtual ~ArrowReaderWrap();
     virtual Status init_reader(const TupleDescriptor* tuple_desc,
@@ -96,6 +97,7 @@ public:
     std::shared_ptr<Statistics>& statistics() { return _statistics; }
     void close();
     virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); }
+    int get_cloumn_index(std::string column_name);

     void prefetch_batch();
@@ -124,6 +126,7 @@ protected:
     std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
     const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
     std::thread _thread;
+    bool _caseSensitive;
 };

 } // namespace doris

View File

@@ -24,13 +24,14 @@
 #include "io/file_reader.h"
 #include "runtime/mem_pool.h"
 #include "runtime/tuple.h"
+#include "util/string_util.h"

 namespace doris {

 ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
                              int32_t num_of_columns_from_file, int64_t range_start_offset,
-                             int64_t range_size)
-        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
+                             int64_t range_size, bool caseSensitive)
+        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, caseSensitive),
           _range_start_offset(range_start_offset),
           _range_size(range_size) {
     _reader = nullptr;
@@ -66,8 +67,11 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
     }
     std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie();
     for (size_t i = 0; i < schema->num_fields(); ++i) {
+        std::string schemaName =
+                _caseSensitive ? schema->field(i)->name() : to_lower(schema->field(i)->name());
         // orc index started from 1.
-        _map_column.emplace(schema->field(i)->name(), i + 1);
+        _map_column.emplace(schemaName, i + 1);
     }
     RETURN_IF_ERROR(column_indices(tuple_slot_descs));

View File

@@ -33,7 +33,7 @@ namespace doris {
 class ORCReaderWrap final : public ArrowReaderWrap {
 public:
     ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file,
-                  int64_t range_start_offset, int64_t range_size);
+                  int64_t range_start_offset, int64_t range_size, bool caseSensitive = true);
     ~ORCReaderWrap() override = default;

     Status init_reader(const TupleDescriptor* tuple_desc,

View File

@@ -32,14 +32,15 @@
 #include "runtime/mem_pool.h"
 #include "runtime/string_value.h"
 #include "runtime/tuple.h"
+#include "util/string_util.h"

 namespace doris {

 // Broker
 ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
                                      int32_t num_of_columns_from_file, int64_t range_start_offset,
-                                     int64_t range_size)
-        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
+                                     int64_t range_size, bool caseSensitive)
+        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, caseSensitive),
           _rows_of_group(0),
           _current_line_of_group(0),
           _current_line_of_batch(0),
@@ -84,12 +85,14 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
     // map
     auto* schemaDescriptor = _file_metadata->schema();
     for (int i = 0; i < _file_metadata->num_columns(); ++i) {
+        std::string schemaName;
         // Get the Column Reader for the boolean column
         if (schemaDescriptor->Column(i)->max_definition_level() > 1) {
-            _map_column.emplace(schemaDescriptor->Column(i)->path()->ToDotVector()[0], i);
+            schemaName = schemaDescriptor->Column(i)->path()->ToDotVector()[0];
         } else {
-            _map_column.emplace(schemaDescriptor->Column(i)->name(), i);
+            schemaName = schemaDescriptor->Column(i)->name();
         }
+        _map_column.emplace(_caseSensitive ? schemaName : to_lower(schemaName), i);
     }
     _timezone = timezone;

View File

@@ -63,7 +63,7 @@ class ParquetReaderWrap final : public ArrowReaderWrap {
 public:
     // batch_size is not use here
     ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file,
-                      int64_t range_start_offset, int64_t range_size);
+                      int64_t range_start_offset, int64_t range_size, bool caseSensitive = true);
     ~ParquetReaderWrap() override = default;

     // Read

View File

@@ -186,7 +186,11 @@ Status FileArrowScanner::_append_batch_to_block(Block* block) {
         if (slot_desc == nullptr) {
             continue;
         }
-        auto* array = _batch->GetColumnByName(slot_desc->col_name()).get();
+        int file_index = _cur_file_reader->get_cloumn_index(slot_desc->col_name());
+        if (file_index == -1) {
+            continue;
+        }
+        auto* array = _batch->column(file_index).get();
         auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
         RETURN_IF_ERROR(arrow_column_to_doris_column(
                 array, _arrow_batch_cur_idx, column_with_type_and_name.column,
@@ -228,7 +232,7 @@ ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader,
                                                         int64_t range_start_offset,
                                                         int64_t range_size) {
     return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file,
-                                 range_start_offset, range_size);
+                                 range_start_offset, range_size, false);
 }

 void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) {
@@ -252,7 +256,7 @@ ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int
                                                     int64_t range_start_offset,
                                                     int64_t range_size) {
     return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file, range_start_offset,
-                             range_size);
+                             range_size, false);
 }

 } // namespace doris::vectorized
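The scanner-side change swaps Arrow's name-based lookup for a positional one: arrow::RecordBatch::GetColumnByName() matches field names case-sensitively and returns nullptr on a miss, so the scanner now resolves an index through the reader's folded map and fetches the column by position. A small standalone illustration of the difference; the batch construction is made up for the demo, while GetColumnByName and column are the real Arrow C++ APIs:

#include <arrow/api.h>
#include <iostream>
#include <memory>

int main() {
    // Build a one-column batch whose field is named "UserId" (demo data only).
    arrow::Int64Builder builder;
    (void)builder.AppendValues({1, 2, 3});
    std::shared_ptr<arrow::Array> array;
    (void)builder.Finish(&array);
    auto schema = arrow::schema({arrow::field("UserId", arrow::int64())});
    auto batch = arrow::RecordBatch::Make(schema, 3, {array});

    // Name-based lookup is case-sensitive: a lowered name misses.
    std::cout << (batch->GetColumnByName("userid") == nullptr) << '\n'; // 1

    // Positional access sidesteps casing, given an index from the folded map.
    std::cout << batch->column(0)->length() << '\n'; // 3
}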

View File

@@ -222,6 +222,14 @@ public class BrokerUtil {
     public static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath)
             throws UserException {
+        return parseColumnsFromPath(filePath, columnsFromPath, true);
+    }
+
+    public static List<String> parseColumnsFromPath(
+            String filePath,
+            List<String> columnsFromPath,
+            boolean caseSensitive)
+            throws UserException {
         if (columnsFromPath == null || columnsFromPath.isEmpty()) {
             return Collections.emptyList();
         }
@@ -246,7 +254,8 @@ public class BrokerUtil {
                 throw new UserException("Fail to parse columnsFromPath, expected: "
                         + columnsFromPath + ", filePath: " + filePath);
             }
-            int index = columnsFromPath.indexOf(pair[0]);
+            String parsedColumnName = caseSensitive ? pair[0] : pair[0].toLowerCase();
+            int index = columnsFromPath.indexOf(parsedColumnName);
             if (index == -1) {
                 continue;
             }
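On the FE side the same flag flows into partition-value parsing: the column name in a path segment like city=beijing is lowered before being matched against columnsFromPath. The matching step is sketched below in C++ to stay consistent with the other examples here; BrokerUtil itself is Java, and this assumes, as the patch does, that the expected column list is already lower case:

#include <algorithm>
#include <cctype>
#include <iostream>
#include <string>
#include <vector>

static std::string to_lower(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return std::tolower(c); });
    return s;
}

// Match the "name" half of a "name=value" path segment against the expected
// partition columns, optionally folding case; -1 means "not a partition column".
static int match_partition_column(const std::string& segment,
                                  const std::vector<std::string>& columns_from_path,
                                  bool case_sensitive) {
    auto eq = segment.find('=');
    if (eq == std::string::npos) {
        return -1;
    }
    std::string name = segment.substr(0, eq);
    if (!case_sensitive) {
        name = to_lower(name);
    }
    auto it = std::find(columns_from_path.begin(), columns_from_path.end(), name);
    return it == columns_from_path.end()
                   ? -1
                   : static_cast<int>(it - columns_from_path.begin());
}

int main() {
    std::vector<std::string> cols = {"city", "date"};
    std::cout << match_partition_column("CITY=beijing", cols, false) << '\n'; // 0
    std::cout << match_partition_column("CITY=beijing", cols, true) << '\n';  // -1
}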

View File

@@ -274,7 +274,7 @@ public class HudiScanNode extends BrokerScanNode {
             TScanRangeLocations curLocations = newLocations(context.params, brokerDesc);
             List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
-                    getPartitionKeys());
+                    getPartitionKeys(), false);
             int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size();
             TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, fileFormatType,

View File

@@ -309,7 +309,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
             totalFileSize += split.getLength();
             List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
-                    partitionKeys);
+                    partitionKeys, false);
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath);