From e64627a40799a071fa4f38e65ad76965b9676fc9 Mon Sep 17 00:00:00 2001 From: dontknow9179 <545187809@qq.com> Date: Thu, 12 Dec 2024 17:46:33 +0000 Subject: [PATCH] bugfix: parquet load decimal to oracle int --- .../engine/basic/ob_external_file_writer.cpp | 2 +- .../engine/basic/ob_external_file_writer.h | 2 +- src/sql/engine/basic/ob_select_into_op.cpp | 95 ++++++++----------- .../table/ob_parquet_table_row_iter.cpp | 4 +- src/sql/resolver/dml/ob_dml_resolver.cpp | 12 --- src/sql/resolver/dml/ob_select_resolver.cpp | 14 --- 6 files changed, 45 insertions(+), 84 deletions(-) diff --git a/src/sql/engine/basic/ob_external_file_writer.cpp b/src/sql/engine/basic/ob_external_file_writer.cpp index 0402e15ee..364e203ea 100644 --- a/src/sql/engine/basic/ob_external_file_writer.cpp +++ b/src/sql/engine/basic/ob_external_file_writer.cpp @@ -600,7 +600,7 @@ int ObOrcFileWriter::open_orc_file_writer(const orc::Type &orc_schema, int ObOrcFileWriter::write_file() { int ret = OB_SUCCESS; - orc::StructVectorBatch* root = dynamic_cast(orc_row_batch_.get()); + orc::StructVectorBatch* root = static_cast(orc_row_batch_.get()); orc::ColumnVectorBatch* col_vector_batch = NULL; if (batch_has_written_) { // do nothing diff --git a/src/sql/engine/basic/ob_external_file_writer.h b/src/sql/engine/basic/ob_external_file_writer.h index 5a395b92c..275e764b4 100644 --- a/src/sql/engine/basic/ob_external_file_writer.h +++ b/src/sql/engine/basic/ob_external_file_writer.h @@ -260,7 +260,7 @@ public: bool is_file_writer_null() {return !orc_file_writer_; } bool is_valid_to_write(orc::StructVectorBatch* &root) { - root = dynamic_cast(orc_row_batch_.get()); + root = static_cast(orc_row_batch_.get()); return orc_file_writer_ && orc_row_batch_ && OB_NOT_NULL(root); } int64_t get_file_size() override diff --git a/src/sql/engine/basic/ob_select_into_op.cpp b/src/sql/engine/basic/ob_select_into_op.cpp index 3a00d477f..368816edb 100644 --- a/src/sql/engine/basic/ob_select_into_op.cpp +++ b/src/sql/engine/basic/ob_select_into_op.cpp @@ -623,9 +623,13 @@ int ObSelectIntoOp::calc_first_file_path(ObString &path) ObString input_file_name = file_location_ == IntoFileLocation::REMOTE_OSS ? path.split_on('?').trim() : path; - if (input_file_name.length() == 0 || path.length() == 0 || OB_ISNULL(input)) { + if (OB_ISNULL(input)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected path or input is null", K(ret)); + LOG_WARN("op input is null", K(ret)); + } else if (input_file_name.length() == 0 || path.length() == 0) { + ret = OB_INVALID_ARGUMENT; + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "invalid outfile path"); + LOG_WARN("invalid outfile path", K(ret)); } else { if (input_file_name.ptr()[input_file_name.length() - 1] == '/'){ OZ(file_name_with_suffix.append_fmt("%.*sdata", input_file_name.length(), input_file_name.ptr())); @@ -744,10 +748,13 @@ int ObSelectIntoOp::split_file(ObExternalFileWriter &data_writer) { int ret = OB_SUCCESS; if (ObExternalFileFormat::FormatType::CSV_FORMAT == format_type_) { - ObCsvFileWriter &csv_data_writer = dynamic_cast(data_writer); - if (!use_shared_buf_ && OB_FAIL(csv_data_writer.flush_buf())) { + ObCsvFileWriter *csv_data_writer = static_cast(&data_writer); + if (OB_ISNULL(csv_data_writer)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null data writer", K(ret)); + } else if (!use_shared_buf_ && OB_FAIL(csv_data_writer->flush_buf())) { LOG_WARN("failed to flush buffer", K(ret)); - } else if (has_lob_ && use_shared_buf_ && OB_FAIL(csv_data_writer.flush_shared_buf(shared_buf_))) { + } else if (has_lob_ && use_shared_buf_ && OB_FAIL(csv_data_writer->flush_shared_buf(shared_buf_))) { // 要保证文件中每一行的完整性, 有lob的时候shared buffer里不一定是完整的一行 // 因此剩下的shared buffer里的内容也要刷到当前文件里, 这种情况下无法严格满足max_file_size的限制 LOG_WARN("failed to flush shared buffer", K(ret)); @@ -1291,7 +1298,7 @@ int ObSelectIntoOp::into_outfile(ObExternalFileWriter *data_writer) } } if (OB_SUCC(ret)) { - if (OB_ISNULL(data_writer) || OB_ISNULL(csv_data_writer = dynamic_cast(data_writer))) { + if (OB_ISNULL(csv_data_writer = static_cast(data_writer))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null data writer", K(ret)); } @@ -2062,8 +2069,7 @@ int ObSelectIntoOp::into_outfile_batch_csv(const ObBatchRows &brs, ObExternalFil } else if (do_partition_ && OB_FAIL(get_data_writer_for_partition(partition_datum_vector.at(i)->get_string(), data_writer))) { LOG_WARN("failed to set data writer for partition", K(ret)); - } else if (OB_ISNULL(data_writer) - || OB_ISNULL(csv_data_writer = dynamic_cast(data_writer))) { + } else if (OB_ISNULL(csv_data_writer = static_cast(data_writer))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null data writer", K(ret)); } else if (has_compress_ && OB_ISNULL(csv_data_writer->get_compress_stream_writer()) @@ -2394,7 +2400,7 @@ int ObSelectIntoOp::into_outfile_batch_parquet(const ObBatchRows &brs, ObExterna } else if (do_partition_ && OB_FAIL(get_data_writer_for_partition(partition_vector->get_string(row_idx), data_writer))) { LOG_WARN("failed to set data writer for partition", K(ret)); - } else if (OB_ISNULL(data_writer) || OB_ISNULL(parquet_data_writer = dynamic_cast(data_writer))) { + } else if (OB_ISNULL(parquet_data_writer = static_cast(data_writer))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null data writer", K(ret)); } else if (parquet_data_writer->is_file_writer_null() @@ -2521,7 +2527,7 @@ int ObSelectIntoOp::into_outfile_batch_orc(const ObBatchRows &brs, ObExternalFil } else if (do_partition_ && OB_FAIL(get_data_writer_for_partition(partition_vector->get_string(row_idx), data_writer))) { LOG_WARN("failed to set data writer for partition", K(ret)); - } else if (OB_ISNULL(data_writer) || OB_ISNULL(orc_data_writer = dynamic_cast(data_writer))) { + } else if (OB_ISNULL(orc_data_writer = static_cast(data_writer))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null data writer", K(ret)); } else if (orc_data_writer->is_file_writer_null() @@ -2588,11 +2594,8 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta, LOG_WARN("unexpected error", K(ret), K(col_idx), K(row_idx)); } else if (ob_is_integer_type(datum_meta.type_) || ObYearType == datum_meta.type_ || ObDateType == datum_meta.type_) { - orc::LongVectorBatch *long_batch = dynamic_cast(col_vector_batch); - if (OB_ISNULL(long_batch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected error", K(ret), K(col_idx)); - } else if (expr_vector->is_null(row_idx)) { + orc::LongVectorBatch *long_batch = static_cast(col_vector_batch); + if (expr_vector->is_null(row_idx)) { col_vector_batch->hasNulls = true; col_vector_batch->notNull[row_offset] = false; } else { @@ -2624,27 +2627,19 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta, } if (OB_FAIL(ret)) { } else if (int_bytes <= sizeof(int64_t)) { - orc::Decimal64VectorBatch *decimal64vectorbatch = dynamic_cast(col_vector_batch); + orc::Decimal64VectorBatch *decimal64vectorbatch = static_cast(col_vector_batch); decimal64vectorbatch->precision = datum_meta.precision_; decimal64vectorbatch->scale = datum_meta.scale_; - if (OB_ISNULL(decimal64vectorbatch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected error", K(ret), K(col_idx)); - } else if (int_bytes == sizeof(int32_t)) { + if (int_bytes == sizeof(int32_t)) { decimal64vectorbatch->values[row_offset] = value->int32_v_[0]; } else { decimal64vectorbatch->values[row_offset] = value->int64_v_[0]; } } else if (int_bytes <= sizeof(int128_t)) { - orc::Decimal128VectorBatch *decimal128vectorbatch = dynamic_cast(col_vector_batch); + orc::Decimal128VectorBatch *decimal128vectorbatch = static_cast(col_vector_batch); decimal128vectorbatch->precision = datum_meta.precision_; decimal128vectorbatch->scale = datum_meta.scale_; - if (OB_ISNULL(decimal128vectorbatch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected error", K(ret), K(col_idx)); - } else { - decimal128vectorbatch->values[row_offset] = orc::Int128(value->int128_v_[0] >> 64, value->int128_v_[0]); - } + decimal128vectorbatch->values[row_offset] = orc::Int128(value->int128_v_[0] >> 64, value->int128_v_[0]); } else { ret = OB_NOT_SUPPORTED; LOG_USER_ERROR(OB_NOT_SUPPORTED, "this decimal type for orc"); @@ -2652,11 +2647,8 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta, } } } else if (ObDoubleType == datum_meta.type_ || ObFloatType == datum_meta.type_) { - orc::DoubleVectorBatch *double_vector_batch = dynamic_cast(col_vector_batch); - if (OB_ISNULL(double_vector_batch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected error", K(ret), K(col_idx)); - } else if (expr_vector->is_null(row_idx)) { + orc::DoubleVectorBatch *double_vector_batch = static_cast(col_vector_batch); + if (expr_vector->is_null(row_idx)) { col_vector_batch->hasNulls = true; col_vector_batch->notNull[row_offset] = false; } else { @@ -2668,14 +2660,11 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta, } } else if (ob_is_text_tc(datum_meta.type_) || ob_is_string_tc(datum_meta.type_) || ObRawType == datum_meta.type_ || ObNullType == datum_meta.type_) { - orc::StringVectorBatch * string_vector_batch = dynamic_cast(col_vector_batch); + orc::StringVectorBatch * string_vector_batch = static_cast(col_vector_batch); bool has_lob_header = obj_meta.has_lob_header(); char *buf = nullptr; uint32_t res_len = 0; - if (OB_ISNULL(string_vector_batch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected error", K(ret), K(col_idx)); - } else if (expr_vector->is_null(row_idx)) { + if (expr_vector->is_null(row_idx)) { col_vector_batch->hasNulls = true; col_vector_batch->notNull[row_offset] = false; } else { @@ -2688,11 +2677,8 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta, } } } else if (ob_is_datetime_tc(datum_meta.type_)) { // ObDatetimeType | ObTimestampType - orc::TimestampVectorBatch *timestamp_vector_batch = dynamic_cast(col_vector_batch); - if (OB_ISNULL(timestamp_vector_batch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected error", K(ret), K(col_idx), K(row_idx)); - } else if (expr_vector->is_null(row_idx)) { + orc::TimestampVectorBatch *timestamp_vector_batch = static_cast(col_vector_batch); + if (expr_vector->is_null(row_idx)) { col_vector_batch->hasNulls = true; col_vector_batch->notNull[row_offset] = false; } else { @@ -2702,11 +2688,8 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta, timestamp_vector_batch->nanoseconds[row_offset] = (out_usec % USECS_PER_SEC) * NSECS_PER_USEC; // usec to nanosecond } } else if (ObTimestampNanoType == datum_meta.type_ || ObTimestampLTZType == datum_meta.type_) { - orc::TimestampVectorBatch *timestamp_vector_batch = dynamic_cast(col_vector_batch); - if (OB_ISNULL(timestamp_vector_batch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected error", K(ret), K(col_idx), K(row_idx)); - } else if (expr_vector->is_null(row_idx)) { + orc::TimestampVectorBatch *timestamp_vector_batch = static_cast(col_vector_batch); + if (expr_vector->is_null(row_idx)) { col_vector_batch->hasNulls = true; col_vector_batch->notNull[row_offset] = false; } else { @@ -3462,11 +3445,12 @@ int ObSelectIntoOp::get_data_writer_for_partition(const ObString &partition_str, bool writer_added = false; if (OB_FAIL(new_data_writer(data_writer))) { LOG_WARN("failed to new data writer", K(ret)); + } else if (OB_ISNULL(data_writer)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); } else if (ObExternalFileFormat::FormatType::CSV_FORMAT == format_type_ && MY_SPEC.buffer_size_ > 0) { - if (OB_ISNULL(csv_data_writer = dynamic_cast(data_writer))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null", K(ret)); - } else if (OB_FAIL(csv_data_writer->alloc_buf(ctx_.get_allocator(), MY_SPEC.buffer_size_))) { + csv_data_writer = static_cast(data_writer); + if (OB_FAIL(csv_data_writer->alloc_buf(ctx_.get_allocator(), MY_SPEC.buffer_size_))) { LOG_WARN("failed to alloc buffer", K(ret)); } } @@ -3499,6 +3483,9 @@ int ObSelectIntoOp::create_the_only_data_writer(ObExternalFileWriter *&data_writ ObCsvFileWriter *csv_data_writer = NULL; if (OB_FAIL(new_data_writer(data_writer))) { LOG_WARN("failed to new data writer", K(ret)); + } else if (OB_ISNULL(data_writer)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); } else { data_writer->url_ = basic_url_; data_writer_ = data_writer; @@ -3508,10 +3495,8 @@ int ObSelectIntoOp::create_the_only_data_writer(ObExternalFileWriter *&data_writ && OB_FAIL(data_writer->open_file())) { LOG_WARN("failed to open file", K(ret)); } else if (ObExternalFileFormat::FormatType::CSV_FORMAT == format_type_ && MY_SPEC.buffer_size_ > 0) { - if (OB_ISNULL(csv_data_writer = dynamic_cast(data_writer))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null", K(ret)); - } else if (OB_FAIL(csv_data_writer->alloc_buf(ctx_.get_allocator(), MY_SPEC.buffer_size_))) { + csv_data_writer = static_cast(data_writer); + if (OB_FAIL(csv_data_writer->alloc_buf(ctx_.get_allocator(), MY_SPEC.buffer_size_))) { LOG_WARN("failed to alloc buffer", K(ret)); } } diff --git a/src/sql/engine/table/ob_parquet_table_row_iter.cpp b/src/sql/engine/table/ob_parquet_table_row_iter.cpp index 9d48dbaf2..b61a75109 100644 --- a/src/sql/engine/table/ob_parquet_table_row_iter.cpp +++ b/src/sql/engine/table/ob_parquet_table_row_iter.cpp @@ -703,7 +703,9 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col() } } else if (reader_->descr()->physical_type() == parquet::Type::Type::BYTE_ARRAY) { ObArrayWrap values; - int32_t int_bytes = wide::ObDecimalIntConstValue::get_int_bytes_by_precision(file_col_expr_->datum_meta_.precision_); + int32_t int_bytes = wide::ObDecimalIntConstValue::get_int_bytes_by_precision( + (file_col_expr_->datum_meta_.precision_ == -1) + ? 38 : file_col_expr_->datum_meta_.precision_); ObArrayWrap buffer; OZ (buffer.allocate_array(tmp_alloc_g.get_allocator(), int_bytes)); OZ (values.allocate_array(tmp_alloc_g.get_allocator(), batch_size_)); diff --git a/src/sql/resolver/dml/ob_dml_resolver.cpp b/src/sql/resolver/dml/ob_dml_resolver.cpp index d649d298c..058783633 100755 --- a/src/sql/resolver/dml/ob_dml_resolver.cpp +++ b/src/sql/resolver/dml/ob_dml_resolver.cpp @@ -8695,18 +8695,6 @@ int ObDMLResolver::resolve_external_table_generated_column( if (OB_FAIL(format.load_from_string(table_format_or_properties, *params_.allocator_))) { LOG_WARN("load from string failed", K(ret)); } - // delete later - if (OB_SUCC(ret) && format.format_type_ == ObExternalFileFormat::ORC_FORMAT && lib::is_oracle_mode()) { - ret = OB_E(EventTable::EN_EXTERNAL_TABLE_ORACLE) OB_SUCCESS; - bool enable_oracle_orc = OB_SUCCESS != ret; - if (enable_oracle_orc) { - ret = OB_SUCCESS; - } else { - ret = OB_NOT_SUPPORTED; - LOG_WARN("not support orc in oracle mode", K(ret)); - LOG_USER_ERROR(OB_NOT_SUPPORTED, "orc in oracle mode"); - } - } if (OB_SUCC(ret) && format.format_type_ != ObResolverUtils::resolve_external_file_column_type(col.col_name_)) { if (format.format_type_ == ObExternalFileFormat::ORC_FORMAT && ObExternalFileFormat::PARQUET_FORMAT != ObResolverUtils::resolve_external_file_column_type(col.col_name_)) { diff --git a/src/sql/resolver/dml/ob_select_resolver.cpp b/src/sql/resolver/dml/ob_select_resolver.cpp index 918c0a886..5cb5690a4 100644 --- a/src/sql/resolver/dml/ob_select_resolver.cpp +++ b/src/sql/resolver/dml/ob_select_resolver.cpp @@ -5263,20 +5263,6 @@ int ObSelectResolver::resolve_into_outfile_with_format(const ParseNode *node, Ob LOG_WARN("failed to init csv format", K(ret)); } } - // delete later - if (OB_SUCC(ret) && is_oracle_mode() - && (ObExternalFileFormat::PARQUET_FORMAT == external_format.format_type_ - || ObExternalFileFormat::ORC_FORMAT == external_format.format_type_)) { - ret = OB_E(EventTable::EN_EXTERNAL_TABLE_ORACLE) OB_SUCCESS; - bool enable_oracle_orc = OB_SUCCESS != ret; - if (enable_oracle_orc) { - ret = OB_SUCCESS; - } else { - ret = OB_NOT_SUPPORTED; - LOG_WARN("not support orc or parquet in oracle mode", K(ret)); - LOG_USER_ERROR(OB_NOT_SUPPORTED, "orc or parquet in oracle mode"); - } - } for (int i = 0; OB_SUCC(ret) && i < format_node->num_child_; ++i) { if (OB_ISNULL(option_node = format_node->children_[i])) { ret = OB_ERR_UNEXPECTED;