bugfix: parquet load decimal to oracle int

This commit is contained in:
dontknow9179 2024-12-12 17:46:33 +00:00 committed by ob-robot
parent 90a058fb27
commit e64627a407
6 changed files with 45 additions and 84 deletions

View File

@ -600,7 +600,7 @@ int ObOrcFileWriter::open_orc_file_writer(const orc::Type &orc_schema,
int ObOrcFileWriter::write_file()
{
int ret = OB_SUCCESS;
orc::StructVectorBatch* root = dynamic_cast<orc::StructVectorBatch *>(orc_row_batch_.get());
orc::StructVectorBatch* root = static_cast<orc::StructVectorBatch *>(orc_row_batch_.get());
orc::ColumnVectorBatch* col_vector_batch = NULL;
if (batch_has_written_) {
// do nothing

View File

@ -260,7 +260,7 @@ public:
bool is_file_writer_null() {return !orc_file_writer_; }
bool is_valid_to_write(orc::StructVectorBatch* &root)
{
root = dynamic_cast<orc::StructVectorBatch *>(orc_row_batch_.get());
root = static_cast<orc::StructVectorBatch *>(orc_row_batch_.get());
return orc_file_writer_ && orc_row_batch_ && OB_NOT_NULL(root);
}
int64_t get_file_size() override

View File

@ -623,9 +623,13 @@ int ObSelectIntoOp::calc_first_file_path(ObString &path)
ObString input_file_name = file_location_ == IntoFileLocation::REMOTE_OSS
? path.split_on('?').trim()
: path;
if (input_file_name.length() == 0 || path.length() == 0 || OB_ISNULL(input)) {
if (OB_ISNULL(input)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected path or input is null", K(ret));
LOG_WARN("op input is null", K(ret));
} else if (input_file_name.length() == 0 || path.length() == 0) {
ret = OB_INVALID_ARGUMENT;
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "invalid outfile path");
LOG_WARN("invalid outfile path", K(ret));
} else {
if (input_file_name.ptr()[input_file_name.length() - 1] == '/'){
OZ(file_name_with_suffix.append_fmt("%.*sdata", input_file_name.length(), input_file_name.ptr()));
@ -744,10 +748,13 @@ int ObSelectIntoOp::split_file(ObExternalFileWriter &data_writer)
{
int ret = OB_SUCCESS;
if (ObExternalFileFormat::FormatType::CSV_FORMAT == format_type_) {
ObCsvFileWriter &csv_data_writer = dynamic_cast<ObCsvFileWriter&>(data_writer);
if (!use_shared_buf_ && OB_FAIL(csv_data_writer.flush_buf())) {
ObCsvFileWriter *csv_data_writer = static_cast<ObCsvFileWriter*>(&data_writer);
if (OB_ISNULL(csv_data_writer)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null data writer", K(ret));
} else if (!use_shared_buf_ && OB_FAIL(csv_data_writer->flush_buf())) {
LOG_WARN("failed to flush buffer", K(ret));
} else if (has_lob_ && use_shared_buf_ && OB_FAIL(csv_data_writer.flush_shared_buf(shared_buf_))) {
} else if (has_lob_ && use_shared_buf_ && OB_FAIL(csv_data_writer->flush_shared_buf(shared_buf_))) {
// 要保证文件中每一行的完整性, 有lob的时候shared buffer里不一定是完整的一行
// 因此剩下的shared buffer里的内容也要刷到当前文件里, 这种情况下无法严格满足max_file_size的限制
LOG_WARN("failed to flush shared buffer", K(ret));
@ -1291,7 +1298,7 @@ int ObSelectIntoOp::into_outfile(ObExternalFileWriter *data_writer)
}
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(data_writer) || OB_ISNULL(csv_data_writer = dynamic_cast<ObCsvFileWriter *>(data_writer))) {
if (OB_ISNULL(csv_data_writer = static_cast<ObCsvFileWriter *>(data_writer))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null data writer", K(ret));
}
@ -2062,8 +2069,7 @@ int ObSelectIntoOp::into_outfile_batch_csv(const ObBatchRows &brs, ObExternalFil
} else if (do_partition_ && OB_FAIL(get_data_writer_for_partition(partition_datum_vector.at(i)->get_string(),
data_writer))) {
LOG_WARN("failed to set data writer for partition", K(ret));
} else if (OB_ISNULL(data_writer)
|| OB_ISNULL(csv_data_writer = dynamic_cast<ObCsvFileWriter *>(data_writer))) {
} else if (OB_ISNULL(csv_data_writer = static_cast<ObCsvFileWriter *>(data_writer))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null data writer", K(ret));
} else if (has_compress_ && OB_ISNULL(csv_data_writer->get_compress_stream_writer())
@ -2394,7 +2400,7 @@ int ObSelectIntoOp::into_outfile_batch_parquet(const ObBatchRows &brs, ObExterna
} else if (do_partition_ && OB_FAIL(get_data_writer_for_partition(partition_vector->get_string(row_idx),
data_writer))) {
LOG_WARN("failed to set data writer for partition", K(ret));
} else if (OB_ISNULL(data_writer) || OB_ISNULL(parquet_data_writer = dynamic_cast<ObParquetFileWriter*>(data_writer))) {
} else if (OB_ISNULL(parquet_data_writer = static_cast<ObParquetFileWriter*>(data_writer))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null data writer", K(ret));
} else if (parquet_data_writer->is_file_writer_null()
@ -2521,7 +2527,7 @@ int ObSelectIntoOp::into_outfile_batch_orc(const ObBatchRows &brs, ObExternalFil
} else if (do_partition_ && OB_FAIL(get_data_writer_for_partition(partition_vector->get_string(row_idx),
data_writer))) {
LOG_WARN("failed to set data writer for partition", K(ret));
} else if (OB_ISNULL(data_writer) || OB_ISNULL(orc_data_writer = dynamic_cast<ObOrcFileWriter*>(data_writer))) {
} else if (OB_ISNULL(orc_data_writer = static_cast<ObOrcFileWriter*>(data_writer))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null data writer", K(ret));
} else if (orc_data_writer->is_file_writer_null()
@ -2588,11 +2594,8 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta,
LOG_WARN("unexpected error", K(ret), K(col_idx), K(row_idx));
} else if (ob_is_integer_type(datum_meta.type_)
|| ObYearType == datum_meta.type_ || ObDateType == datum_meta.type_) {
orc::LongVectorBatch *long_batch = dynamic_cast<orc::LongVectorBatch *>(col_vector_batch);
if (OB_ISNULL(long_batch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(col_idx));
} else if (expr_vector->is_null(row_idx)) {
orc::LongVectorBatch *long_batch = static_cast<orc::LongVectorBatch *>(col_vector_batch);
if (expr_vector->is_null(row_idx)) {
col_vector_batch->hasNulls = true;
col_vector_batch->notNull[row_offset] = false;
} else {
@ -2624,27 +2627,19 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta,
}
if (OB_FAIL(ret)) {
} else if (int_bytes <= sizeof(int64_t)) {
orc::Decimal64VectorBatch *decimal64vectorbatch = dynamic_cast<orc::Decimal64VectorBatch *>(col_vector_batch);
orc::Decimal64VectorBatch *decimal64vectorbatch = static_cast<orc::Decimal64VectorBatch *>(col_vector_batch);
decimal64vectorbatch->precision = datum_meta.precision_;
decimal64vectorbatch->scale = datum_meta.scale_;
if (OB_ISNULL(decimal64vectorbatch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(col_idx));
} else if (int_bytes == sizeof(int32_t)) {
if (int_bytes == sizeof(int32_t)) {
decimal64vectorbatch->values[row_offset] = value->int32_v_[0];
} else {
decimal64vectorbatch->values[row_offset] = value->int64_v_[0];
}
} else if (int_bytes <= sizeof(int128_t)) {
orc::Decimal128VectorBatch *decimal128vectorbatch = dynamic_cast<orc::Decimal128VectorBatch *>(col_vector_batch);
orc::Decimal128VectorBatch *decimal128vectorbatch = static_cast<orc::Decimal128VectorBatch *>(col_vector_batch);
decimal128vectorbatch->precision = datum_meta.precision_;
decimal128vectorbatch->scale = datum_meta.scale_;
if (OB_ISNULL(decimal128vectorbatch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(col_idx));
} else {
decimal128vectorbatch->values[row_offset] = orc::Int128(value->int128_v_[0] >> 64, value->int128_v_[0]);
}
decimal128vectorbatch->values[row_offset] = orc::Int128(value->int128_v_[0] >> 64, value->int128_v_[0]);
} else {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "this decimal type for orc");
@ -2652,11 +2647,8 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta,
}
}
} else if (ObDoubleType == datum_meta.type_ || ObFloatType == datum_meta.type_) {
orc::DoubleVectorBatch *double_vector_batch = dynamic_cast<orc::DoubleVectorBatch *>(col_vector_batch);
if (OB_ISNULL(double_vector_batch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(col_idx));
} else if (expr_vector->is_null(row_idx)) {
orc::DoubleVectorBatch *double_vector_batch = static_cast<orc::DoubleVectorBatch *>(col_vector_batch);
if (expr_vector->is_null(row_idx)) {
col_vector_batch->hasNulls = true;
col_vector_batch->notNull[row_offset] = false;
} else {
@ -2668,14 +2660,11 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta,
}
} else if (ob_is_text_tc(datum_meta.type_) || ob_is_string_tc(datum_meta.type_) || ObRawType == datum_meta.type_
|| ObNullType == datum_meta.type_) {
orc::StringVectorBatch * string_vector_batch = dynamic_cast<orc::StringVectorBatch *>(col_vector_batch);
orc::StringVectorBatch * string_vector_batch = static_cast<orc::StringVectorBatch *>(col_vector_batch);
bool has_lob_header = obj_meta.has_lob_header();
char *buf = nullptr;
uint32_t res_len = 0;
if (OB_ISNULL(string_vector_batch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(col_idx));
} else if (expr_vector->is_null(row_idx)) {
if (expr_vector->is_null(row_idx)) {
col_vector_batch->hasNulls = true;
col_vector_batch->notNull[row_offset] = false;
} else {
@ -2688,11 +2677,8 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta,
}
}
} else if (ob_is_datetime_tc(datum_meta.type_)) { // ObDatetimeType | ObTimestampType
orc::TimestampVectorBatch *timestamp_vector_batch = dynamic_cast<orc::TimestampVectorBatch *>(col_vector_batch);
if (OB_ISNULL(timestamp_vector_batch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(col_idx), K(row_idx));
} else if (expr_vector->is_null(row_idx)) {
orc::TimestampVectorBatch *timestamp_vector_batch = static_cast<orc::TimestampVectorBatch *>(col_vector_batch);
if (expr_vector->is_null(row_idx)) {
col_vector_batch->hasNulls = true;
col_vector_batch->notNull[row_offset] = false;
} else {
@ -2702,11 +2688,8 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta,
timestamp_vector_batch->nanoseconds[row_offset] = (out_usec % USECS_PER_SEC) * NSECS_PER_USEC; // usec to nanosecond
}
} else if (ObTimestampNanoType == datum_meta.type_ || ObTimestampLTZType == datum_meta.type_) {
orc::TimestampVectorBatch *timestamp_vector_batch = dynamic_cast<orc::TimestampVectorBatch *>(col_vector_batch);
if (OB_ISNULL(timestamp_vector_batch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(col_idx), K(row_idx));
} else if (expr_vector->is_null(row_idx)) {
orc::TimestampVectorBatch *timestamp_vector_batch = static_cast<orc::TimestampVectorBatch *>(col_vector_batch);
if (expr_vector->is_null(row_idx)) {
col_vector_batch->hasNulls = true;
col_vector_batch->notNull[row_offset] = false;
} else {
@ -3462,11 +3445,12 @@ int ObSelectIntoOp::get_data_writer_for_partition(const ObString &partition_str,
bool writer_added = false;
if (OB_FAIL(new_data_writer(data_writer))) {
LOG_WARN("failed to new data writer", K(ret));
} else if (OB_ISNULL(data_writer)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (ObExternalFileFormat::FormatType::CSV_FORMAT == format_type_ && MY_SPEC.buffer_size_ > 0) {
if (OB_ISNULL(csv_data_writer = dynamic_cast<ObCsvFileWriter*>(data_writer))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(csv_data_writer->alloc_buf(ctx_.get_allocator(), MY_SPEC.buffer_size_))) {
csv_data_writer = static_cast<ObCsvFileWriter*>(data_writer);
if (OB_FAIL(csv_data_writer->alloc_buf(ctx_.get_allocator(), MY_SPEC.buffer_size_))) {
LOG_WARN("failed to alloc buffer", K(ret));
}
}
@ -3499,6 +3483,9 @@ int ObSelectIntoOp::create_the_only_data_writer(ObExternalFileWriter *&data_writ
ObCsvFileWriter *csv_data_writer = NULL;
if (OB_FAIL(new_data_writer(data_writer))) {
LOG_WARN("failed to new data writer", K(ret));
} else if (OB_ISNULL(data_writer)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else {
data_writer->url_ = basic_url_;
data_writer_ = data_writer;
@ -3508,10 +3495,8 @@ int ObSelectIntoOp::create_the_only_data_writer(ObExternalFileWriter *&data_writ
&& OB_FAIL(data_writer->open_file())) {
LOG_WARN("failed to open file", K(ret));
} else if (ObExternalFileFormat::FormatType::CSV_FORMAT == format_type_ && MY_SPEC.buffer_size_ > 0) {
if (OB_ISNULL(csv_data_writer = dynamic_cast<ObCsvFileWriter*>(data_writer))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(csv_data_writer->alloc_buf(ctx_.get_allocator(), MY_SPEC.buffer_size_))) {
csv_data_writer = static_cast<ObCsvFileWriter*>(data_writer);
if (OB_FAIL(csv_data_writer->alloc_buf(ctx_.get_allocator(), MY_SPEC.buffer_size_))) {
LOG_WARN("failed to alloc buffer", K(ret));
}
}

View File

@ -703,7 +703,9 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col()
}
} else if (reader_->descr()->physical_type() == parquet::Type::Type::BYTE_ARRAY) {
ObArrayWrap<parquet::ByteArray> values;
int32_t int_bytes = wide::ObDecimalIntConstValue::get_int_bytes_by_precision(file_col_expr_->datum_meta_.precision_);
int32_t int_bytes = wide::ObDecimalIntConstValue::get_int_bytes_by_precision(
(file_col_expr_->datum_meta_.precision_ == -1)
? 38 : file_col_expr_->datum_meta_.precision_);
ObArrayWrap<char> buffer;
OZ (buffer.allocate_array(tmp_alloc_g.get_allocator(), int_bytes));
OZ (values.allocate_array(tmp_alloc_g.get_allocator(), batch_size_));

View File

@ -8695,18 +8695,6 @@ int ObDMLResolver::resolve_external_table_generated_column(
if (OB_FAIL(format.load_from_string(table_format_or_properties, *params_.allocator_))) {
LOG_WARN("load from string failed", K(ret));
}
// delete later
if (OB_SUCC(ret) && format.format_type_ == ObExternalFileFormat::ORC_FORMAT && lib::is_oracle_mode()) {
ret = OB_E(EventTable::EN_EXTERNAL_TABLE_ORACLE) OB_SUCCESS;
bool enable_oracle_orc = OB_SUCCESS != ret;
if (enable_oracle_orc) {
ret = OB_SUCCESS;
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support orc in oracle mode", K(ret));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "orc in oracle mode");
}
}
if (OB_SUCC(ret) && format.format_type_ != ObResolverUtils::resolve_external_file_column_type(col.col_name_)) {
if (format.format_type_ == ObExternalFileFormat::ORC_FORMAT &&
ObExternalFileFormat::PARQUET_FORMAT != ObResolverUtils::resolve_external_file_column_type(col.col_name_)) {

View File

@ -5263,20 +5263,6 @@ int ObSelectResolver::resolve_into_outfile_with_format(const ParseNode *node, Ob
LOG_WARN("failed to init csv format", K(ret));
}
}
// delete later
if (OB_SUCC(ret) && is_oracle_mode()
&& (ObExternalFileFormat::PARQUET_FORMAT == external_format.format_type_
|| ObExternalFileFormat::ORC_FORMAT == external_format.format_type_)) {
ret = OB_E(EventTable::EN_EXTERNAL_TABLE_ORACLE) OB_SUCCESS;
bool enable_oracle_orc = OB_SUCCESS != ret;
if (enable_oracle_orc) {
ret = OB_SUCCESS;
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support orc or parquet in oracle mode", K(ret));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "orc or parquet in oracle mode");
}
}
for (int i = 0; OB_SUCC(ret) && i < format_node->num_child_; ++i) {
if (OB_ISNULL(option_node = format_node->children_[i])) {
ret = OB_ERR_UNEXPECTED;