[fix](csv-reader) fix new csv reader's performance issue (#15581)
This commit is contained in:
@ -167,9 +167,23 @@ Status CsvReader::init_reader(bool is_load) {
|
||||
|
||||
_is_load = is_load;
|
||||
if (!_is_load) {
|
||||
// For query task, we need to save the mapping from table schema to file column
|
||||
// For query task, there are 2 slot mapping.
|
||||
// One is from file slot to values in line.
|
||||
// eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5
|
||||
// the _col_idxs will save: 0, 2, 4
|
||||
// The other is from file slot to columns in output block
|
||||
// eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5
|
||||
// where "p1" is the partition col which does not exist in file
|
||||
// the _file_slot_idx_map will save: 1, 2, 3
|
||||
DCHECK(_params.__isset.column_idxs);
|
||||
_col_idxs = _params.column_idxs;
|
||||
int idx = 0;
|
||||
for (const auto& slot_info : _params.required_slots) {
|
||||
if (slot_info.is_file_slot) {
|
||||
_file_slot_idx_map.push_back(idx);
|
||||
}
|
||||
idx++;
|
||||
}
|
||||
} else {
|
||||
// For load task, the column order is same as file column order
|
||||
int i = 0;
|
||||
@ -190,6 +204,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
|
||||
|
||||
const int batch_size = _state->batch_size();
|
||||
size_t rows = 0;
|
||||
auto columns = block->mutate_columns();
|
||||
while (rows < batch_size && !_line_reader_eof) {
|
||||
const uint8_t* ptr = nullptr;
|
||||
size_t size = 0;
|
||||
@ -203,7 +218,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
|
||||
continue;
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, &rows));
|
||||
RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows));
|
||||
}
|
||||
|
||||
*eof = (rows == 0);
|
||||
@ -303,7 +318,8 @@ Status CsvReader::_create_decompressor() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* rows) {
|
||||
Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
|
||||
std::vector<MutableColumnPtr>& columns, size_t* rows) {
|
||||
bool is_success = false;
|
||||
|
||||
RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
|
||||
@ -312,18 +328,32 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* ro
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// if _split_values.size > _file_slot_descs.size()
|
||||
// we only take the first few columns
|
||||
for (int i = 0; i < _file_slot_descs.size(); ++i) {
|
||||
auto src_slot_desc = _file_slot_descs[i];
|
||||
int col_idx = _col_idxs[i];
|
||||
// col idx is out of range, fill with null.
|
||||
const Slice& value =
|
||||
col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
|
||||
IColumn* col_ptr =
|
||||
const_cast<IColumn*>(block->get_by_name(src_slot_desc->col_name()).column.get());
|
||||
_text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true,
|
||||
false);
|
||||
if (_is_load) {
|
||||
for (int i = 0; i < _file_slot_descs.size(); ++i) {
|
||||
auto src_slot_desc = _file_slot_descs[i];
|
||||
int col_idx = _col_idxs[i];
|
||||
// col idx is out of range, fill with null.
|
||||
const Slice& value =
|
||||
col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
|
||||
// For load task, we always read "string" from file, so use "write_string_column"
|
||||
_text_converter->write_string_column(src_slot_desc, &columns[i], value.data,
|
||||
value.size);
|
||||
}
|
||||
} else {
|
||||
// if _split_values.size > _file_slot_descs.size()
|
||||
// we only take the first few columns
|
||||
for (int i = 0; i < _file_slot_descs.size(); ++i) {
|
||||
auto src_slot_desc = _file_slot_descs[i];
|
||||
int col_idx = _col_idxs[i];
|
||||
// col idx is out of range, fill with null.
|
||||
const Slice& value =
|
||||
col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
|
||||
IColumn* col_ptr = const_cast<IColumn*>(
|
||||
block->get_by_position(_file_slot_idx_map[i]).column.get());
|
||||
// For query task, we will convert values to final column type, so use "write_vec_column"
|
||||
_text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true,
|
||||
false);
|
||||
}
|
||||
}
|
||||
++(*rows);
|
||||
|
||||
|
||||
@ -56,7 +56,8 @@ public:
|
||||
private:
|
||||
// used for stream/broker load of csv file.
|
||||
Status _create_decompressor();
|
||||
Status _fill_dest_columns(const Slice& line, Block* block, size_t* rows);
|
||||
Status _fill_dest_columns(const Slice& line, Block* block,
|
||||
std::vector<MutableColumnPtr>& columns, size_t* rows);
|
||||
Status _line_split_to_values(const Slice& line, bool* success);
|
||||
void _split_line(const Slice& line);
|
||||
Status _check_array_format(std::vector<Slice>& split_values, bool* is_success);
|
||||
@ -77,6 +78,11 @@ private:
|
||||
const TFileScanRangeParams& _params;
|
||||
const TFileRangeDesc& _range;
|
||||
const std::vector<SlotDescriptor*>& _file_slot_descs;
|
||||
// Only for query task, save the file slot to columns in block map.
|
||||
// eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
|
||||
// and this 3 columns in block are k2, k3, k1,
|
||||
// the _file_slot_idx_map will save: 2, 0, 1
|
||||
std::vector<int> _file_slot_idx_map;
|
||||
// Only for query task, save the columns' index which need to be read.
|
||||
// eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
|
||||
// and the corresponding position in file is 0, 3, 5.
|
||||
|
||||
@ -68,6 +68,7 @@ protected:
|
||||
std::map<std::string, int> _file_slot_name_map;
|
||||
// col names from _file_slot_descs
|
||||
std::vector<std::string> _file_col_names;
|
||||
|
||||
// Partition source slot descriptors
|
||||
std::vector<SlotDescriptor*> _partition_slot_descs;
|
||||
// Partition slot id to index in _partition_slot_descs
|
||||
|
||||
Reference in New Issue
Block a user