Support reading ufloat, udouble, and unsigned decimal (unumber) columns from Parquet

This commit is contained in:
dontknow9179 2024-11-29 14:48:35 +00:00 committed by ob-robot
parent 9d62ee876f
commit 30eb2ce6d6
4 changed files with 56 additions and 65 deletions

View File

@ -56,7 +56,11 @@ int ObExternalFileWriter::close_file()
{
int ret = OB_SUCCESS;
if (IntoFileLocation::SERVER_DISK == file_location_) {
file_appender_.close();
if (file_appender_.is_opened() && OB_FAIL(file_appender_.fsync())) {
LOG_WARN("failed to do fsync", K(ret));
} else {
file_appender_.close();
}
} else if (OB_FAIL(storage_appender_.close())) {
LOG_WARN("fail to close storage appender", K(ret), K(url_), K(access_info_));
}

View File

@ -2131,20 +2131,19 @@ int ObSelectIntoOp::get_parquet_logical_type(std::shared_ptr<const parquet::Logi
logical_type = parquet::LogicalType::None();
} else if (ob_is_number_or_decimal_int_tc(obj_type)) {
logical_type = parquet::LogicalType::Decimal(precision, scale);
} else if (ObDateTimeType == obj_type && is_mysql_mode()) {
} else if (ObDateTimeType == obj_type) {
logical_type = parquet::LogicalType::Timestamp(false, parquet::LogicalType::TimeUnit::MICROS);
} else if (ObTimestampType == obj_type) {
logical_type = parquet::LogicalType::Timestamp(true, parquet::LogicalType::TimeUnit::MICROS);
} else if (ObTimestampNanoType == obj_type || ObTimestampLTZType == obj_type
|| ObTimestampTZType == obj_type) {
} else if (ObTimestampNanoType == obj_type || ObTimestampLTZType == obj_type) {
logical_type = parquet::LogicalType::None();
} else if (ob_is_date_tc(obj_type) || (ObDateTimeType == obj_type && is_oracle_mode())) {
} else if (ob_is_date_tc(obj_type)) {
logical_type = parquet::LogicalType::Date();
} else if (ob_is_time_tc(obj_type)) {
logical_type = parquet::LogicalType::Time(false, parquet::LogicalType::TimeUnit::MICROS);
} else if (ob_is_year_tc(obj_type)) {
logical_type = parquet::LogicalType::Int(8, false);
} else if (ob_is_string_type(obj_type) || ObNullType == obj_type) {
} else if (ob_is_string_type(obj_type) || ObNullType == obj_type || ObRawType == obj_type) {
logical_type = parquet::LogicalType::String();
} else if (ob_is_bit_tc(obj_type) /*uint64_t*/) {
logical_type = parquet::LogicalType::Int(64, false);
@ -2167,14 +2166,13 @@ int ObSelectIntoOp::get_parquet_physical_type(parquet::Type::type &physical_type
|| ObMediumIntType == obj_type || ObInt32Type == obj_type
|| ObUTinyIntType == obj_type || ObUSmallIntType == obj_type
|| ObUMediumIntType == obj_type || ObUInt32Type == obj_type
|| ob_is_date_tc(obj_type) || ob_is_year_tc(obj_type)
|| (ObDateTimeType == obj_type && is_oracle_mode())) {
|| ob_is_date_tc(obj_type) || ob_is_year_tc(obj_type)) {
physical_type = parquet::Type::INT32;
} else if (ObIntType == obj_type || ObUInt64Type == obj_type
|| ob_is_datetime_tc(obj_type) || ob_is_time_tc(obj_type)
|| ob_is_bit_tc(obj_type)) {
physical_type = parquet::Type::INT64;
} else if (ob_is_otimestampe_tc(obj_type)) {
} else if (ObTimestampNanoType == obj_type || ObTimestampLTZType == obj_type) {
physical_type = parquet::Type::INT96;
} else if (ob_is_float_tc(obj_type)) { // float, ufloat
physical_type = parquet::Type::FLOAT;
@ -2185,7 +2183,7 @@ int ObSelectIntoOp::get_parquet_physical_type(parquet::Type::type &physical_type
} else if (ob_is_string_tc(obj_type) /*varchar,char,varbinary,binary*/
|| ob_is_text_tc(obj_type) /*TinyText,MediumText,Text,LongText,TinyBLOB,MediumBLOB,BLOB,LongBLOB*/
|| ob_is_enum_or_set_type(obj_type)
|| ObNullType == obj_type) {
|| ObNullType == obj_type || ObRawType == obj_type) {
physical_type = parquet::Type::BYTE_ARRAY;
} else {
ret = OB_NOT_SUPPORTED;
@ -2245,14 +2243,11 @@ int ObSelectIntoOp::orc_type_mapping_of_ob_type(ObDatumMeta& meta, int max_lengt
LOG_WARN("unsupport type for orc", K(obj_type), K(int_bytes));
}
}
} else if (ObTimestampType == obj_type || ob_is_otimestamp_type(obj_type)) {
} else if (ObTimestampType == obj_type || ObTimestampLTZType == obj_type) {
orc_type = orc::createPrimitiveType(orc::TypeKind::TIMESTAMP_INSTANT);
// ObTimestampTZType
// ObTimestampLTZType
// ObTimestampNanoType
} else if (ObDateTimeType == obj_type && is_mysql_mode()) {
} else if (ObDateTimeType == obj_type || ObTimestampNanoType == obj_type) {
orc_type = orc::createPrimitiveType(orc::TypeKind::TIMESTAMP);
} else if (ObDateType == obj_type || (ObDateTimeType == obj_type && is_oracle_mode())) {
} else if (ObDateType == obj_type) {
orc_type = orc::createPrimitiveType(orc::TypeKind::DATE);
} else if (ObVarcharType == obj_type && meta.cs_type_ != CS_TYPE_BINARY) {
orc_type = orc::createCharType(orc::TypeKind::VARCHAR, max_length);
@ -2260,11 +2255,10 @@ int ObSelectIntoOp::orc_type_mapping_of_ob_type(ObDatumMeta& meta, int max_lengt
orc_type = orc::createCharType(orc::TypeKind::CHAR, max_length);
} else if (ObYearType == obj_type) {
orc_type = orc::createPrimitiveType(orc::TypeKind::INT);
} else if (ObNullType == obj_type
|| (CS_TYPE_BINARY == meta.cs_type_
&& (ob_is_text_tc(obj_type) || ob_is_string_tc(obj_type) || ObRawType == obj_type))) {
} else if (ObNullType == obj_type || ObRawType == obj_type
|| (CS_TYPE_BINARY == meta.cs_type_ && ob_is_string_type(obj_type))) {
orc_type = orc::createCharType(orc::TypeKind::BINARY, max_length);
} else if (CS_TYPE_BINARY != meta.cs_type_ && (ob_is_text_tc(obj_type) || ob_is_string_tc(obj_type) || ObRawType == obj_type)) { // not binary
} else if (CS_TYPE_BINARY != meta.cs_type_ && ob_is_string_type(obj_type)) { // not binary
orc_type = orc::createCharType(orc::TypeKind::STRING, max_length);
} else {
ret = OB_ERR_UNEXPECTED;
@ -2602,8 +2596,7 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta,
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(col_idx), K(row_idx));
} else if (ob_is_integer_type(datum_meta.type_)
|| ObYearType == datum_meta.type_ || ObDateType == datum_meta.type_
|| (ObDateTimeType == datum_meta.type_ && is_oracle_mode())) {
|| ObYearType == datum_meta.type_ || ObDateType == datum_meta.type_) {
orc::LongVectorBatch *long_batch = dynamic_cast<orc::LongVectorBatch *>(col_vector_batch);
if (OB_ISNULL(long_batch)) {
ret = OB_ERR_UNEXPECTED;
@ -2613,9 +2606,7 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta,
col_vector_batch->notNull[row_offset] = false;
} else {
col_vector_batch->notNull[row_offset] = true;
if (ObDateTimeType == datum_meta.type_ && is_oracle_mode()) {
long_batch->data[row_offset] = expr_vector->get_datetime(row_idx) / 1000000 / 3600 / 24;
} else if (OB_FAIL(get_data_from_expr_vector(expr_vector, row_idx, datum_meta.type_, long_batch->data[row_offset]))) {
if (OB_FAIL(get_data_from_expr_vector(expr_vector, row_idx, datum_meta.type_, long_batch->data[row_offset]))) {
LOG_WARN("faild to get data from expr vector", K(ret), K(col_idx), K(row_idx), K(datum_meta.type_));
}
}
@ -2719,7 +2710,7 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta,
timestamp_vector_batch->data[row_offset] = out_usec / USECS_PER_SEC;
timestamp_vector_batch->nanoseconds[row_offset] = (out_usec % USECS_PER_SEC) * NSECS_PER_USEC; // usec to nanosecond
}
} else if (ob_is_otimestampe_tc(datum_meta.type_)) {
} else if (ObTimestampNanoType == datum_meta.type_ || ObTimestampLTZType == datum_meta.type_) {
orc::TimestampVectorBatch *timestamp_vector_batch = dynamic_cast<orc::TimestampVectorBatch *>(col_vector_batch);
if (OB_ISNULL(timestamp_vector_batch)) {
ret = OB_ERR_UNEXPECTED;
@ -2729,22 +2720,9 @@ int ObSelectIntoOp::build_orc_cell(const ObDatumMeta &datum_meta,
col_vector_batch->notNull[row_offset] = false;
} else {
col_vector_batch->notNull[row_offset] = true;
if (ObTimestampTZType == datum_meta.type_) {
const ObOTimestampData& rtime = expr_vector->get_otimestamp_tz(row_idx);
int32_t offset_min = 0;
ObTime ob_time(DT_TYPE_ORACLE_TIMESTAMP);
if (OB_FAIL(ObTimeConverter::extract_offset_from_otimestamp(rtime, get_timezone_info(get_exec_ctx().get_my_session()), offset_min, ob_time))) {
LOG_WARN("failed to extract_offset_from_otimestamp", K(ret));
} else {
int64_t out_usec = rtime.time_us_ + MIN_TO_USEC(offset_min);
timestamp_vector_batch->data[row_offset] = out_usec / USECS_PER_SEC; // usec to sec
timestamp_vector_batch->nanoseconds[row_offset] = (out_usec % USECS_PER_SEC) * NSECS_PER_USEC + rtime.time_ctx_.tail_nsec_; // usec to nanosecond
}
} else if (ObTimestampLTZType == datum_meta.type_ || ObTimestampNanoType == datum_meta.type_) {
const ObOTimestampTinyData& rtime = expr_vector->get_otimestamp_tiny(row_idx);
timestamp_vector_batch->data[row_offset] = rtime.time_us_ / USECS_PER_SEC; // usec to sec
timestamp_vector_batch->nanoseconds[row_offset] = (rtime.time_us_ % USECS_PER_SEC) * NSECS_PER_USEC + rtime.to_timestamp_data().time_ctx_.tail_nsec_; // usec to nanosecond
}
const ObOTimestampTinyData& rtime = expr_vector->get_otimestamp_tiny(row_idx);
timestamp_vector_batch->data[row_offset] = rtime.time_us_ / USECS_PER_SEC; // usec to sec
timestamp_vector_batch->nanoseconds[row_offset] = (rtime.time_us_ % USECS_PER_SEC) * NSECS_PER_USEC + rtime.to_timestamp_data().time_ctx_.tail_nsec_; // usec to nanosecond
}
} else {
ret = OB_ERR_UNEXPECTED;
@ -2780,7 +2758,7 @@ int ObSelectIntoOp::check_oracle_number(ObObjType obj_type, int16_t &precision,
if (is_oracle_mode() && ob_is_number_tc(obj_type)) {
if (scale == 0 && precision == -1) {
precision = 38; // oracle int
} else if (precision < 0) {
} else if (precision < 1 || scale < -84) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "number without specified precision and scale");
LOG_WARN("not support number without specified precision and scale", K(ret));
@ -2868,7 +2846,7 @@ int ObSelectIntoOp::calc_byte_array(const common::ObIVector* expr_vector,
uint32_t &res_len)
{
int ret = OB_SUCCESS;
ObString ob_str = expr_vector->get_string(row_idx);
ObString ob_str;
ObString res_str;
bool has_lob_header = obj_meta.has_lob_header();
res_len = 0;
@ -2904,23 +2882,27 @@ int ObSelectIntoOp::oracle_timestamp_to_int96(const common::ObIVector* expr_vect
{
int ret = OB_SUCCESS;
int64_t out_usec = 0;
int32_t tmp_offset = 0;
ObOTimestampData oracle_timestamp;
if (ObTimestampTZType == datum_meta.type_) {
int32_t offset_min = 0;
oracle_timestamp = expr_vector->get_otimestamp_tz(row_idx);
ObTime ob_time(DT_TYPE_ORACLE_TIMESTAMP);
if (OB_FAIL(ObTimeConverter::extract_offset_from_otimestamp(oracle_timestamp,
get_timezone_info(get_exec_ctx().get_my_session()),
offset_min,
ob_time))) {
LOG_WARN("failed to extract_offset_from_otimestamp", K(ret));
} else {
out_usec = oracle_timestamp.time_us_ + MIN_TO_USEC(offset_min);
}
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support this type in parquet", K(ret));
} else if (ObTimestampLTZType == datum_meta.type_ || ObTimestampNanoType == datum_meta.type_) {
oracle_timestamp = expr_vector->get_otimestamp_tiny(row_idx).to_timestamp_data();
out_usec = expr_vector->get_otimestamp_tiny(row_idx).time_us_;
}
// oracle timestamp logical type is none, only stored as utc
// convert nano to utc
if (OB_SUCC(ret) && ObTimestampNanoType == datum_meta.type_) {
if (OB_ISNULL(ctx_.get_my_session()) || OB_ISNULL(ctx_.get_my_session()->get_timezone_info())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(ctx_.get_my_session()->get_timezone_info()->get_timezone_offset(0, tmp_offset))) {
LOG_WARN("failed to get timezone offset", K(ret));
} else {
out_usec -= SEC_TO_USEC(tmp_offset);
}
}
uint32_t julian_date_value = (out_usec / 86400000000LL) + 2440588;
uint64_t nsec_time_value = oracle_timestamp.time_ctx_.tail_nsec_ + std::abs(out_usec % 86400000000LL) * 1000;
res.value[2] = julian_date_value;
@ -3227,11 +3209,7 @@ int ObSelectIntoOp::build_parquet_cell(parquet::RowGroupWriter* rg_writer,
if (expr_vector->is_null(row_idx)) {
definition_levels[row_offset] = null_definition_level;
} else {
if (is_oracle_mode() && ObDateTimeType == datum_meta.type_) {
*value = expr_vector->get_datetime(row_idx) / 1000000 / 3600 / 24;
} else {
*value = expr_vector->get_int32(row_idx);
}
*value = expr_vector->get_int32(row_idx);
value_offset++;
definition_levels[row_offset] = normal_definition_level;
}

View File

@ -324,7 +324,7 @@ ObParquetTableRowIterator::DataLoader::LOAD_FUNC ObParquetTableRowIterator::Data
func = NULL;
}
} else if ((no_log_type || log_type->is_string() || log_type->is_enum())
&& ob_is_string_type(datum_type.type_)) {
&& (ob_is_string_type(datum_type.type_) || ObRawType == datum_type.type_)) {
//convert parquet enum/string to string vector
if (parquet::Type::BYTE_ARRAY == phy_type) {
func = &DataLoader::load_string_col;
@ -425,9 +425,9 @@ ObParquetTableRowIterator::DataLoader::LOAD_FUNC ObParquetTableRowIterator::Data
} else if ((no_log_type || log_type->is_timestamp()) && parquet::Type::INT96 == phy_type
&& (ob_is_otimestamp_type(datum_type.type_) || ObTimestampType == datum_type.type_)) {
func = &DataLoader::load_timestamp_hive;
} else if (no_log_type && parquet::Type::FLOAT == phy_type && ObFloatType == datum_type.type_) {
} else if (no_log_type && parquet::Type::FLOAT == phy_type && ob_is_float_tc(datum_type.type_)) {
func = &DataLoader::load_float;
} else if (no_log_type && parquet::Type::DOUBLE == phy_type && ObDoubleType == datum_type.type_) {
} else if (no_log_type && parquet::Type::DOUBLE == phy_type && ob_is_double_tc(datum_type.type_)) {
func = &DataLoader::load_double;
} else if (log_type->is_interval()
|| log_type->is_map()

View File

@ -5134,13 +5134,22 @@ int ObResolverUtils::build_file_column_expr_for_parquet(
}
if (ob_is_enum_or_set_type(column_expr->get_data_type())
|| ob_is_text_tc(column_expr->get_data_type())) {
file_column_expr->set_data_type(ObVarcharType);
if (is_oracle_mode() && CS_TYPE_BINARY == column_expr->get_collation_type()) {
file_column_expr->set_data_type(ObRawType);
} else if (is_mysql_mode() && ob_is_enum_or_set_type(column_expr->get_data_type())) {
file_column_expr->set_data_type(ObCharType);
} else {
file_column_expr->set_data_type(ObVarcharType);
}
if (is_mysql_mode()) {
file_column_expr->set_length(OB_MAX_MYSQL_VARCHAR_LENGTH);
} else {
file_column_expr->set_length(OB_MAX_ORACLE_VARCHAR_LENGTH);
}
}
if (ob_is_number_tc(column_expr->get_data_type())) {
file_column_expr->set_data_type(ObDecimalIntType);
}
}
} else {
ret = OB_ERR_UNEXPECTED;
@ -5544,7 +5553,7 @@ int ObResolverUtils::resolve_generated_column_expr(ObResolverParams &params,
const ObCollationType dst_cs_type = generated_column.get_collation_type();
/* implicit data conversion judgement */
if (OB_SUCC(ret) && lib::is_oracle_mode()) {
if (OB_SUCC(ret) && lib::is_oracle_mode() && !tbl_schema.is_external_table()) {
if (!cast_supported(expr_datatype,
expr_cs_type,
dst_datatype,