From f6ce072297b289974398a2d74dd1cf540f72163b Mon Sep 17 00:00:00 2001 From: Tiewei Fang <43782773+BePPPower@users.noreply.github.com> Date: Sun, 26 Feb 2023 09:03:04 +0800 Subject: [PATCH] [Enhancement](csv-reader) Optimize csv_reader `_split_value` and fix json_reader case sensitivity (#17093) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Enhancement: For a single-character column separator, csv_reader uses a dedicated method, `_split_line_for_single_char_delimiter`, to split values. 2. BugFix: Make `json` file format loading case-sensitive. --- be/src/vec/exec/format/csv/csv_reader.cpp | 73 ++++++++++++++++--- be/src/vec/exec/format/csv/csv_reader.h | 2 + .../doris/analysis/DataDescription.java | 8 +- .../main/java/org/apache/doris/load/Load.java | 7 +- 4 files changed, 76 insertions(+), 14 deletions(-) diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 18da999815..42f4842141 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -394,7 +394,11 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { } } - _split_line(line); + if (_value_separator_length == 1) { + _split_line_for_single_char_delimiter(line); + } else { + _split_line(line); + } if (_is_load) { // Only check for load task. For query task, the non exist column will be filled "null". 
@@ -428,19 +432,66 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { return Status::OK(); } +void CsvReader::_split_line_for_proto_format(const Slice& line) { + PDataRow** ptr = reinterpret_cast(line.data); + PDataRow* row = *ptr; + for (const PDataColumn& col : (row)->col()) { + int len = col.value().size(); + uint8_t* buf = new uint8_t[len]; + memcpy(buf, col.value().c_str(), len); + _split_values.emplace_back(buf, len); + } + delete row; + delete[] ptr; +} + +void CsvReader::_split_line_for_single_char_delimiter(const Slice& line) { + _split_values.clear(); + if (_file_format_type == TFileFormatType::FORMAT_PROTO) { + _split_line_for_proto_format(line); + } else { + const char* value = line.data; + size_t cur_pos = 0; + size_t start_field = 0; + const size_t size = line.size; + for (; cur_pos < size; ++cur_pos) { + if (*(value + cur_pos) == _value_separator[0]) { + size_t non_space = cur_pos; + if (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()) { + while (non_space > start_field && *(value + non_space - 1) == ' ') { + non_space--; + } + } + if (_trim_double_quotes && non_space > (start_field + 1) && + *(value + start_field) == '\"' && *(value + non_space - 1) == '\"') { + start_field++; + non_space--; + } + _split_values.emplace_back(value + start_field, non_space - start_field); + start_field = cur_pos + 1; + } + } + + CHECK(cur_pos == line.size) << cur_pos << " vs " << line.size; + size_t non_space = cur_pos; + if (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()) { + while (non_space > start_field && *(value + non_space - 1) == ' ') { + non_space--; + } + } + if (_trim_double_quotes && non_space > (start_field + 1) && + *(value + start_field) == '\"' && *(value + non_space - 1) == '\"') { + start_field++; + non_space--; + } + _split_values.emplace_back(value + start_field, non_space - start_field); + } +} + void CsvReader::_split_line(const Slice& line) { 
_split_values.clear(); if (_file_format_type == TFileFormatType::FORMAT_PROTO) { - PDataRow** ptr = reinterpret_cast(line.data); - PDataRow* row = *ptr; - for (const PDataColumn& col : (row)->col()) { - int len = col.value().size(); - uint8_t* buf = new uint8_t[len]; - memcpy(buf, col.value().c_str(), len); - _split_values.emplace_back(buf, len); - } - delete row; - delete[] ptr; + _split_line_for_proto_format(line); } else { const char* value = line.data; size_t start = 0; // point to the start pos of next col value. diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index a2fba641da..42ba82b240 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -62,6 +62,8 @@ private: std::vector& columns, size_t* rows); Status _line_split_to_values(const Slice& line, bool* success); void _split_line(const Slice& line); + void _split_line_for_single_char_delimiter(const Slice& line); + void _split_line_for_proto_format(const Slice& line); Status _check_array_format(std::vector& split_values, bool* is_success); bool _is_null(const Slice& slice); bool _is_array(const Slice& slice); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 7f2b99df3e..80a1c33d07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -973,7 +973,7 @@ public class DataDescription { // Change all the columns name to lower case, because Doris column is case-insensitive. 
private void columnsNameToLowerCase(List columns) { - if (columns == null || columns.isEmpty()) { + if (columns == null || columns.isEmpty() || "json".equals(this.fileFormat)) { return; } for (int i = 0; i < columns.size(); i++) { @@ -1081,7 +1081,11 @@ public class DataDescription { if (!mappingColNames.contains(column.getName())) { parsedColumnExprList.add(new ImportColumnDesc(column.getName(), null)); } - fileFieldNames.add(column.getName().toLowerCase()); + if ("json".equals(this.fileFormat)) { + fileFieldNames.add(column.getName()); + } else { + fileFieldNames.add(column.getName().toLowerCase()); + } } LOG.debug("after fill column info. columns: {}, parsed column exprs: {}", fileFieldNames, parsedColumnExprList); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 854d96cb2a..180ccd12a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -603,7 +603,12 @@ public class Load { if (hasSequenceCol && column.isSequenceColumn()) { continue; } - ImportColumnDesc columnDesc = new ImportColumnDesc(column.getName().toLowerCase()); + ImportColumnDesc columnDesc = null; + if (formatType == TFileFormatType.FORMAT_JSON) { + columnDesc = new ImportColumnDesc(column.getName()); + } else { + columnDesc = new ImportColumnDesc(column.getName().toLowerCase()); + } LOG.debug("add base column {} to stream load task", column.getName()); copiedColumnExprs.add(columnDesc); }