[Fix](hive-writer) Fix hive partition update core. (#35311)
Issue: #31442 ``` /home/zcp/repo_center/doris_master/doris/be/src/common/signal_handler.h:421 1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so 2# JVM_handle_linux_signal in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so 3# 0x00007F963FA9D090 in /lib/x86_64-linux-gnu/libc.so.6 4# doris::vectorized::VHivePartitionWriter::_build_partition_update() at /home/zcp/repo_center/doris_master/doris/be/src/vec/sink/writer/vhive_partition_writer.cpp:215 5# doris::vectorized::VHivePartitionWriter::close(doris::Status const&) at /home/zcp/repo_center/doris_master/doris/be/src/vec/sink/writer/vhive_partition_writer.cpp:164 6# doris::vectorized::VHiveTableWriter::close(doris::Status) at /home/zcp/repo_center/doris_master/doris/be/src/vec/sink/writer/vhive_table_writer.cpp:209 7# doris::vectorized::AsyncResultWriter::process_block(doris::RuntimeState*, doris::RuntimeProfile*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/sink/writer/async_result_writer.cpp:184 8# doris::vectorized::AsyncResultWriter::start_writer(doris::RuntimeState*, doris::RuntimeProfile*)::$_0::operator()() const at ```
This commit is contained in:
@ -136,6 +136,9 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
|
||||
}
|
||||
|
||||
Status VHivePartitionWriter::close(const Status& status) {
|
||||
if (status.ok()) {
|
||||
_state->hive_partition_updates().emplace_back(_build_partition_update());
|
||||
}
|
||||
if (_file_format_transformer != nullptr) {
|
||||
Status st = _file_format_transformer->close();
|
||||
if (!st.ok()) {
|
||||
@ -150,7 +153,6 @@ Status VHivePartitionWriter::close(const Status& status) {
|
||||
LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", path, st.to_string());
|
||||
}
|
||||
}
|
||||
_state->hive_partition_updates().emplace_back(_build_partition_update());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -201,11 +203,14 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
|
||||
hive_partition_update.__set_location(location);
|
||||
hive_partition_update.__set_file_names({_get_target_file_name()});
|
||||
hive_partition_update.__set_row_count(_row_count);
|
||||
DCHECK(_file_format_transformer != nullptr);
|
||||
hive_partition_update.__set_file_size(_file_format_transformer->written_len());
|
||||
|
||||
if (_write_info.file_type == TFileType::FILE_S3) {
|
||||
DCHECK(_file_writer != nullptr);
|
||||
doris::io::S3FileWriter* s3_mpu_file_writer =
|
||||
dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
|
||||
DCHECK(s3_mpu_file_writer != nullptr);
|
||||
TS3MPUPendingUpload s3_mpu_pending_upload;
|
||||
s3_mpu_pending_upload.__set_bucket(s3_mpu_file_writer->bucket());
|
||||
s3_mpu_pending_upload.__set_key(s3_mpu_file_writer->key());
|
||||
|
||||
@ -208,7 +208,8 @@ Status VHiveTableWriter::close(Status status) {
|
||||
for (const auto& pair : _partitions_to_writers) {
|
||||
Status st = pair.second->close(status);
|
||||
if (st != Status::OK()) {
|
||||
LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string());
|
||||
LOG(WARNING) << fmt::format("partition writer close failed for partition {}",
|
||||
st.to_string());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -330,15 +331,15 @@ std::vector<std::string> VHiveTableWriter::_create_partition_values(vectorized::
|
||||
partition_column, position);
|
||||
|
||||
// Check if value contains only printable ASCII characters
|
||||
bool isValid = true;
|
||||
bool is_valid = true;
|
||||
for (char c : value) {
|
||||
if (c < 0x20 || c > 0x7E) {
|
||||
isValid = false;
|
||||
is_valid = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!isValid) {
|
||||
if (!is_valid) {
|
||||
// Encode value using Base16 encoding with space separator
|
||||
std::stringstream encoded;
|
||||
for (unsigned char c : value) {
|
||||
@ -414,7 +415,6 @@ std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_des
|
||||
char buf[64];
|
||||
char* pos = value.to_string(buf);
|
||||
return std::string(buf, pos - buf - 1);
|
||||
break;
|
||||
}
|
||||
case TYPE_DATEV2: {
|
||||
DateV2Value<DateV2ValueType> value =
|
||||
|
||||
Reference in New Issue
Block a user