From 4075e3aec6e1041bc855cd30cc85b7ff584c4a28 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Wed, 4 Jan 2023 18:25:08 +0800 Subject: [PATCH] [fix](csv-reader) fix new csv reader's performance issue (#15581) --- be/src/vec/exec/format/csv/csv_reader.cpp | 60 +++++++++++++++++------ be/src/vec/exec/format/csv/csv_reader.h | 8 ++- be/src/vec/exec/scan/vfile_scanner.h | 1 + 3 files changed, 53 insertions(+), 16 deletions(-) diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 5e3a975341..38fd30ba74 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -167,9 +167,23 @@ Status CsvReader::init_reader(bool is_load) { _is_load = is_load; if (!_is_load) { - // For query task, we need to save the mapping from table schema to file column + // For query task, there are 2 slot mapping. + // One is from file slot to values in line. + // eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5 + // the _col_idxs will save: 0, 2, 4 + // The other is from file slot to columns in output block + // eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5 + // where "p1" is the partition col which does not exist in file + // the _file_slot_idx_map will save: 1, 2, 3 DCHECK(_params.__isset.column_idxs); _col_idxs = _params.column_idxs; + int idx = 0; + for (const auto& slot_info : _params.required_slots) { + if (slot_info.is_file_slot) { + _file_slot_idx_map.push_back(idx); + } + idx++; + } } else { // For load task, the column order is same as file column order int i = 0; @@ -190,6 +204,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { const int batch_size = _state->batch_size(); size_t rows = 0; + auto columns = block->mutate_columns(); while (rows < batch_size && !_line_reader_eof) { const uint8_t* ptr = nullptr; size_t size = 0; @@ -203,7 +218,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { continue; } - RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, &rows)); + RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows)); } *eof = (rows == 0); @@ -303,7 +318,8 @@ Status CsvReader::_create_decompressor() { return Status::OK(); } -Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* rows) { +Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, + std::vector& columns, size_t* rows) { bool is_success = false; RETURN_IF_ERROR(_line_split_to_values(line, &is_success)); @@ -312,18 +328,32 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* ro return Status::OK(); } - // if _split_values.size > _file_slot_descs.size() - // we only take the first few columns - for (int i = 0; i < _file_slot_descs.size(); ++i) { - auto src_slot_desc = _file_slot_descs[i]; - int col_idx = _col_idxs[i]; - // col idx is out of range, fill with null. - const Slice& value = - col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice; - IColumn* col_ptr = - const_cast(block->get_by_name(src_slot_desc->col_name()).column.get()); - _text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true, - false); + if (_is_load) { + for (int i = 0; i < _file_slot_descs.size(); ++i) { + auto src_slot_desc = _file_slot_descs[i]; + int col_idx = _col_idxs[i]; + // col idx is out of range, fill with null. + const Slice& value = + col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice; + // For load task, we always read "string" from file, so use "write_string_column" + _text_converter->write_string_column(src_slot_desc, &columns[i], value.data, + value.size); + } + } else { + // if _split_values.size > _file_slot_descs.size() + // we only take the first few columns + for (int i = 0; i < _file_slot_descs.size(); ++i) { + auto src_slot_desc = _file_slot_descs[i]; + int col_idx = _col_idxs[i]; + // col idx is out of range, fill with null. + const Slice& value = + col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice; + IColumn* col_ptr = const_cast( + block->get_by_position(_file_slot_idx_map[i]).column.get()); + // For query task, we will convert values to final column type, so use "write_vec_column" + _text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true, + false); + } } ++(*rows); diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 2b1c35193a..c237958d07 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -56,7 +56,8 @@ public: private: // used for stream/broker load of csv file. Status _create_decompressor(); - Status _fill_dest_columns(const Slice& line, Block* block, size_t* rows); + Status _fill_dest_columns(const Slice& line, Block* block, + std::vector& columns, size_t* rows); Status _line_split_to_values(const Slice& line, bool* success); void _split_line(const Slice& line); Status _check_array_format(std::vector& split_values, bool* is_success); @@ -77,6 +78,11 @@ private: const TFileScanRangeParams& _params; const TFileRangeDesc& _range; const std::vector& _file_slot_descs; + // Only for query task, save the file slot to columns in block map. + // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3 + // and this 3 columns in block are k2, k3, k1, + // the _file_slot_idx_map will save: 2, 0, 1 + std::vector _file_slot_idx_map; // Only for query task, save the columns' index which need to be read. // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3 // and the corresponding position in file is 0, 3, 5. diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index cfe26d9753..6ac802f60c 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -68,6 +68,7 @@ protected: std::map _file_slot_name_map; // col names from _file_slot_descs std::vector _file_col_names; + // Partition source slot descriptors std::vector _partition_slot_descs; // Partition slot id to index in _partition_slot_descs