From 3a3def447d9077f49821b341f4b78c6cdeb9c285 Mon Sep 17 00:00:00 2001
From: Mingyu Chen
Date: Sat, 22 Oct 2022 22:40:03 +0800
Subject: [PATCH] [fix](csv-reader) fix bug that csv reader cannot read text
 format hms table (#13515)

1. Add the missing field and line delimiters.
2. When querying an external table in text (CSV) format, we must pass the
   column position map to the BE; otherwise the column order is wrong.

TODO:
1. For now, querying a CSV file column that does not exist returns null.
   It should return either null or the default value of that column.
2. Add regression tests once the Hive docker environment is ready.
---
 be/src/exec/text_converter.hpp                |   2 +-
 be/src/vec/exec/format/csv/csv_reader.cpp     | 124 +++++++++++-------
 be/src/vec/exec/format/csv/csv_reader.h       |  11 +-
 be/src/vec/exec/scan/vfile_scanner.cpp        |   2 +-
 .../catalog/HiveMetaStoreClientHelper.java    |   3 +
 .../org/apache/doris/catalog/TableIf.java     |  12 ++
 .../catalog/external/HMSExternalTable.java    |   1 +
 .../doris/datasource/HMSExternalCatalog.java  |   1 +
 .../external/ExternalFileScanNode.java        |  26 +++-
 .../planner/external/HiveScanProvider.java    |  20 +++
 gensrc/thrift/PlanNodes.thrift                |   2 +
 11 files changed, 147 insertions(+), 57 deletions(-)

diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp
index 5db043dc48..4eaa065947 100644
--- a/be/src/exec/text_converter.hpp
+++ b/be/src/exec/text_converter.hpp
@@ -194,7 +194,7 @@ inline bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
                                             size_t len, bool copy_string, bool need_escape) {
     vectorized::IColumn* col_ptr = nullable_col_ptr;
     // \N means it's NULL
-    if (true == slot_desc->is_nullable()) {
+    if (slot_desc->is_nullable()) {
         auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr);
         if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
             nullable_column->insert_data(nullptr, 0);
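For context on the `\N` convention the hunk above relies on: a minimal standalone C++ sketch (simplified types; `is_null_literal` is a hypothetical helper for illustration, not the Doris `TextConverter` API) of the check that decides whether a CSV cell becomes NULL:

#include <cstddef>
#include <iostream>

// Simplified stand-in for the check in TextConverter::write_vec_column:
// a cell is treated as NULL when it is exactly the two bytes `\N`
// (the SQL_NULL_DATA length case is omitted here).
bool is_null_literal(const char* data, size_t len) {
    return len == 2 && data[0] == '\\' && data[1] == 'N';
}

int main() {
    std::cout << is_null_literal("\\N", 2) << '\n'; // 1: becomes NULL
    std::cout << is_null_literal("N", 1) << '\n';   // 0: stays a value
    std::cout << is_null_literal("", 0) << '\n';    // 0: empty string is not NULL
}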
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 34c42d5e22..20c8d71a96 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -33,6 +33,9 @@
 #include "vec/exec/scan/vfile_scanner.h"
 
 namespace doris::vectorized {
+
+const static Slice _s_null_slice = Slice("\\N");
+
 CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
                      const TFileScanRangeParams& params, const TFileRangeDesc& range,
                      const std::vector<SlotDescriptor*>& file_slot_descs)
@@ -57,7 +60,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
 
 CsvReader::~CsvReader() {}
 
-Status CsvReader::init_reader() {
+Status CsvReader::init_reader(bool is_load) {
     // set the skip lines and start offset
     int64_t start_offset = _range.start_offset;
     if (start_offset == 0 && _params.__isset.file_attributes &&
@@ -124,20 +127,32 @@ Status CsvReader::init_reader(bool is_load) {
                                 _file_format_type);
     }
 
+    _is_load = is_load;
+    if (!_is_load) {
+        // For query task, we need to save the mapping from table schema to file column
+        DCHECK(_params.__isset.column_idxs);
+        _col_idxs = _params.column_idxs;
+    } else {
+        // For load task, the column order is the same as the file column order
+        int i = 0;
+        for (auto& desc [[maybe_unused]] : _file_slot_descs) {
+            _col_idxs.push_back(i++);
+        }
+    }
+
     _line_reader_eof = false;
     return Status::OK();
 }
 
 Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
-    if (_line_reader_eof == true) {
+    if (_line_reader_eof) {
         *eof = true;
         return Status::OK();
     }
 
     const int batch_size = _state->batch_size();
-    auto columns = block->mutate_columns();
-
-    while (columns[0]->size() < batch_size && !_line_reader_eof) {
+    size_t rows = 0;
+    while (rows < batch_size && !_line_reader_eof) {
         const uint8_t* ptr = nullptr;
         size_t size = 0;
         RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof));
@@ -150,16 +165,11 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
             continue;
         }
 
-        // TODO(ftw): check read_rows?
-        ++(*read_rows);
-        RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), columns));
-
-        if (_line_reader_eof == true) {
-            *eof = true;
-            break;
-        }
+        RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, &rows));
     }
-    columns.clear();
+
+    *eof = (rows == 0);
+    *read_rows = rows;
 
     return Status::OK();
 }
@@ -228,7 +238,7 @@ Status CsvReader::_create_decompressor() {
     return Status::OK();
 }
 
-Status CsvReader::_fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns) {
+Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* rows) {
     bool is_success = false;
 
     RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
@@ -240,53 +250,67 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* r
 
     // if _split_values.size() > _file_slot_descs.size()
     // we only take the first few columns
     for (int i = 0; i < _file_slot_descs.size(); ++i) {
-        // TODO(ftw): no need of src_slot_desc
         auto src_slot_desc = _file_slot_descs[i];
-        const Slice& value = _split_values[i];
-        _text_converter->write_string_column(src_slot_desc, &columns[i], value.data, value.size);
+        int col_idx = _col_idxs[i];
+        // col idx is out of range, fill with null.
+        const Slice& value =
+                col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
+        IColumn* col_ptr = const_cast<IColumn*>(block->get_by_position(i).column.get());
+        _text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true,
+                                          false);
     }
+    ++(*rows);
 
     return Status::OK();
 }
 
 Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
     if (!validate_utf8(line.data, line.size)) {
-        RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                []() -> std::string { return "Unable to display"; },
-                []() -> std::string {
-                    fmt::memory_buffer error_msg;
-                    fmt::format_to(error_msg, "{}", "Unable to display");
-                    return fmt::to_string(error_msg);
-                },
-                &_line_reader_eof));
-        _counter->num_rows_filtered++;
-        *success = false;
-        return Status::OK();
+        if (!_is_load) {
+            return Status::InternalError("Only support csv data in utf8 codec");
+        } else {
+            RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                    []() -> std::string { return "Unable to display"; },
+                    []() -> std::string {
+                        fmt::memory_buffer error_msg;
+                        fmt::format_to(error_msg, "{}", "Unable to display");
+                        return fmt::to_string(error_msg);
+                    },
+                    &_line_reader_eof));
+            _counter->num_rows_filtered++;
+            *success = false;
+            return Status::OK();
+        }
     }
 
     _split_line(line);
 
-    // if actual column number in csv file is not equal to _file_slot_descs.size()
-    // then filter this line.
-    if (_split_values.size() != _file_slot_descs.size()) {
-        std::string cmp_str =
-                _split_values.size() > _file_slot_descs.size() ? "more than" : "less than";
-        RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                [&]() -> std::string { return std::string(line.data, line.size); },
-                [&]() -> std::string {
-                    fmt::memory_buffer error_msg;
-                    fmt::format_to(error_msg, "{} {} {}", "actual column number in csv file is ",
-                                   cmp_str, " schema column number.");
-                    fmt::format_to(error_msg, "actual number: {}, column separator: [{}], ",
-                                   _split_values.size(), _value_separator);
-                    fmt::format_to(error_msg, "line delimiter: [{}], schema column number: {}; ",
-                                   _line_delimiter, _file_slot_descs.size());
-                    return fmt::to_string(error_msg);
-                },
-                &_line_reader_eof));
-        _counter->num_rows_filtered++;
-        *success = false;
-        return Status::OK();
+    if (_is_load) {
+        // Only check for load task. For query task, non-existent columns will be filled with null.
+        // if the actual column number in the csv file is not equal to _file_slot_descs.size(),
+        // then filter this line.
+        if (_split_values.size() != _file_slot_descs.size()) {
+            std::string cmp_str =
+                    _split_values.size() > _file_slot_descs.size() ? "more than" : "less than";
+            RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                    [&]() -> std::string { return std::string(line.data, line.size); },
+                    [&]() -> std::string {
+                        fmt::memory_buffer error_msg;
+                        fmt::format_to(error_msg, "{} {} {}",
+                                       "actual column number in csv file is ", cmp_str,
+                                       " schema column number.");
+                        fmt::format_to(error_msg, "actual number: {}, column separator: [{}], ",
+                                       _split_values.size(), _value_separator);
+                        fmt::format_to(error_msg,
+                                       "line delimiter: [{}], schema column number: {}; ",
+                                       _line_delimiter, _file_slot_descs.size());
+                        return fmt::to_string(error_msg);
+                    },
+                    &_line_reader_eof));
+            _counter->num_rows_filtered++;
+            *success = false;
+            return Status::OK();
+        }
     }
 
     *success = true;
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index 4c520acfed..d1ab1ebb26 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -36,14 +36,14 @@ public:
               const std::vector<SlotDescriptor*>& file_slot_descs);
     ~CsvReader() override;
 
-    Status init_reader();
+    Status init_reader(bool is_load);
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
 private:
     Status _create_decompressor();
-    Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns);
+    Status _fill_dest_columns(const Slice& line, Block* block, size_t* rows);
     Status _line_split_to_values(const Slice& line, bool* success);
     void _split_line(const Slice& line);
     Status _check_array_format(std::vector<Slice>& split_values, bool* is_success);
@@ -57,6 +57,13 @@ private:
     const TFileScanRangeParams& _params;
     const TFileRangeDesc& _range;
     const std::vector<SlotDescriptor*>& _file_slot_descs;
+    // Only for query task, save the columns' indexes which need to be read.
+    // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
+    // and the corresponding positions in the file are 0, 3, 5.
+    // So the _col_idxs will be: <0, 3, 5>
+    std::vector<int> _col_idxs;
+    // True if this is a load task
+    bool _is_load = false;
 
     // _file_reader_s is for stream load pipe reader,
     // and _file_reader is for other file reader.
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 9bf3b3582f..c9ccfbf46c 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -492,7 +492,7 @@ Status VFileScanner::_get_next_reader() {
     case TFileFormatType::FORMAT_CSV_DEFLATE: {
         _cur_reader.reset(
                 new CsvReader(_state, _profile, &_counter, _params, range, _file_slot_descs));
-        init_status = ((CsvReader*)(_cur_reader.get()))->init_reader();
+        init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
         break;
     }
     default:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 7ddd388338..c8171cb581 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -810,6 +810,8 @@ public class HiveMetaStoreClientHelper {
                 return Type.FLOAT;
             case "double":
                 return Type.DOUBLE;
+            case "string":
+                return Type.STRING;
             default:
                 break;
         }
@@ -923,3 +925,4 @@ public class HiveMetaStoreClientHelper {
         return output.toString();
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index bb0097d5d9..5d0426ae52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -73,6 +73,17 @@ public interface TableIf {
 
     Column getColumn(String name);
 
+    default int getBaseColumnIdxByName(String colName) {
+        int i = 0;
+        for (Column col : getBaseSchema()) {
+            if (col.getName().equalsIgnoreCase(colName)) {
+                return i;
+            }
+            ++i;
+        }
+        return -1;
+    }
+
     String getMysqlType();
 
     String getEngine();
@@ -163,3 +174,4 @@ public interface TableIf {
         }
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index f13181987b..cbe84744ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -311,3 +311,4 @@ public class HMSExternalTable extends ExternalTable {
         return catalog.getCatalogProperty().getS3Properties();
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 8bd8c8288f..32847844dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -70,6 +70,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
             client = new HiveMetaStoreClient(hiveConf);
         } catch (MetaException e) {
             LOG.warn("Failed to create HiveMetaStoreClient: {}", e.getMessage());
+            return;
         }
         List<String> allDatabases;
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index fc04c03dc1..dba614e960 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -278,6 +278,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
             ParamCreateContext context = contexts.get(i);
             FileScanProviderIf scanProvider = scanProviders.get(i);
             setDefaultValueExprs(scanProvider, context);
+            setColumnPositionMappingForTextFile(scanProvider, context);
             finalizeParamsForLoad(context, analyzer);
             createScanRangeLocations(context, scanProvider);
             this.inputSplitsNum += scanProvider.getInputSplitNum();
@@ -285,6 +286,27 @@ public class ExternalFileScanNode extends ExternalScanNode {
         }
     }
 
+    private void setColumnPositionMappingForTextFile(FileScanProviderIf scanProvider, ParamCreateContext context)
+            throws UserException {
+        if (type != Type.QUERY) {
+            return;
+        }
+        TableIf tbl = scanProvider.getTargetTable();
+        List<Integer> columnIdxs = Lists.newArrayList();
+        for (SlotDescriptor slot : desc.getSlots()) {
+            if (!slot.isMaterialized()) {
+                continue;
+            }
+            String colName = slot.getColumn().getName();
+            int idx = tbl.getBaseColumnIdxByName(colName);
+            if (idx == -1) {
+                throw new UserException("Column " + colName + " not found in table " + tbl.getName());
+            }
+            columnIdxs.add(idx);
+        }
+        context.params.setColumnIdxs(columnIdxs);
+    }
+
     protected void setDefaultValueExprs(FileScanProviderIf scanProvider, ParamCreateContext context)
             throws UserException {
         TableIf tbl = scanProvider.getTargetTable();
@@ -320,7 +342,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
             default:
                 Preconditions.checkState(false, type);
         }
-        // if slot desc is null, which mean it is a unrelated slot, just skip.
+        // if slot desc is null, which means it is an unrelated slot, just skip.
         // eg:
         // (a, b, c) set (x=a, y=b, z=c)
        // c does not exist in file, the z will be filled with null, even if z has default value.
@@ -499,5 +521,3 @@ public class ExternalFileScanNode extends ExternalScanNode {
     }
 }
-
-
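The FE-side mapping in `setColumnPositionMappingForTextFile` above reduces to one case-insensitive index lookup per materialized slot. A language-neutral C++ sketch of the same logic (hypothetical names, not the FE code), including the -1 "column not found" error path:

#include <algorithm>
#include <cctype>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

// Case-insensitive equivalent of TableIf.getBaseColumnIdxByName: returns the
// position of name in the base schema, or -1 when absent.
int base_column_idx(const std::vector<std::string>& base_schema, std::string name) {
    auto lower = [](std::string s) {
        std::transform(s.begin(), s.end(), s.begin(),
                       [](unsigned char c) { return std::tolower(c); });
        return s;
    };
    name = lower(name);
    for (size_t i = 0; i < base_schema.size(); ++i) {
        if (lower(base_schema[i]) == name) return static_cast<int>(i);
    }
    return -1;
}

int main() {
    std::vector<std::string> schema = {"k1", "c1", "c2", "k2", "c3", "k3"};
    std::vector<int> column_idxs;
    // Slots the query actually materializes, in dest-tuple order.
    for (const std::string& slot : {"k3", "K1"}) {
        int idx = base_column_idx(schema, slot);
        if (idx == -1) throw std::runtime_error("Column " + slot + " not found");
        column_idxs.push_back(idx);
    }
    for (int i : column_idxs) std::cout << i << ' '; // 5 0
    std::cout << '\n';
}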
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 1df3d639bc..04916fb105 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -35,11 +35,13 @@ import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TExternalScanRange;
+import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TFileScanSlotInfo;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.THdfsParams;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -76,6 +78,10 @@ import java.util.stream.Collectors;
 public class HiveScanProvider implements HMSTableScanProviderIf {
     private static final Logger LOG = LogManager.getLogger(HiveScanProvider.class);
 
+    private static final String PROP_FIELD_DELIMITER = "field.delim";
+    private static final String DEFAULT_FIELD_DELIMITER = "|";
+    private static final String DEFAULT_LINE_DELIMITER = "\n";
+
     protected HMSExternalTable hmsTable;
 
     protected int inputSplitNum = 0;
@@ -268,7 +274,20 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
                 String fsName = fullPath.replace(filePath, "");
                 TFileType locationType = getLocationType();
                 context.params.setFileType(locationType);
+                TFileFormatType fileFormatType = getFileFormatType();
                 context.params.setFormatType(getFileFormatType());
+                if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
+                    TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+                    String columnSeparator
+                            = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
+                            .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER);
+                    textParams.setColumnSeparator(columnSeparator);
+                    textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
+                    TFileAttributes fileAttributes = new TFileAttributes();
+                    fileAttributes.setTextParams(textParams);
+                    context.params.setFileAttributes(fileAttributes);
+                }
+
                 // set hdfs params for hdfs file type.
                 Map<String, String> locationProperties = getLocationProperties();
                 if (locationType == TFileType.FILE_HDFS) {
@@ -364,3 +383,4 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
     }
 }
+
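The delimiter handling added above boils down to one `getOrDefault` lookup on the table's serde properties. A minimal C++ sketch (a plain map standing in for the HMS `SerDeInfo` parameters) of the fallback to the `|` field delimiter and `\n` line delimiter:

#include <iostream>
#include <map>
#include <string>

// Stand-in for SerdeInfo.getParameters().getOrDefault(...): Hive stores the
// field delimiter under "field.delim"; when a text table does not set it,
// the scan falls back to "|" (and always uses "\n" as the line delimiter).
std::string get_or_default(const std::map<std::string, std::string>& props,
                           const std::string& key, const std::string& dflt) {
    auto it = props.find(key);
    return it == props.end() ? dflt : it->second;
}

int main() {
    std::map<std::string, std::string> serde_props = {{"field.delim", ","}};
    std::cout << get_or_default(serde_props, "field.delim", "|") << '\n'; // ","
    serde_props.clear();
    std::cout << get_or_default(serde_props, "field.delim", "|") << '\n'; // "|" fallback
}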
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 0f70c29ae2..498e8d2082 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -289,6 +289,8 @@ struct TFileScanRangeParams {
     14: optional list<Types.TNetworkAddress> broker_addresses
     15: optional TFileAttributes file_attributes
     16: optional Exprs.TExpr pre_filter_exprs
+    // For csv query task, save the column indexes in the file, ordered by dest_tuple
+    17: optional list<i32> column_idxs
 }
 
 struct TFileRangeDesc {
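Putting the pieces together, a worked example of the new `column_idxs` field (illustrative values only): for a Hive text table whose file columns are (k1, c1, c2, k2, c3, k3) and a query selecting k3 and k1, the FE sends column_idxs = [5, 0], and the BE fills the two dest slots from those file positions:

#include <cassert>
#include <string>
#include <vector>

int main() {
    // FE side: dest tuple is (k3, k1); their base-schema positions are 5 and 0,
    // so TFileScanRangeParams.column_idxs = [5, 0].
    std::vector<int> column_idxs = {5, 0};

    // BE side: CsvReader copies the mapping in init_reader() for a query task...
    std::vector<int> col_idxs = column_idxs;
    // ...and slot i of every output row then reads file column col_idxs[i].
    std::vector<std::string> split_values = {"1", "x", "y", "2", "z", "3"};
    assert(split_values[col_idxs[0]] == "3"); // k3
    assert(split_values[col_idxs[1]] == "1"); // k1
    return 0;
}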