diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp index 29ee451e3a..33ac46a827 100644 --- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp +++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp @@ -65,22 +65,24 @@ Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool* } // create a reader to read data - arrow::Result> tRet = + arrow::Result> res_open = arrow::ipc::RecordBatchStreamReader::Open(_pip_stream.get(), arrow::ipc::IpcReadOptions::Defaults()); - if (!tRet.ok()) { - LOG(WARNING) << "failed to open stream reader: " << tRet.status().message(); - return Status::InternalError("failed to open stream reader: {}", tRet.status().message()); + if (!res_open.ok()) { + LOG(WARNING) << "failed to open stream reader: " << res_open.status().message(); + return Status::InternalError("failed to open stream reader: {}", + res_open.status().message()); } - auto reader = std::move(tRet).ValueUnsafe(); + auto reader = std::move(res_open).ValueUnsafe(); // get arrow data from reader - arrow::Result tRet2 = reader->ToRecordBatches(); - if (!tRet2.ok()) { - LOG(WARNING) << "failed to read batch: " << tRet2.status().message(); - return Status::InternalError("failed to read batch: {}", tRet2.status().message()); + arrow::Result res_reader = reader->ToRecordBatches(); + if (!res_reader.ok()) { + LOG(WARNING) << "failed to read batch: " << res_reader.status().message(); + return Status::InternalError("failed to read batch: {}", res_reader.status().message()); } - std::vector> out_batches = std::move(tRet2).ValueUnsafe(); + std::vector> out_batches = + std::move(res_reader).ValueUnsafe(); // convert arrow batch to block auto columns = block->mutate_columns(); @@ -94,10 +96,14 @@ Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool* std::string column_name = batch.schema()->field(c)->name(); - vectorized::ColumnWithTypeAndName& column_with_name = block->get_by_name(column_name); - - column_with_name.type->get_serde()->read_column_from_arrow( - column_with_name.column->assume_mutable_ref(), column, 0, num_rows, _ctzz); + try { + vectorized::ColumnWithTypeAndName& column_with_name = + block->get_by_name(column_name); + column_with_name.type->get_serde()->read_column_from_arrow( + column_with_name.column->assume_mutable_ref(), column, 0, num_rows, _ctzz); + } catch (Exception& e) { + return Status::InternalError("Failed to convert from arrow to block: {}", e.what()); + } } *read_rows += batch.num_rows(); }