[fix](streamload) catch exception when reading arrow data (#28558)

This commit is contained in:
wuwenchi
2023-12-18 22:03:57 +08:00
committed by GitHub
parent ddcfba0368
commit 97e63516b7

View File

@ -65,22 +65,24 @@ Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool*
}
// create a reader to read data
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchStreamReader>> tRet =
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchStreamReader>> res_open =
arrow::ipc::RecordBatchStreamReader::Open(_pip_stream.get(),
arrow::ipc::IpcReadOptions::Defaults());
if (!tRet.ok()) {
LOG(WARNING) << "failed to open stream reader: " << tRet.status().message();
return Status::InternalError("failed to open stream reader: {}", tRet.status().message());
if (!res_open.ok()) {
LOG(WARNING) << "failed to open stream reader: " << res_open.status().message();
return Status::InternalError("failed to open stream reader: {}",
res_open.status().message());
}
auto reader = std::move(tRet).ValueUnsafe();
auto reader = std::move(res_open).ValueUnsafe();
// get arrow data from reader
arrow::Result<arrow::RecordBatchVector> tRet2 = reader->ToRecordBatches();
if (!tRet2.ok()) {
LOG(WARNING) << "failed to read batch: " << tRet2.status().message();
return Status::InternalError("failed to read batch: {}", tRet2.status().message());
arrow::Result<arrow::RecordBatchVector> res_reader = reader->ToRecordBatches();
if (!res_reader.ok()) {
LOG(WARNING) << "failed to read batch: " << res_reader.status().message();
return Status::InternalError("failed to read batch: {}", res_reader.status().message());
}
std::vector<std::shared_ptr<arrow::RecordBatch>> out_batches = std::move(tRet2).ValueUnsafe();
std::vector<std::shared_ptr<arrow::RecordBatch>> out_batches =
std::move(res_reader).ValueUnsafe();
// convert arrow batch to block
auto columns = block->mutate_columns();
@ -94,10 +96,14 @@ Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool*
std::string column_name = batch.schema()->field(c)->name();
vectorized::ColumnWithTypeAndName& column_with_name = block->get_by_name(column_name);
column_with_name.type->get_serde()->read_column_from_arrow(
column_with_name.column->assume_mutable_ref(), column, 0, num_rows, _ctzz);
try {
vectorized::ColumnWithTypeAndName& column_with_name =
block->get_by_name(column_name);
column_with_name.type->get_serde()->read_column_from_arrow(
column_with_name.column->assume_mutable_ref(), column, 0, num_rows, _ctzz);
} catch (Exception& e) {
return Status::InternalError("Failed to convert from arrow to block: {}", e.what());
}
}
*read_rows += batch.num_rows();
}