[fix](parquet) Fix incorrect data being written in parquet format. (#12864)
* [fix](parquet) Fix incorrect data being written in parquet format. Fix incorrect data conversion when writing TINYINT and SMALLINT data to parquet files in the non-vectorized engine.
This commit is contained in:
@ -382,6 +382,17 @@ parquet::RowGroupWriter* ParquetWriterWrapper::get_rg_writer() {
|
||||
return _rg_writer;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void ParquetWriterWrapper::write_int32_column(int index, T* item) {
|
||||
parquet::RowGroupWriter* rgWriter = get_rg_writer();
|
||||
parquet::Int32Writer* col_writer = static_cast<parquet::Int32Writer*>(rgWriter->column(index));
|
||||
int32_t value = 0;
|
||||
if (item != nullptr) {
|
||||
value = *item;
|
||||
}
|
||||
col_writer->WriteBatch(1, nullptr, nullptr, &value);
|
||||
}
|
||||
|
||||
Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
|
||||
int num_columns = _output_expr_ctxs.size();
|
||||
try {
|
||||
@ -400,18 +411,16 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TYPE_TINYINT:
|
||||
case TYPE_SMALLINT:
|
||||
case TYPE_TINYINT: {
|
||||
write_int32_column(index, static_cast<int8_t*>(item));
|
||||
break;
|
||||
}
|
||||
case TYPE_SMALLINT: {
|
||||
write_int32_column(index, static_cast<int16_t*>(item));
|
||||
break;
|
||||
}
|
||||
case TYPE_INT: {
|
||||
parquet::RowGroupWriter* rgWriter = get_rg_writer();
|
||||
parquet::Int32Writer* col_writer =
|
||||
static_cast<parquet::Int32Writer*>(rgWriter->column(index));
|
||||
if (item != nullptr) {
|
||||
col_writer->WriteBatch(1, nullptr, nullptr, static_cast<int32_t*>(item));
|
||||
} else {
|
||||
int32_t default_int32 = 0;
|
||||
col_writer->WriteBatch(1, nullptr, nullptr, &default_int32);
|
||||
}
|
||||
write_int32_column(index, static_cast<int32_t*>(item));
|
||||
break;
|
||||
}
|
||||
case TYPE_BIGINT: {
|
||||
|
||||
@ -120,6 +120,9 @@ public:
|
||||
int64_t written_len();
|
||||
|
||||
private:
|
||||
// Writes the value pointed to by `item` (of integer type T) to the column at
// `index` as an int32_t; a null `item` is written as 0.
template <typename T>
|
||||
void write_int32_column(int index, T* item);
|
||||
|
||||
std::shared_ptr<ParquetOutputStream> _outstream;
|
||||
std::shared_ptr<parquet::WriterProperties> _properties;
|
||||
std::shared_ptr<parquet::schema::GroupNode> _schema;
|
||||
|
||||
Reference in New Issue
Block a user