diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
index 2684576f5d..9bfbd962b0 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -243,7 +243,7 @@ Status VFileResultWriter::append_block(Block& block) {
     RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
                                                                        block, &output_block));
     if (_vfile_writer) {
-        _write_file(output_block);
+        RETURN_IF_ERROR(_write_file(output_block));
     } else {
         RETURN_IF_ERROR(_write_csv_file(output_block));
     }
diff --git a/be/src/vec/runtime/vorc_writer.cpp b/be/src/vec/runtime/vorc_writer.cpp
index 665ec27429..f3804a6b62 100644
--- a/be/src/vec/runtime/vorc_writer.cpp
+++ b/be/src/vec/runtime/vorc_writer.cpp
@@ -165,9 +165,69 @@ void VOrcWriterWrapper::close() {
         RETURN_WRONG_TYPE \
     }

+#define WRITE_LARGEINT_STRING_INTO_BATCH(VECTOR_BATCH, COLUMN) \
+    VECTOR_BATCH* cur_batch = dynamic_cast<VECTOR_BATCH*>(root->fields[i]); \
+    size_t begin_off = offset; \
+    if (null_map != nullptr) { \
+        cur_batch->hasNulls = true; \
+        auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            if (null_data[row_id] != 0) { \
+                cur_batch->notNull[row_id] = 0; \
+            } else { \
+                cur_batch->notNull[row_id] = 1; \
+                auto value = assert_cast<const COLUMN&>(*col).get_data()[row_id]; \
+                std::string value_str = fmt::format("{}", value); \
+                size_t len = value_str.size(); \
+                while (buffer.size < offset + len) { \
+                    char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
+                    memcpy(new_ptr, buffer.data, buffer.size); \
+                    free(const_cast<char*>(buffer.data)); \
+                    buffer.data = new_ptr; \
+                    buffer.size = buffer.size + BUFFER_UNIT_SIZE; \
+                } \
+                strcpy(const_cast<char*>(buffer.data) + offset, value_str.c_str()); \
+                offset += len; \
+                cur_batch->length[row_id] = len; \
+            } \
+        } \
+        size_t data_off = 0; \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            if (null_data[row_id] != 0) { \
+                cur_batch->notNull[row_id] = 0; \
+            } else { \
+                cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+                data_off += cur_batch->length[row_id]; \
+            } \
+        } \
+    } else if (const auto& not_null_column = check_and_get_column<const COLUMN>(col)) { \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            auto value = not_null_column->get_data()[row_id]; \
+            std::string value_str = fmt::format("{}", value); \
+            size_t len = value_str.size(); \
+            while (buffer.size < offset + len) { \
+                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
+                memcpy(new_ptr, buffer.data, buffer.size); \
+                free(const_cast<char*>(buffer.data)); \
+                buffer.data = new_ptr; \
+                buffer.size = buffer.size + BUFFER_UNIT_SIZE; \
+            } \
+            strcpy(const_cast<char*>(buffer.data) + offset, value_str.c_str()); \
+            offset += len; \
+            cur_batch->length[row_id] = len; \
+        } \
+        size_t data_off = 0; \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+            data_off += cur_batch->length[row_id]; \
+        } \
+    } else { \
+        RETURN_WRONG_TYPE \
+    }
+
 #define WRITE_DATE_STRING_INTO_BATCH(FROM, TO) \
     orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(root->fields[i]); \
-    size_t offset = 0; \
+    size_t begin_off = offset; \
     if (null_map != nullptr) { \
         cur_batch->hasNulls = true; \
         auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
@@ -178,7 +238,7 @@ void VOrcWriterWrapper::close() {
                 cur_batch->notNull[row_id] = 1; \
                 int len = binary_cast<FROM, TO>( \
                                   assert_cast<const ColumnVector<FROM>&>(*col).get_data()[row_id]) \
-                                  .to_buffer(const_cast<char*>(buffer.data)); \
+                                  .to_buffer(const_cast<char*>(buffer.data) + offset); \
                 while (buffer.size < offset + len) { \
                     char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
                     memcpy(new_ptr, buffer.data, buffer.size); \
@@ -190,20 +250,20 @@ void VOrcWriterWrapper::close() {
                 offset += len; \
             } \
         } \
-        offset = 0; \
+        size_t data_off = 0; \
         for (size_t row_id = 0; row_id < sz; row_id++) { \
             if (null_data[row_id] != 0) { \
                 cur_batch->notNull[row_id] = 0; \
             } else { \
-                cur_batch->data[row_id] = const_cast<char*>(buffer.data) + offset; \
-                offset += cur_batch->length[row_id]; \
+                cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+                data_off += cur_batch->length[row_id]; \
             } \
         } \
     } else if (const auto& not_null_column = \
                        check_and_get_column<const ColumnVector<FROM>>(col)) { \
         for (size_t row_id = 0; row_id < sz; row_id++) { \
             int len = binary_cast<FROM, TO>(not_null_column->get_data()[row_id]) \
-                              .to_buffer(const_cast<char*>(buffer.data)); \
+                              .to_buffer(const_cast<char*>(buffer.data) + offset); \
             while (buffer.size < offset + len) { \
                 char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
                 memcpy(new_ptr, buffer.data, buffer.size); \
@@ -214,10 +274,71 @@ void VOrcWriterWrapper::close() {
             cur_batch->length[row_id] = len; \
             offset += len; \
         } \
-        offset = 0; \
+        size_t data_off = 0; \
         for (size_t row_id = 0; row_id < sz; row_id++) { \
-            cur_batch->data[row_id] = const_cast<char*>(buffer.data) + offset; \
-            offset += cur_batch->length[row_id]; \
+            cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+            data_off += cur_batch->length[row_id]; \
         } \
     } else { \
         RETURN_WRONG_TYPE \
     }
+
+#define WRITE_DATETIMEV2_STRING_INTO_BATCH(FROM, TO) \
+    orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(root->fields[i]); \
+    size_t begin_off = offset; \
+    if (null_map != nullptr) { \
+        cur_batch->hasNulls = true; \
+        auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            if (null_data[row_id] != 0) { \
+                cur_batch->notNull[row_id] = 0; \
+            } else { \
+                cur_batch->notNull[row_id] = 1; \
+                int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; \
+                int len = \
+                        binary_cast<FROM, TO>( \
+                                assert_cast<const ColumnVector<FROM>&>(*col).get_data()[row_id]) \
+                                .to_buffer(const_cast<char*>(buffer.data) + offset, output_scale); \
+                while (buffer.size < offset + len) { \
+                    char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
+                    memcpy(new_ptr, buffer.data, buffer.size); \
+                    free(const_cast<char*>(buffer.data)); \
+                    buffer.data = new_ptr; \
+                    buffer.size = buffer.size + BUFFER_UNIT_SIZE; \
+                } \
+                cur_batch->length[row_id] = len; \
+                offset += len; \
+            } \
+        } \
+        size_t data_off = 0; \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            if (null_data[row_id] != 0) { \
+                cur_batch->notNull[row_id] = 0; \
+            } else { \
+                cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+                data_off += cur_batch->length[row_id]; \
+            } \
+        } \
+    } else if (const auto& not_null_column = \
+                       check_and_get_column<const ColumnVector<FROM>>(col)) { \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; \
+            int len = binary_cast<FROM, TO>(not_null_column->get_data()[row_id]) \
+                              .to_buffer(const_cast<char*>(buffer.data) + offset, output_scale); \
+            while (buffer.size < offset + len) { \
+                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
+                memcpy(new_ptr, buffer.data, buffer.size); \
+                free(const_cast<char*>(buffer.data)); \
+                buffer.data = new_ptr; \
+                buffer.size = buffer.size + BUFFER_UNIT_SIZE; \
+            } \
+            cur_batch->length[row_id] = len; \
+            offset += len; \
+        } \
+        size_t data_off = 0; \
+        for (size_t row_id = 0; row_id < sz; row_id++) { \
+            cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+            data_off += cur_batch->length[row_id]; \
+        } \
+    } else { \
+        RETURN_WRONG_TYPE \
+    }
@@ -279,6 +400,7 @@ Status VOrcWriterWrapper::write(const Block& block) {
     // Buffer used by date type
     char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
     StringRef buffer(ptr, BUFFER_UNIT_SIZE);
+    size_t offset = 0;
     size_t sz = block.rows();
     auto row_batch = _create_row_batch(sz);
@@ -327,7 +449,9 @@ Status VOrcWriterWrapper::write(const Block& block) {
             break;
         }
         case TYPE_LARGEINT: {
-            return Status::InvalidArgument("do not support large int type.");
+            WRITE_LARGEINT_STRING_INTO_BATCH(orc::StringVectorBatch, ColumnVector<Int128>)
+            SET_NUM_ELEMENTS;
+            break;
         }
         case TYPE_FLOAT: {
             WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::DoubleVectorBatch, ColumnVector<Float32>)
             SET_NUM_ELEMENTS
             break;
         }
@@ -352,68 +476,7 @@ Status VOrcWriterWrapper::write(const Block& block) {
             break;
         }
         case TYPE_DATETIMEV2: {
-            orc::StringVectorBatch* cur_batch =
-                    dynamic_cast<orc::StringVectorBatch*>(root->fields[i]);
-            size_t offset = 0;
-            if (null_map != nullptr) {
-                cur_batch->hasNulls = true;
-                auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
-                for (size_t row_id = 0; row_id < sz; row_id++) {
-                    if (null_data[row_id] != 0) {
-                        cur_batch->notNull[row_id] = 0;
-                    } else {
-                        cur_batch->notNull[row_id] = 1;
-                        int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
-                        int len = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
-                                          assert_cast<const ColumnVector<UInt64>&>(*col)
-                                                  .get_data()[row_id])
-                                          .to_buffer(const_cast<char*>(buffer.data),
-                                                     output_scale);
-                        while (buffer.size < offset + len) {
-                            char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE);
-                            memcpy(new_ptr, buffer.data, buffer.size);
-                            free(const_cast<char*>(buffer.data));
-                            buffer.data = new_ptr;
-                            buffer.size = buffer.size + BUFFER_UNIT_SIZE;
-                        }
-                        cur_batch->length[row_id] = len;
-                        offset += len;
-                    }
-                }
-                offset = 0;
-                for (size_t row_id = 0; row_id < sz; row_id++) {
-                    if (null_data[row_id] != 0) {
-                        cur_batch->notNull[row_id] = 0;
-                    } else {
-                        cur_batch->data[row_id] = const_cast<char*>(buffer.data) + offset;
-                        offset += cur_batch->length[row_id];
-                    }
-                }
-            } else if (const auto& not_null_column =
-                               check_and_get_column<const ColumnVector<UInt64>>(col)) {
-                for (size_t row_id = 0; row_id < sz; row_id++) {
-                    int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
-                    int len = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
-                                      not_null_column->get_data()[row_id])
-                                      .to_buffer(const_cast<char*>(buffer.data), output_scale);
-                    while (buffer.size < offset + len) {
-                        char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE);
-                        memcpy(new_ptr, buffer.data, buffer.size);
-                        free(const_cast<char*>(buffer.data));
-                        buffer.data = new_ptr;
-                        buffer.size = buffer.size + BUFFER_UNIT_SIZE;
-                    }
-                    cur_batch->length[row_id] = len;
-                    offset += len;
-                }
-                offset = 0;
-                for (size_t row_id = 0; row_id < sz; row_id++) {
-                    cur_batch->data[row_id] = const_cast<char*>(buffer.data) + offset;
-                    offset += cur_batch->length[row_id];
-                }
-            } else {
-                RETURN_WRONG_TYPE
-            }
+            WRITE_DATETIMEV2_STRING_INTO_BATCH(UInt64, DateV2Value<DateTimeV2ValueType>)
             SET_NUM_ELEMENTS
             break;
         }
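A note on the `offset` / `begin_off` / `data_off` bookkeeping all three macros now share: the old code reset `offset = 0` for every string-producing column, so each column's text overwrote the previous column's bytes in the shared `buffer` while earlier `cur_batch->data` pointers still referenced them. Pointer fix-up also has to happen in a second pass, because the grow-by-malloc/memcpy step can move the buffer. A minimal standalone sketch of the corrected two-pass pattern (names like `SharedBuffer` are illustrative, not the Doris types):

```cpp
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

// One buffer shared by every column of the batch; `offset` is never reset.
struct SharedBuffer {
    char* data = nullptr;
    size_t cap = 0;
    size_t offset = 0;
};

// Pass 1: append each row's text, growing the buffer as needed. Returns the
// per-row lengths; `begin_off` records where this column's data starts.
std::vector<size_t> append_column(SharedBuffer& buf, const std::vector<std::string>& rows,
                                  size_t& begin_off) {
    begin_off = buf.offset;
    std::vector<size_t> lens;
    lens.reserve(rows.size());
    for (const std::string& s : rows) {
        while (buf.cap < buf.offset + s.size()) {
            size_t new_cap = buf.cap == 0 ? 64 : buf.cap * 2;
            char* new_ptr = static_cast<char*>(malloc(new_cap));
            if (buf.data != nullptr) {
                memcpy(new_ptr, buf.data, buf.offset);
                free(buf.data);
            }
            buf.data = new_ptr;
            buf.cap = new_cap;
        }
        memcpy(buf.data + buf.offset, s.data(), s.size());
        buf.offset += s.size();
        lens.push_back(s.size());
    }
    return lens;
}

// Pass 2: only now is it safe to hand out pointers, since the buffer can no
// longer move. This mirrors the loops that fill cur_batch->data[row_id] above.
std::vector<char*> column_pointers(const SharedBuffer& buf, size_t begin_off,
                                   const std::vector<size_t>& lens) {
    std::vector<char*> ptrs;
    size_t data_off = 0;
    for (size_t len : lens) {
        ptrs.push_back(buf.data + begin_off + data_off);
        data_off += len;
    }
    return ptrs;
}
```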
diff --git a/be/src/vec/runtime/vparquet_writer.cpp b/be/src/vec/runtime/vparquet_writer.cpp
index ea25a1bc5c..c22f110519 100644
--- a/be/src/vec/runtime/vparquet_writer.cpp
+++ b/be/src/vec/runtime/vparquet_writer.cpp
@@ -24,14 +24,18 @@
 #include
 #include
 #include
+#include
 #include
 #include
+#include
 #include
 #include
 #include
+#include "common/status.h"
 #include "io/fs/file_writer.h"
+#include "olap/olap_common.h"
 #include "runtime/decimalv2_value.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/types.h"
@@ -58,6 +62,10 @@
 namespace doris::vectorized {

+static const std::string epoch_date_str = "1970-01-01";
+static const int64_t timestamp_threshold = -2177481943;
+static const int64_t timestamp_diff = 343;
+
 ParquetOutputStream::ParquetOutputStream(doris::io::FileWriter* file_writer)
         : _file_writer(file_writer), _cur_pos(0), _written_len(0) {
     set_mode(arrow::io::FileMode::WRITE);
@@ -170,6 +178,25 @@ void ParquetBuildHelper::build_schema_data_type(parquet::Type::type& parquet_dat
     }
 }

+void ParquetBuildHelper::build_schema_data_logical_type(
+        std::shared_ptr<const parquet::LogicalType>& parquet_data_logical_type_ptr,
+        const TParquetDataLogicalType::type& column_data_logical_type) {
+    switch (column_data_logical_type) {
+    case TParquetDataLogicalType::DATE: {
+        parquet_data_logical_type_ptr = parquet::LogicalType::Date();
+        break;
+    }
+    case TParquetDataLogicalType::TIMESTAMP: {
+        parquet_data_logical_type_ptr =
+                parquet::LogicalType::Timestamp(true, parquet::LogicalType::TimeUnit::MILLIS, true);
+        break;
+    }
+    default: {
+        parquet_data_logical_type_ptr = parquet::LogicalType::None();
+    }
+    }
+}
+
 void ParquetBuildHelper::build_compression_type(
         parquet::WriterProperties::Builder& builder,
         const TParquetCompressionType::type& compression_type) {
@@ -234,41 +261,56 @@ VParquetWriterWrapper::VParquetWriterWrapper(doris::io::FileWriter* file_writer,
                                              const bool& parquet_disable_dictionary,
                                              const TParquetVersion::type& parquet_version,
                                              bool output_object_data)
-        : VFileWriterWrapper(output_vexpr_ctxs, output_object_data), _rg_writer(nullptr) {
+        : VFileWriterWrapper(output_vexpr_ctxs, output_object_data),
+          _rg_writer(nullptr),
+          _parquet_schemas(parquet_schemas),
+          _compression_type(compression_type),
+          _parquet_disable_dictionary(parquet_disable_dictionary),
+          _parquet_version(parquet_version) {
     _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
-    parse_properties(compression_type, parquet_disable_dictionary, parquet_version);
-    parse_schema(parquet_schemas);
 }

-void VParquetWriterWrapper::parse_properties(const TParquetCompressionType::type& compression_type,
-                                             const bool& parquet_disable_dictionary,
-                                             const TParquetVersion::type& parquet_version) {
-    parquet::WriterProperties::Builder builder;
-    ParquetBuildHelper::build_compression_type(builder, compression_type);
-    ParquetBuildHelper::build_version(builder, parquet_version);
-    if (parquet_disable_dictionary) {
-        builder.disable_dictionary();
-    } else {
-        builder.enable_dictionary();
+Status VParquetWriterWrapper::parse_properties() {
+    try {
+        parquet::WriterProperties::Builder builder;
+        ParquetBuildHelper::build_compression_type(builder, _compression_type);
+        ParquetBuildHelper::build_version(builder, _parquet_version);
+        if (_parquet_disable_dictionary) {
+            builder.disable_dictionary();
+        } else {
+            builder.enable_dictionary();
+        }
+        _properties = builder.build();
+    } catch (const parquet::ParquetException& e) {
+        return Status::InternalError("parquet writer parse properties error: {}", e.what());
     }
-    _properties = builder.build();
+    return Status::OK();
 }

-void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parquet_schemas) {
+Status VParquetWriterWrapper::parse_schema() {
     parquet::schema::NodeVector fields;
     parquet::Repetition::type parquet_repetition_type;
     parquet::Type::type parquet_data_type;
-    for (int idx = 0; idx < parquet_schemas.size(); ++idx) {
+    std::shared_ptr<const parquet::LogicalType> parquet_data_logical_type;
+    for (int idx = 0; idx < _parquet_schemas.size(); ++idx) {
         ParquetBuildHelper::build_schema_repetition_type(
-                parquet_repetition_type, parquet_schemas[idx].schema_repetition_type);
+                parquet_repetition_type, _parquet_schemas[idx].schema_repetition_type);
         ParquetBuildHelper::build_schema_data_type(parquet_data_type,
-                                                   parquet_schemas[idx].schema_data_type);
-        fields.push_back(parquet::schema::PrimitiveNode::Make(
-                parquet_schemas[idx].schema_column_name, parquet_repetition_type,
-                parquet::LogicalType::None(), parquet_data_type));
+                                                   _parquet_schemas[idx].schema_data_type);
+        ParquetBuildHelper::build_schema_data_logical_type(
+                parquet_data_logical_type, _parquet_schemas[idx].schema_data_logical_type);
+        try {
+            fields.push_back(parquet::schema::PrimitiveNode::Make(
+                    _parquet_schemas[idx].schema_column_name, parquet_repetition_type,
+                    parquet_data_logical_type, parquet_data_type));
+        } catch (const parquet::ParquetException& e) {
+            LOG(WARNING) << "parquet writer parse schema error: " << e.what();
+            return Status::InternalError("parquet writer parse schema error: {}", e.what());
+        }
         _schema = std::static_pointer_cast<parquet::schema::GroupNode>(
                 parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));
     }
+    return Status::OK();
 }

 #define RETURN_WRONG_TYPE \
@@ -393,7 +435,40 @@ Status VParquetWriterWrapper::write(const Block& block) {
             break;
         }
         case TYPE_LARGEINT: {
-            return Status::InvalidArgument("do not support large int type.");
+            parquet::RowGroupWriter* rgWriter = get_rg_writer();
+            parquet::ByteArrayWriter* col_writer =
+                    static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
+            parquet::ByteArray value;
+            if (null_map != nullptr) {
+                auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    if (null_data[row_id] != 0) {
+                        single_def_level = 0;
+                        col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                        single_def_level = 1;
+                    } else {
+                        const int128_t tmp = assert_cast<const ColumnVector<Int128>&>(*col)
+                                                     .get_data()[row_id];
+                        std::string value_str = fmt::format("{}", tmp);
+                        value.ptr = reinterpret_cast<const uint8_t*>(value_str.data());
+                        value.len = value_str.length();
+                        col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                    }
+                }
+            } else if (const auto* not_nullable_column =
+                               check_and_get_column<const ColumnVector<Int128>>(col)) {
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    const int128_t tmp = not_nullable_column->get_data()[row_id];
+                    std::string value_str = fmt::format("{}", tmp);
+                    value.ptr = reinterpret_cast<const uint8_t*>(value_str.data());
+                    value.len = value_str.length();
+                    col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr,
+                                           &value);
+                }
+            } else {
+                RETURN_WRONG_TYPE
+            }
+            break;
         }
         case TYPE_FLOAT: {
             DISPATCH_PARQUET_NUMERIC_WRITER(FloatWriter, ColumnVector<Float32>, float_t)
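TYPE_LARGEINT (int128) is wider than any Parquet integer physical type, so the case above serializes each value to its decimal string and writes it as a BYTE_ARRAY, relying on fmt's `__int128` formatting. A hedged standalone sketch of the same conversion without the fmt dependency (assumes GCC/Clang `__int128`; this is not the code the patch uses):

```cpp
#include <algorithm>
#include <string>

using int128_t = __int128;

// Decimal-string rendering of an int128, equivalent to fmt::format("{}", v).
std::string int128_to_string(int128_t v) {
    if (v == 0) {
        return "0";
    }
    const bool negative = v < 0;
    // Convert in unsigned space so the minimum value negates safely.
    unsigned __int128 u =
            negative ? -static_cast<unsigned __int128>(v) : static_cast<unsigned __int128>(v);
    std::string digits;
    while (u != 0) {
        digits.push_back(static_cast<char>('0' + static_cast<int>(u % 10)));
        u /= 10;
    }
    if (negative) {
        digits.push_back('-');
    }
    std::reverse(digits.begin(), digits.end());
    return digits;
}
```

One subtlety the per-row loop above already respects: `parquet::ByteArray` only borrows the string's bytes, so each `WriteBatch` call must happen before `value_str` is destroyed or reassigned.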
@@ -458,7 +533,67 @@ Status VParquetWriterWrapper::write(const Block& block) {
             DISPATCH_PARQUET_NUMERIC_WRITER(Int32Writer, ColumnVector<Int32>, Int32)
             break;
         }
-        case TYPE_DATETIME:
+        case TYPE_DATETIME: {
+            parquet::RowGroupWriter* rgWriter = get_rg_writer();
+            parquet::Int64Writer* col_writer =
+                    static_cast<parquet::Int64Writer*>(rgWriter->column(i));
+            uint64_t default_int64 = 0;
+            if (null_map != nullptr) {
+                auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    def_level[row_id] = null_data[row_id] == 0;
+                }
+                int64_t tmp_data[sz];
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    if (null_data[row_id] != 0) {
+                        tmp_data[row_id] = default_int64;
+                    } else {
+                        VecDateTimeValue datetime_value = binary_cast<Int64, VecDateTimeValue>(
+                                assert_cast<const ColumnVector<Int64>&>(*col)
+                                        .get_data()[row_id]);
+                        if (!datetime_value.unix_timestamp(&tmp_data[row_id],
+                                                           TimezoneUtils::default_time_zone)) {
+                            return Status::InternalError("get unix timestamp error.");
+                        }
+                        // -2177481943 represents '1900-12-31 23:54:17',
+                        // but -2177481944 represents '1900-12-31 23:59:59',
+                        // so for timestamp <= -2177481944, we subtract 343 (5min 43s)
+                        if (tmp_data[row_id] < timestamp_threshold) {
+                            tmp_data[row_id] -= timestamp_diff;
+                        }
+                        // convert seconds to milliseconds
+                        tmp_data[row_id] *= 1000;
+                    }
+                }
+                col_writer->WriteBatch(sz, def_level.data(), nullptr,
+                                       reinterpret_cast<const int64_t*>(tmp_data));
+            } else if (const auto* not_nullable_column =
+                               check_and_get_column<const ColumnVector<Int64>>(col)) {
+                std::vector<int64_t> res(sz);
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    VecDateTimeValue datetime_value = binary_cast<Int64, VecDateTimeValue>(
+                            not_nullable_column->get_data()[row_id]);
+                    if (!datetime_value.unix_timestamp(&res[row_id],
+                                                       TimezoneUtils::default_time_zone)) {
+                        return Status::InternalError("get unix timestamp error.");
+                    }
+                    // -2177481943 represents '1900-12-31 23:54:17',
+                    // but -2177481944 represents '1900-12-31 23:59:59',
+                    // so for timestamp <= -2177481944, we subtract 343 (5min 43s)
+                    if (res[row_id] < timestamp_threshold) {
+                        res[row_id] -= timestamp_diff;
+                    }
+                    // convert seconds to milliseconds
+                    res[row_id] *= 1000;
+                }
+                col_writer->WriteBatch(sz, nullable ? def_level.data() : nullptr, nullptr,
+                                       reinterpret_cast<const int64_t*>(res.data()));
+            } else {
+                RETURN_WRONG_TYPE
+            }
+            break;
+        }
         case TYPE_DATE: {
             parquet::RowGroupWriter* rgWriter = get_rg_writer();
             parquet::Int64Writer* col_writer =
@@ -469,26 +604,40 @@ Status VParquetWriterWrapper::write(const Block& block) {
                     static_cast<parquet::Int64Writer*>(rgWriter->column(i));
             uint64_t default_int64 = 0;
             if (null_map != nullptr) {
                 auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
                 for (size_t row_id = 0; row_id < sz; row_id++) {
                     def_level[row_id] = null_data[row_id] == 0;
                 }
-                uint64_t tmp_data[sz];
+                VecDateTimeValue epoch_date;
+                if (!epoch_date.from_date_str(epoch_date_str.c_str(),
+                                              epoch_date_str.length())) {
+                    return Status::InternalError("create epoch date from string error");
+                }
+                int32_t days_from_epoch = epoch_date.daynr();
+                int32_t tmp_data[sz];
                 for (size_t row_id = 0; row_id < sz; row_id++) {
                     if (null_data[row_id] != 0) {
                         tmp_data[row_id] = default_int64;
                     } else {
-                        tmp_data[row_id] = binary_cast<Int64, VecDateTimeValue>(
-                                                   assert_cast<const ColumnVector<Int64>&>(*col)
-                                                           .get_data()[row_id])
-                                                   .to_olap_datetime();
+                        int32_t days = binary_cast<Int64, VecDateTimeValue>(
+                                               assert_cast<const ColumnVector<Int64>&>(*col)
+                                                       .get_data()[row_id])
+                                               .daynr();
+                        tmp_data[row_id] = days - days_from_epoch;
                     }
                 }
                 col_writer->WriteBatch(sz, def_level.data(), nullptr,
                                        reinterpret_cast<const int64_t*>(tmp_data));
-            } else if (const auto* not_nullable_column =
-                               check_and_get_column<const ColumnVector<Int64>>(col)) {
-                std::vector<uint64_t> res(sz);
+            } else if (check_and_get_column<const ColumnVector<Int64>>(col)) {
+                VecDateTimeValue epoch_date;
+                if (!epoch_date.from_date_str(epoch_date_str.c_str(),
+                                              epoch_date_str.length())) {
+                    return Status::InternalError("create epoch date from string error");
+                }
+                int32_t days_from_epoch = epoch_date.daynr();
+                std::vector<int32_t> res(sz);
                 for (size_t row_id = 0; row_id < sz; row_id++) {
-                    res[row_id] = binary_cast<Int64, VecDateTimeValue>(
-                                          not_nullable_column->get_data()[row_id])
-                                          .to_olap_datetime();
+                    int32_t days = binary_cast<Int64, VecDateTimeValue>(
+                                           assert_cast<const ColumnVector<Int64>&>(*col)
+                                                   .get_data()[row_id])
+                                           .daynr();
+                    res[row_id] = days - days_from_epoch;
                 }
                 col_writer->WriteBatch(sz, nullable ? def_level.data() : nullptr, nullptr,
                                        reinterpret_cast<const int64_t*>(res.data()));
@@ -669,7 +818,14 @@ Status VParquetWriterWrapper::write(const Block& block) {
 }

 Status VParquetWriterWrapper::prepare() {
-    _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
+    RETURN_IF_ERROR(parse_properties());
+    RETURN_IF_ERROR(parse_schema());
+    try {
+        _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
+    } catch (const parquet::ParquetStatusException& e) {
+        LOG(WARNING) << "parquet file writer open error: " << e.what();
+        return Status::InternalError("parquet file writer open error: {}", e.what());
+    }
     if (_writer == nullptr) {
         return Status::InternalError("Failed to create file writer");
     }
@@ -698,7 +854,9 @@ void VParquetWriterWrapper::close() {
         _rg_writer->Close();
         _rg_writer = nullptr;
     }
-    _writer->Close();
+    if (_writer != nullptr) {
+        _writer->Close();
+    }
     arrow::Status st = _outstream->Close();
     if (!st.ok()) {
         LOG(WARNING) << "close parquet file error: " << st.ToString();
diff --git a/be/src/vec/runtime/vparquet_writer.h b/be/src/vec/runtime/vparquet_writer.h
index 419b4948ef..7d28f35cfb 100644
--- a/be/src/vec/runtime/vparquet_writer.h
+++ b/be/src/vec/runtime/vparquet_writer.h
@@ -86,6 +86,9 @@ public:
     static void build_version(parquet::WriterProperties::Builder& builder,
                               const TParquetVersion::type& parquet_version);
+    static void build_schema_data_logical_type(
+            std::shared_ptr<const parquet::LogicalType>& parquet_data_logical_type_ptr,
+            const TParquetDataLogicalType::type& column_data_logical_type);
 };

 class VFileWriterWrapper {
@@ -134,11 +137,9 @@ public:
 private:
     parquet::RowGroupWriter* get_rg_writer();

-    void parse_schema(const std::vector<TParquetSchema>& parquet_schemas);
+    Status parse_schema();

-    void parse_properties(const TParquetCompressionType::type& compression_type,
-                          const bool& parquet_disable_dictionary,
-                          const TParquetVersion::type& parquet_version);
+    Status parse_properties();

 private:
     std::shared_ptr<ParquetOutputStream> _outstream;
@@ -147,6 +148,11 @@ private:
     std::unique_ptr<parquet::ParquetFileWriter> _writer;
     parquet::RowGroupWriter* _rg_writer;
     const int64_t _max_row_per_group = 10;
+
+    const std::vector<TParquetSchema>& _parquet_schemas;
+    const TParquetCompressionType::type& _compression_type;
+    const bool& _parquet_disable_dictionary;
+    const TParquetVersion::type& _parquet_version;
 };

 } // namespace doris::vectorized
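With the schema now annotated, the writer must emit physical values in the units the new logical types mandate: DATE is an int32 count of days since 1970-01-01, and TIMESTAMP(MILLIS) is an int64 count of milliseconds since the epoch. A distilled sketch of the arithmetic above, with the Doris-specific `daynr()` and `unix_timestamp()` calls abstracted into parameters; the 343-second correction restates the patch's comment about the 5 min 43 s discontinuity just before 1901:

```cpp
#include <cstdint>

// Constants as defined at the top of vparquet_writer.cpp in this patch.
static const int64_t timestamp_threshold = -2177481943; // '1900-12-31 23:54:17'
static const int64_t timestamp_diff = 343;              // 5 min 43 s

// DATE -> Parquet DATE: days since the Unix epoch. `epoch_daynr` is what the
// patch computes once per column via epoch_date.daynr() for "1970-01-01".
int32_t to_parquet_date(int64_t daynr, int64_t epoch_daynr) {
    return static_cast<int32_t>(daynr - epoch_daynr);
}

// DATETIME -> Parquet TIMESTAMP(MILLIS). `unix_ts` is the value produced by
// VecDateTimeValue::unix_timestamp(); timestamps below the threshold get the
// local-mean-time correction before scaling to milliseconds.
int64_t to_parquet_timestamp_millis(int64_t unix_ts) {
    if (unix_ts < timestamp_threshold) {
        unix_ts -= timestamp_diff;
    }
    return unix_ts * 1000;
}
```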
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 6a17626931..c3a98b7b7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -35,6 +35,7 @@ import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TParquetCompressionType;
+import org.apache.doris.thrift.TParquetDataLogicalType;
 import org.apache.doris.thrift.TParquetDataType;
 import org.apache.doris.thrift.TParquetRepetitionType;
 import org.apache.doris.thrift.TParquetSchema;
@@ -65,6 +66,7 @@ public class OutFileClause {
     public static final List<Type> RESULT_COL_TYPES = Lists.newArrayList();
     public static final Map<String, TParquetRepetitionType> PARQUET_REPETITION_TYPE_MAP = Maps.newHashMap();
     public static final Map<String, TParquetDataType> PARQUET_DATA_TYPE_MAP = Maps.newHashMap();
+    public static final Map<String, TParquetDataLogicalType> PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP = Maps.newHashMap();
     public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
     public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
     public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
@@ -97,6 +99,11 @@ public class OutFileClause {
         PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
         PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array", TParquetDataType.FIXED_LEN_BYTE_ARRAY);

+        PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("date", TParquetDataLogicalType.DATE);
+        PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("datetime", TParquetDataLogicalType.TIMESTAMP);
+        // TODO(ftw): add other logical type
+        PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("none", TParquetDataLogicalType.NONE);
+
         PARQUET_COMPRESSION_TYPE_MAP.put("snappy", TParquetCompressionType.SNAPPY);
         PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
         PARQUET_COMPRESSION_TYPE_MAP.put("brotli", TParquetCompressionType.BROTLI);
@@ -283,6 +290,7 @@ public class OutFileClause {
             }
             type = "string";
             break;
+        case LARGEINT:
         case DATE:
         case DATETIME:
         case DATETIMEV2:
@@ -354,6 +362,7 @@ public class OutFileClause {
                             + " but the type of column " + i + " is " + schema.second);
                 }
                 break;
+            case LARGEINT:
             case DATE:
             case DATETIME:
             case DATETIMEV2:
@@ -420,13 +429,13 @@ public class OutFileClause {
             case TINYINT:
             case SMALLINT:
             case INT:
+            case DATE:
                 if (!PARQUET_DATA_TYPE_MAP.get("int32").equals(type)) {
                     throw new AnalysisException("project field type is TINYINT/SMALLINT/INT,"
                             + "should use int32, " + "but the definition type of column " + i + " is " + type);
                 }
                 break;
             case BIGINT:
-            case DATE:
             case DATETIME:
                 if (!PARQUET_DATA_TYPE_MAP.get("int64").equals(type)) {
                     throw new AnalysisException("project field type is BIGINT/DATE/DATETIME,"
@@ -454,9 +463,10 @@ public class OutFileClause {
             case DECIMALV2:
             case DATETIMEV2:
             case DATEV2:
+            case LARGEINT:
                 if (!PARQUET_DATA_TYPE_MAP.get("byte_array").equals(type)) {
                     throw new AnalysisException("project field type is CHAR/VARCHAR/STRING/DECIMAL/DATEV2"
-                            + "/DATETIMEV2, should use byte_array, but the definition type of column "
+                            + "/DATETIMEV2/LARGEINT, should use byte_array, but the definition type of column "
                             + i + " is " + type);
                 }
                 break;
@@ -497,10 +507,10 @@ public class OutFileClause {
             case TINYINT:
             case SMALLINT:
             case INT:
+            case DATE:
                 parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("int32");
                 break;
             case BIGINT:
-            case DATE:
             case DATETIME:
                 parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("int64");
                 break;
@@ -519,6 +529,7 @@ public class OutFileClause {
             case DECIMAL128:
             case DATETIMEV2:
             case DATEV2:
+            case LARGEINT:
                 parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("byte_array");
                 break;
             case HLL:
@@ -532,6 +543,18 @@ public class OutFileClause {
                 throw new AnalysisException("currently parquet do not support column type: "
                         + expr.getType().getPrimitiveType());
         }
+
+        switch (expr.getType().getPrimitiveType()) {
+            case DATE:
+                parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("date");
+                break;
+            case DATETIME:
+                parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("datetime");
+                break;
+            default:
+                parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("none");
+        }
+
         parquetSchema.schema_column_name = colLabels.get(i);
         parquetSchemas.add(parquetSchema);
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
index b8e7bae590..692eea7657 100755
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
@@ -727,7 +727,7 @@ public class SelectStmtTest {
             SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx); // CHECKSTYLE IGNORE THIS LINE
         } catch (Exception e) {
             e.printStackTrace();
-            Assert.assertTrue(e.getMessage().contains("Parquet format does not support column type: LARGEINT"));
+            Assert.assertTrue(e.getMessage().contains("should use byte_array"));
         }
         // do not support large int type, contains function
@@ -740,7 +740,7 @@ public class SelectStmtTest {
                     + "\"schema\"=\"required,int32,siteid;\");";
             SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx); // CHECKSTYLE IGNORE THIS LINE
         } catch (Exception e) {
-            Assert.assertTrue(e.getMessage().contains("Parquet format does not support column type: LARGEINT"));
+            Assert.assertTrue(e.getMessage().contains("should use byte_array"));
         }
         // support cast
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 9659b5227c..f2a191a777 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -70,6 +70,25 @@ enum TParquetDataType {
     FIXED_LEN_BYTE_ARRAY,
 }

+enum TParquetDataLogicalType {
+    UNDEFINED = 0, // Not a real logical type
+    STRING = 1,
+    MAP,
+    LIST,
+    ENUM,
+    DECIMAL,
+    DATE,
+    TIME,
+    TIMESTAMP,
+    INTERVAL,
+    INT,
+    NIL, // Thrift NullType: annotates data that is always null
+    JSON,
+    BSON,
+    UUID,
+    NONE // Not a real logical type; should always be last element
+}
+
 enum TParquetRepetitionType {
     REQUIRED,
     REPEATED,
@@ -80,6 +99,7 @@ struct TParquetSchema {
     1: optional TParquetRepetitionType schema_repetition_type
     2: optional TParquetDataType schema_data_type
     3: optional string schema_column_name
+    4: optional TParquetDataLogicalType schema_data_logical_type
 }

 struct TResultFileSinkOptions {
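On the BE side, `build_schema_data_logical_type` maps these thrift values onto parquet-cpp logical types when `parse_schema` assembles each `PrimitiveNode`. A minimal sketch of the two annotated schema fields, using the same arguments the patch passes (assumes parquet-cpp's `parquet/schema.h`; the boolean flags in the `Timestamp` call are reproduced exactly as the patch passes them):

```cpp
#include <parquet/schema.h>
#include <parquet/types.h>

#include <string>

// DATE annotates an INT32 column.
parquet::schema::NodePtr make_date_field(const std::string& name) {
    return parquet::schema::PrimitiveNode::Make(name, parquet::Repetition::REQUIRED,
                                                parquet::LogicalType::Date(),
                                                parquet::Type::INT32);
}

// DATETIME annotates an INT64 column as TIMESTAMP(MILLIS), adjusted to UTC.
parquet::schema::NodePtr make_datetime_field(const std::string& name) {
    return parquet::schema::PrimitiveNode::Make(
            name, parquet::Repetition::REQUIRED,
            parquet::LogicalType::Timestamp(true, parquet::LogicalType::TimeUnit::MILLIS, true),
            parquet::Type::INT64);
}
```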
diff --git a/regression-test/data/export_p0/test_export_data_types.out b/regression-test/data/export_p0/test_export_data_types.out
index e66d811fbc..37c038818a 100644
--- a/regression-test/data/export_p0/test_export_data_types.out
+++ b/regression-test/data/export_p0/test_export_data_types.out
@@ -1,36 +1,37 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !select_export1 --
 1 2023-04-20 2023-04-20 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 1 1 true 1 1 1 1.1 1.1 char1 1 1 1 0.1 1.00000000 1.0000000000 1 1.0000000000000000000000000000000000000 0.10000000000000000000000000000000000000
-2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 2 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
-3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 3 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
+2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 -170141183460469231731687303715884105728 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
+3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 170141183460469231731687303715884105727 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
 4 0000-01-01 0000-01-01 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 4 4 true 4 4 4 4.4 4.4 char4 4 4 4 0.4 4.00000000 4.0000000000 4 4.0000000000000000000000000000000000000 0.40000000000000000000000000000000000000

 -- !select_load1 --
 1 2023-04-20 2023-04-20 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 1 1 true 1 1 1 1.1 1.1 char1 1 1 1 0.1 1.00000000 1.0000000000 1 1.0000000000000000000000000000000000000 0.10000000000000000000000000000000000000
-2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 2 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
-3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 3 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
+2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 -170141183460469231731687303715884105728 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
+3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 170141183460469231731687303715884105727 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
 4 0000-01-01 0000-01-01 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 4 4 true 4 4 4 4.4 4.4 char4 4 4 4 0.4 4.00000000 4.0000000000 4 4.0000000000000000000000000000000000000 0.40000000000000000000000000000000000000

 -- !select_load2 --
-1 2023-04-20 2023-04-20 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 1 1 true 1 1 1.1 1.1 char1 1 1 1 0.1 1.00000000 1.0000000000 1 1.0000000000000000000000000000000000000 0.10000000000000000000000000000000000000
-2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
-3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
+1 2023-04-20 2023-04-20 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 1 1 true 1 1 1 1.1 1.1 char1 1 1 1 0.1 1.00000000 1.0000000000 1 1.0000000000000000000000000000000000000 0.10000000000000000000000000000000000000
+2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 -170141183460469231731687303715884105728 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
+3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 170141183460469231731687303715884105727 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
+4 0000-01-02 0000-01-01 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 4 4 true 4 4 4 4.4 4.4 char4 4 4 4 0.4 4.00000000 4.0000000000 4 4.0000000000000000000000000000000000000 0.40000000000000000000000000000000000000

 -- !select_load3 --
-1 2023-04-20 Beijing Haidian 1 1 true 1 1 1.1 1.1 char1 1
-2 9999-12-31  Haidian -32768 -128 true -2147483648 -9223372036854775808 1.4E-45 4.9E-324 char2 100000000
-3 2023-04-21 Beijing  32767 127 true 2147483647 9223372036854775807 3.4028235e+38 1.7976931348623157E308 char3 999999999
-4 0000-01-01 Beijing Haidian 4 4 true 4 4 4.4 4.4 char4 4
+1 2023-04-20 2023-04-20 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 1 1 true 1 1 1 1.1 1.1 char1 1 1 1 0.1 1.00000000 1.0000000000 1 1.0000000000000000000000000000000000000 0.10000000000000000000000000000000000000
+2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 -170141183460469231731687303715884105728 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
+3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 170141183460469231731687303715884105727 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
+4 0000-01-01 0000-01-01 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 4 4 true 4 4 4 4.4 4.4 char4 4 4 4 0.4 4.00000000 4.0000000000 4 4.0000000000000000000000000000000000000 0.40000000000000000000000000000000000000

 -- !select_load4 --
 1 2023-04-20 2023-04-20 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 1 1 true 1 1 1 1.1 1.1 char1 1 1 1 0.1 1.00000000 1.0000000000 1 1.0000000000000000000000000000000000000 0.10000000000000000000000000000000000000
-2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 2 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
-3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 3 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
+2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 -170141183460469231731687303715884105728 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
+3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 170141183460469231731687303715884105727 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
 4 0000-01-01 0000-01-01 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 4 4 true 4 4 4 4.4 4.4 char4 4 4 4 0.4 4.00000000 4.0000000000 4 4.0000000000000000000000000000000000000 0.40000000000000000000000000000000000000

 -- !select_load5 --
 1 2023-04-20 2023-04-20 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 1 1 true 1 1 1 1.1 1.1 char1 1 1 1 0.1 1.00000000 1.0000000000 1 1.0000000000000000000000000000000000000 0.10000000000000000000000000000000000000
-2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 2 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
-3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 3 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
+2 9999-12-31 9999-12-31 9999-12-31T23:59:59 9999-12-31T23:59:59 2023-04-20T00:00:00.120 2023-04-20T00:00:00.334400  Haidian -32768 -128 true -2147483648 -9223372036854775808 -170141183460469231731687303715884105728 1.4E-45 4.9E-324 char2 100000000 100000000 4 0.1 0.99999999 9999999999.9999999999 99999999999999999999999999999999999999 9.9999999999999999999999999999999999999 0.99999999999999999999999999999999999999
+3 2023-04-21 2023-04-21 2023-04-20T12:34:56 2023-04-20T00:00 2023-04-20T00:00:00.123 2023-04-20T00:00:00.123456 Beijing  32767 127 true 2147483647 9223372036854775807 170141183460469231731687303715884105727 3.4028235e+38 1.7976931348623157E308 char3 999999999 999999999 9 0.9 9.99999999 1234567890.0123456789 12345678901234567890123456789012345678 1.2345678901234567890123456789012345678 0.12345678901234567890123456789012345678
 4 0000-01-01 0000-01-01 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 2023-04-20T00:00 Beijing Haidian 4 4 true 4 4 4 4.4 4.4 char4 4 4 4 0.4 4.00000000 4.0000000000 4 4.0000000000000000000000000000000000000 0.40000000000000000000000000000000000000
diff --git a/regression-test/suites/export_p0/test_export_basic.groovy b/regression-test/suites/export_p0/test_export_basic.groovy
index b55bea8a95..07c3168c5d 100644
--- a/regression-test/suites/export_p0/test_export_basic.groovy
+++ b/regression-test/suites/export_p0/test_export_basic.groovy
@@ -84,6 +84,8 @@ suite("test_export_basic", "p0") {
     sql """ INSERT INTO ${table_export_name} VALUES
             ${sb.toString()}
         """
+    def insert_res = sql "show last insert;"
+    logger.info("insert result: " + insert_res.toString())

     qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """
diff --git a/regression-test/suites/export_p0/test_export_csv.groovy b/regression-test/suites/export_p0/test_export_csv.groovy
index 7286d1efe6..a8f75f21a4 100644
--- a/regression-test/suites/export_p0/test_export_csv.groovy
+++ b/regression-test/suites/export_p0/test_export_csv.groovy
@@ -88,6 +88,8 @@ suite("test_export_csv", "p0") {
     sql """ INSERT INTO ${table_export_name} VALUES
             ${sb.toString()}
         """
+    def insert_res = sql "show last insert;"
+    logger.info("insert result: " + insert_res.toString())

     qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """
diff --git a/regression-test/suites/export_p0/test_export_data_types.groovy b/regression-test/suites/export_p0/test_export_data_types.groovy
index 21cb93e733..f23d16c3ec 100644
--- a/regression-test/suites/export_p0/test_export_data_types.groovy
+++ b/regression-test/suites/export_p0/test_export_data_types.groovy
@@ -54,40 +54,43 @@ suite("test_export_data_types", "p0") {
     def table_load_name = "test_load_data_types"
     def outfile_path_prefix = """/tmp/test_export"""

-    // create table
-    sql """ DROP TABLE IF EXISTS ${table_export_name} """
-    sql """
-    CREATE TABLE IF NOT EXISTS ${table_export_name} (
-        `user_id` INT NOT NULL COMMENT "用户id",
-        `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-        `datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间2",
-        `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_1` DATETIMEV2 NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_2` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_3` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
-        `city` VARCHAR(20) COMMENT "用户所在城市",
-        `street` STRING COMMENT "用户所在街道",
-        `age` SMALLINT COMMENT "用户年龄",
-        `sex` TINYINT COMMENT "用户性别",
-        `bool_col` boolean COMMENT "",
-        `int_col` int COMMENT "",
-        `bigint_col` bigint COMMENT "",
-        `largeint_col` largeint COMMENT "",
-        `float_col` float COMMENT "",
-        `double_col` double COMMENT "",
-        `char_col` CHAR(10) COMMENT "",
-        `decimal_col` decimal COMMENT "",
-        `decimalv3_col` decimalv3 COMMENT "",
-        `decimalv3_col2` decimalv3(1,0) COMMENT "",
-        `decimalv3_col3` decimalv3(1,1) COMMENT "",
-        `decimalv3_col4` decimalv3(9,8) COMMENT "",
-        `decimalv3_col5` decimalv3(20,10) COMMENT "",
-        `decimalv3_col6` decimalv3(38,0) COMMENT "",
-        `decimalv3_col7` decimalv3(38,37) COMMENT "",
-        `decimalv3_col8` decimalv3(38,38) COMMENT ""
-    )
-    DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-    """
+    def create_table = {table_name ->
+        sql """ DROP TABLE IF EXISTS ${table_name} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (
+            `user_id` INT NOT NULL COMMENT "用户id",
+            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
+            `datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间2",
+            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
+            `datetimev2_1` DATETIMEV2 NOT NULL COMMENT "数据灌入日期时间",
+            `datetimev2_2` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
+            `datetimev2_3` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
+            `city` VARCHAR(20) COMMENT "用户所在城市",
+            `street` STRING COMMENT "用户所在街道",
+            `age` SMALLINT COMMENT "用户年龄",
+            `sex` TINYINT COMMENT "用户性别",
+            `bool_col` boolean COMMENT "",
+            `int_col` int COMMENT "",
+            `bigint_col` bigint COMMENT "",
+            `largeint_col` largeint COMMENT "",
+            `float_col` float COMMENT "",
+            `double_col` double COMMENT "",
+            `char_col` CHAR(10) COMMENT "",
+            `decimal_col` decimal COMMENT "",
+            `decimalv3_col` decimalv3 COMMENT "",
+            `decimalv3_col2` decimalv3(1,0) COMMENT "",
+            `decimalv3_col3` decimalv3(1,1) COMMENT "",
+            `decimalv3_col4` decimalv3(9,8) COMMENT "",
+            `decimalv3_col5` decimalv3(20,10) COMMENT "",
+            `decimalv3_col6` decimalv3(38,0) COMMENT "",
+            `decimalv3_col7` decimalv3(38,37) COMMENT "",
+            `decimalv3_col8` decimalv3(38,38) COMMENT ""
+        )
+        DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+    }
+
+    create_table(table_export_name);

     StringBuilder sb = new StringBuilder()
     int i = 1
@@ -101,14 +104,14 @@ suite("test_export_data_types", "p0") {
     sb.append("""
         (${++i}, '9999-12-31', '9999-12-31', '9999-12-31 23:59:59', '9999-12-31 23:59:59',
         '2023-04-20 00:00:00.12', '2023-04-20 00:00:00.3344', '', 'Haidian',
-        ${Short.MIN_VALUE}, ${Byte.MIN_VALUE}, true, ${Integer.MIN_VALUE}, ${Long.MIN_VALUE}, ${i}, ${Float.MIN_VALUE}, ${Double.MIN_VALUE}, 'char${i}',
+        ${Short.MIN_VALUE}, ${Byte.MIN_VALUE}, true, ${Integer.MIN_VALUE}, ${Long.MIN_VALUE}, -170141183460469231731687303715884105728, ${Float.MIN_VALUE}, ${Double.MIN_VALUE}, 'char${i}',
         100000000, 100000000, 4, 0.1, 0.99999999,
         9999999999.9999999999, 99999999999999999999999999999999999999,
         9.9999999999999999999999999999999999999, 0.99999999999999999999999999999999999999),
     """)
     sb.append("""
         (${++i}, '2023-04-21', '2023-04-21', '2023-04-20 12:34:56', '2023-04-20 00:00:00',
         '2023-04-20 00:00:00.123', '2023-04-20 00:00:00.123456', 'Beijing', '',
-        ${Short.MAX_VALUE}, ${Byte.MAX_VALUE}, true, ${Integer.MAX_VALUE}, ${Long.MAX_VALUE}, ${i}, ${Float.MAX_VALUE}, ${Double.MAX_VALUE}, 'char${i}',
+        ${Short.MAX_VALUE}, ${Byte.MAX_VALUE}, true, ${Integer.MAX_VALUE}, ${Long.MAX_VALUE}, 170141183460469231731687303715884105727, ${Float.MAX_VALUE}, ${Double.MAX_VALUE}, 'char${i}',
         999999999, 999999999, 9, 0.9, 9.99999999,
         1234567890.0123456789, 12345678901234567890123456789012345678,
         1.2345678901234567890123456789012345678, 0.12345678901234567890123456789012345678),
     """)
@@ -124,6 +127,8 @@ suite("test_export_data_types", "p0") {
         ${sb.toString()}
     """
+    def insert_res = sql "show last insert;"
+    logger.info("insert result: " + insert_res.toString())

     qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """

     def check_path_exists = { dir_path ->
@@ -187,40 +192,7 @@ suite("test_export_data_types", "p0") {
     // check file amounts
     check_file_amounts.call("${outFilePath}", 1)

-    // check data correctness
-    sql """ DROP TABLE IF EXISTS ${table_load_name} """
-    sql """
-    CREATE TABLE IF NOT EXISTS ${table_load_name} (
-        `user_id` INT NOT NULL COMMENT "用户id",
-        `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-        `datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间2",
-        `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_1` DATETIMEV2 NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_2` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_3` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
-        `city` VARCHAR(20) COMMENT "用户所在城市",
-        `street` STRING COMMENT "用户所在街道",
-        `age` SMALLINT COMMENT "用户年龄",
-        `sex` TINYINT COMMENT "用户性别",
-        `bool_col` boolean COMMENT "",
-        `int_col` int COMMENT "",
-        `bigint_col` bigint COMMENT "",
-        `largeint_col` largeint COMMENT "",
-        `float_col` float COMMENT "",
-        `double_col` double COMMENT "",
-        `char_col` CHAR(10) COMMENT "",
-        `decimal_col` decimal COMMENT "",
-        `decimalv3_col` decimalv3 COMMENT "",
-        `decimalv3_col2` decimalv3(1,0) COMMENT "",
-        `decimalv3_col3` decimalv3(1,1) COMMENT "",
-        `decimalv3_col4` decimalv3(9,8) COMMENT "",
-        `decimalv3_col5` decimalv3(20,10) COMMENT "",
-        `decimalv3_col6` decimalv3(38,0) COMMENT "",
-        `decimalv3_col7` decimalv3(38,37) COMMENT "",
-        `decimalv3_col8` decimalv3(38,38) COMMENT ""
-    )
-    DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-    """
+    create_table(table_load_name);

     File[] files = new File("${outFilePath}").listFiles()
     String file_path = files[0].getAbsolutePath()
@@ -264,11 +236,10 @@ suite("test_export_data_types", "p0") {
     // exec export
     sql """
-        EXPORT TABLE ${table_export_name} where user_id<4 TO "file://${outFilePath}/"
+        EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
         PROPERTIES(
             "label" = "${label}",
-            "format" = "parquet",
-            "columns" = "user_id, date, datev2, datetime, datetimev2_1, datetimev2_2, datetimev2_3, city, street, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col, decimalv3_col, decimalv3_col2, decimalv3_col3, decimalv3_col4, decimalv3_col5, decimalv3_col6, decimalv3_col7, decimalv3_col8"
+            "format" = "parquet"
         );
     """
     waiting_export.call(label)
@@ -276,39 +247,7 @@ suite("test_export_data_types", "p0") {
     // check file amounts
     check_file_amounts.call("${outFilePath}", 1)

-    // check data correctness
-    sql """ DROP TABLE IF EXISTS ${table_load_name} """
-    sql """
-    CREATE TABLE IF NOT EXISTS ${table_load_name} (
-        `user_id` INT NOT NULL COMMENT "用户id",
-        `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-        `datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间2",
-        `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_1` DATETIMEV2 NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_2` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_3` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
-        `city` VARCHAR(20) COMMENT "用户所在城市",
-        `street` STRING COMMENT "用户所在街道",
-        `age` SMALLINT COMMENT "用户年龄",
-        `sex` TINYINT COMMENT "用户性别",
-        `bool_col` boolean COMMENT "",
-        `int_col` int COMMENT "",
-        `bigint_col` bigint COMMENT "",
-        `float_col` float COMMENT "",
-        `double_col` double COMMENT "",
-        `char_col` CHAR(10) COMMENT "",
-        `decimal_col` decimal COMMENT "",
-        `decimalv3_col` decimalv3 COMMENT "",
-        `decimalv3_col2` decimalv3(1,0) COMMENT "",
-        `decimalv3_col3` decimalv3(1,1) COMMENT "",
-        `decimalv3_col4` decimalv3(9,8) COMMENT "",
-        `decimalv3_col5` decimalv3(20,10) COMMENT "",
-        `decimalv3_col6` decimalv3(38,0) COMMENT "",
-        `decimalv3_col7` decimalv3(38,37) COMMENT "",
-        `decimalv3_col8` decimalv3(38,38) COMMENT ""
-    )
-    DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-    """
+    create_table(table_load_name);

     File[] files = new File("${outFilePath}").listFiles()
     String file_path = files[0].getAbsolutePath()
@@ -328,7 +267,7 @@ suite("test_export_data_types", "p0") {
         log.info("Stream load result: ${result}".toString())
         def json = parseJson(result)
         assertEquals("success", json.Status.toLowerCase())
-        assertEquals(3, json.NumberTotalRows)
+        assertEquals(4, json.NumberTotalRows)
         assertEquals(0, json.NumberFilteredRows)
     }
 }
@@ -353,8 +292,7 @@ suite("test_export_data_types", "p0") {
         EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
         PROPERTIES(
             "label" = "${label}",
-            "format" = "orc",
-            "columns" = "user_id, date, city, street, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col"
+            "format" = "orc"
         );
     """
     waiting_export.call(label)
@@ -362,26 +300,7 @@ suite("test_export_data_types", "p0") {
     // check file amounts
     check_file_amounts.call("${outFilePath}", 1)

-    // check data correctness
-    sql """ DROP TABLE IF EXISTS ${table_load_name} """
-    sql """
-    CREATE TABLE IF NOT EXISTS ${table_load_name} (
-        `user_id` INT NOT NULL COMMENT "用户id",
-        `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-        `city` VARCHAR(20) COMMENT "用户所在城市",
-        `street` STRING COMMENT "用户所在街道",
-        `age` SMALLINT COMMENT "用户年龄",
-        `sex` TINYINT COMMENT "用户性别",
-        `bool_col` boolean COMMENT "",
-        `int_col` int COMMENT "",
-        `bigint_col` bigint COMMENT "",
-        `float_col` float COMMENT "",
-        `double_col` double COMMENT "",
-        `char_col` CHAR(10) COMMENT "",
-        `decimal_col` decimal COMMENT ""
-    )
-    DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-    """
+    create_table(table_load_name);

     File[] files = new File("${outFilePath}").listFiles()
     String file_path = files[0].getAbsolutePath()
@@ -435,40 +354,7 @@ suite("test_export_data_types", "p0") {
     // check file amounts
     check_file_amounts.call("${outFilePath}", 1)

-    // check data correctness
-    sql """ DROP TABLE IF EXISTS ${table_load_name} """
-    sql """
-    CREATE TABLE IF NOT EXISTS ${table_load_name} (
-        `user_id` INT NOT NULL COMMENT "用户id",
-        `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-        `datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间2",
-        `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_1` DATETIMEV2 NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_2` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_3` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
-        `city` VARCHAR(20) COMMENT "用户所在城市",
-        `street` STRING COMMENT "用户所在街道",
-        `age` SMALLINT COMMENT "用户年龄",
-        `sex` TINYINT COMMENT "用户性别",
-        `bool_col` boolean COMMENT "",
-        `int_col` int COMMENT "",
-        `bigint_col` bigint COMMENT "",
-        `largeint_col` largeint COMMENT "",
-        `float_col` float COMMENT "",
-        `double_col` double COMMENT "",
-        `char_col` CHAR(10) COMMENT "",
-        `decimal_col` decimal COMMENT "",
-        `decimalv3_col` decimalv3 COMMENT "",
-        `decimalv3_col2` decimalv3(1,0) COMMENT "",
-        `decimalv3_col3` decimalv3(1,1) COMMENT "",
-        `decimalv3_col4` decimalv3(9,8) COMMENT "",
-        `decimalv3_col5` decimalv3(20,10) COMMENT "",
-        `decimalv3_col6` decimalv3(38,0) COMMENT "",
-        `decimalv3_col7` decimalv3(38,37) COMMENT "",
-        `decimalv3_col8` decimalv3(38,38) COMMENT ""
-    )
-    DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-    """
+    create_table(table_load_name);

     File[] files = new File("${outFilePath}").listFiles()
     String file_path = files[0].getAbsolutePath()
@@ -524,40 +410,7 @@ suite("test_export_data_types", "p0") {
     // check file amounts
     check_file_amounts.call("${outFilePath}", 1)

-    // check data correctness
-    sql """ DROP TABLE IF EXISTS ${table_load_name} """
-    sql """
-    CREATE TABLE IF NOT EXISTS ${table_load_name} (
-        `user_id` INT NOT NULL COMMENT "用户id",
-        `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-        `datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间2",
-        `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_1` DATETIMEV2 NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_2` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
-        `datetimev2_3` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
-        `city` VARCHAR(20) COMMENT "用户所在城市",
-        `street` STRING COMMENT "用户所在街道",
-        `age` SMALLINT COMMENT "用户年龄",
-        `sex` TINYINT COMMENT "用户性别",
-        `bool_col` boolean COMMENT "",
-        `int_col` int COMMENT "",
-        `bigint_col` bigint COMMENT "",
-        `largeint_col` largeint COMMENT "",
-        `float_col` float COMMENT "",
-        `double_col` double COMMENT "",
-        `char_col` CHAR(10) COMMENT "",
-        `decimal_col` decimal COMMENT "",
-        `decimalv3_col` decimalv3 COMMENT "",
-        `decimalv3_col2` decimalv3(1,0) COMMENT "",
-        `decimalv3_col3` decimalv3(1,1) COMMENT "",
-        `decimalv3_col4` decimalv3(9,8) COMMENT "",
-        `decimalv3_col5` decimalv3(20,10) COMMENT "",
-        `decimalv3_col6` decimalv3(38,0) COMMENT "",
-        `decimalv3_col7` decimalv3(38,37) COMMENT "",
-        `decimalv3_col8` decimalv3(38,38) COMMENT ""
-    )
-    DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-    """
+    create_table(table_load_name);

     File[] files = new File("${outFilePath}").listFiles()
     String file_path = files[0].getAbsolutePath()
diff --git a/regression-test/suites/export_p0/test_export_orc.groovy b/regression-test/suites/export_p0/test_export_orc.groovy
index 1321988c9e..7892e97e62 100644
--- a/regression-test/suites/export_p0/test_export_orc.groovy
+++ b/regression-test/suites/export_p0/test_export_orc.groovy
@@ -88,6 +88,8 @@ suite("test_export_orc", "p0") {
     sql """ INSERT INTO ${table_export_name} VALUES
             ${sb.toString()}
         """
+    def insert_res = sql "show last insert;"
+    logger.info("insert result: " + insert_res.toString())

     qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """
diff --git a/regression-test/suites/export_p0/test_export_parquet.groovy b/regression-test/suites/export_p0/test_export_parquet.groovy
index 3f98d32530..7c4bc78715 100644
--- a/regression-test/suites/export_p0/test_export_parquet.groovy
+++ b/regression-test/suites/export_p0/test_export_parquet.groovy
@@ -88,6 +88,8 @@ suite("test_export_parquet", "p0") {
     sql """ INSERT INTO ${table_export_name} VALUES
             ${sb.toString()}
         """
+    def insert_res = sql "show last insert;"
+    logger.info("insert result: " + insert_res.toString())

     qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """