From 54b5d04ff9d2cac9bc88a37b87ee7b951f348232 Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Sat, 2 Dec 2023 10:05:02 +0800 Subject: [PATCH] [improve](csv_reader) handle csv reader error (#27892) --- be/src/vec/exec/format/csv/csv_reader.cpp | 14 ++++++++------ be/src/vec/exec/format/csv/csv_reader.h | 2 +- be/src/vec/exec/format/generic_reader.h | 2 +- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 3 ++- be/src/vec/exec/format/parquet/vparquet_reader.h | 2 +- be/src/vec/exec/scan/vfile_scanner.cpp | 4 ++-- 6 files changed, 15 insertions(+), 12 deletions(-) diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 5aa155e69e..9440794167 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -667,10 +667,10 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, // So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls. switch (_text_serde_type) { case TTextSerdeType::JSON_TEXT_SERDE: - static_cast(deserialize_nullable_string(*col_ptr, slice)); + RETURN_IF_ERROR(deserialize_nullable_string(*col_ptr, slice)); break; case TTextSerdeType::HIVE_TEXT_SERDE: - static_cast(deserialize_nullable_string(*col_ptr, slice)); + RETURN_IF_ERROR(deserialize_nullable_string(*col_ptr, slice)); break; default: break; @@ -678,11 +678,11 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, } else { switch (_text_serde_type) { case TTextSerdeType::JSON_TEXT_SERDE: - static_cast( + RETURN_IF_ERROR( _serdes[i]->deserialize_one_cell_from_json(*col_ptr, slice, _options)); break; case TTextSerdeType::HIVE_TEXT_SERDE: - static_cast( + RETURN_IF_ERROR( _serdes[i]->deserialize_one_cell_from_hive_text(*col_ptr, slice, _options)); break; default: @@ -965,14 +965,16 @@ Status CsvReader::_parse_col_types(size_t col_nums, std::vector* return Status::OK(); } -void CsvReader::close() { +Status CsvReader::close() { if (_line_reader) { _line_reader->close(); } if (_file_reader) { - static_cast(_file_reader->close()); + RETURN_IF_ERROR(_file_reader->close()); } + + return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 28d82183d0..19561b39ee 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -197,7 +197,7 @@ public: Status get_parsed_schema(std::vector* col_names, std::vector* col_types) override; - void close() override; + Status close() override; private: // used for stream/broker load of csv file. diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index beac88f1ab..d6dd3ed81b 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -64,7 +64,7 @@ public: return Status::OK(); } - virtual void close() {} + virtual Status close() { return Status::OK(); } protected: const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding) diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 9db803789f..096a7cf484 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -173,8 +173,9 @@ void ParquetReader::_init_profile() { } } -void ParquetReader::close() { +Status ParquetReader::close() { _close_internal(); + return Status::OK(); } void ParquetReader::_close_internal() { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 25b00c5bfe..376b3791b0 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -119,7 +119,7 @@ public: Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; - void close() override; + Status close() override; RowRange get_whole_range() { return _whole_range; } diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 149ec75fa4..80d6518e72 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -713,7 +713,7 @@ void VFileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int l Status VFileScanner::_get_next_reader() { while (true) { if (_cur_reader) { - _cur_reader->close(); + RETURN_IF_ERROR(_cur_reader->close()); } _cur_reader.reset(nullptr); _src_block_init = false; @@ -1096,7 +1096,7 @@ Status VFileScanner::close(RuntimeState* state) { } if (_cur_reader) { - _cur_reader->close(); + RETURN_IF_ERROR(_cur_reader->close()); } RETURN_IF_ERROR(VScanner::close(state));