[improve](csv_reader) handle csv reader error (#27892)

This commit is contained in:
HHoflittlefish777
2023-12-02 10:05:02 +08:00
committed by GitHub
parent f65103e2a6
commit 54b5d04ff9
6 changed files with 15 additions and 12 deletions

View File

@ -667,10 +667,10 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
// So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls.
switch (_text_serde_type) {
case TTextSerdeType::JSON_TEXT_SERDE:
static_cast<void>(deserialize_nullable_string<true>(*col_ptr, slice));
RETURN_IF_ERROR(deserialize_nullable_string<true>(*col_ptr, slice));
break;
case TTextSerdeType::HIVE_TEXT_SERDE:
static_cast<void>(deserialize_nullable_string<false>(*col_ptr, slice));
RETURN_IF_ERROR(deserialize_nullable_string<false>(*col_ptr, slice));
break;
default:
break;
@ -678,11 +678,11 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
} else {
switch (_text_serde_type) {
case TTextSerdeType::JSON_TEXT_SERDE:
static_cast<void>(
RETURN_IF_ERROR(
_serdes[i]->deserialize_one_cell_from_json(*col_ptr, slice, _options));
break;
case TTextSerdeType::HIVE_TEXT_SERDE:
static_cast<void>(
RETURN_IF_ERROR(
_serdes[i]->deserialize_one_cell_from_hive_text(*col_ptr, slice, _options));
break;
default:
@ -965,14 +965,16 @@ Status CsvReader::_parse_col_types(size_t col_nums, std::vector<TypeDescriptor>*
return Status::OK();
}
void CsvReader::close() {
Status CsvReader::close() {
if (_line_reader) {
_line_reader->close();
}
if (_file_reader) {
static_cast<void>(_file_reader->close());
RETURN_IF_ERROR(_file_reader->close());
}
return Status::OK();
}
} // namespace doris::vectorized

View File

@ -197,7 +197,7 @@ public:
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;
void close() override;
Status close() override;
private:
// used for stream/broker load of csv file.

View File

@ -64,7 +64,7 @@ public:
return Status::OK();
}
virtual void close() {}
virtual Status close() { return Status::OK(); }
protected:
const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding)

View File

@ -173,8 +173,9 @@ void ParquetReader::_init_profile() {
}
}
void ParquetReader::close() {
Status ParquetReader::close() {
_close_internal();
return Status::OK();
}
void ParquetReader::_close_internal() {

View File

@ -119,7 +119,7 @@ public:
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
void close() override;
Status close() override;
RowRange get_whole_range() { return _whole_range; }

View File

@ -713,7 +713,7 @@ void VFileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int l
Status VFileScanner::_get_next_reader() {
while (true) {
if (_cur_reader) {
_cur_reader->close();
RETURN_IF_ERROR(_cur_reader->close());
}
_cur_reader.reset(nullptr);
_src_block_init = false;
@ -1096,7 +1096,7 @@ Status VFileScanner::close(RuntimeState* state) {
}
if (_cur_reader) {
_cur_reader->close();
RETURN_IF_ERROR(_cur_reader->close());
}
RETURN_IF_ERROR(VScanner::close(state));