diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake index 2758bb9b93..5c07f79d6e 100644 --- a/be/cmake/thirdparty.cmake +++ b/be/cmake/thirdparty.cmake @@ -154,6 +154,7 @@ add_thirdparty(com_err) add_thirdparty(k5crypto) add_thirdparty(gssapi_krb5) add_thirdparty(dragonbox_to_chars LIB64) +add_thirdparty(streamvbyte LIB64) target_include_directories(dragonbox_to_chars INTERFACE "${THIRDPARTY_DIR}/include/dragonbox-1.1.3") if (OS_MACOSX) diff --git a/be/src/agent/be_exec_version_manager.h b/be/src/agent/be_exec_version_manager.h index 91f3e2b6d2..ff5b141c20 100644 --- a/be/src/agent/be_exec_version_manager.h +++ b/be/src/agent/be_exec_version_manager.h @@ -63,8 +63,10 @@ private: * b. array contains/position/countequal function return nullable in less situations. * c. cleared old version of Version 2. * d. unix_timestamp function support timestamp with float for datetimev2, and change nullable mode. + * 4: start from doris 2.1.x + * a. change shuffle serialize/deserialize way */ -inline const int BeExecVersionManager::max_be_exec_version = 3; +inline const int BeExecVersionManager::max_be_exec_version = 4; inline const int BeExecVersionManager::min_be_exec_version = 0; } // namespace doris diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h index 1528690f6b..02106b75e6 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sort.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h @@ -77,7 +77,7 @@ struct AggregateFunctionSortData { size_t compressed_bytes = 0; static_cast(block.serialize(state->be_exec_version(), &pblock, &uncompressed_bytes, &compressed_bytes, - segment_v2::CompressionTypePB::SNAPPY)); + segment_v2::CompressionTypePB::NO_COMPRESSION)); write_string_binary(pblock.SerializeAsString(), buf); } diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index c2594a2e35..80a10c528e 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -855,6 +855,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock, // when data type is HLL, content_uncompressed_size maybe larger than real size. std::string column_values; try { + // TODO: After support c++23, we should use resize_and_overwrite to replace resize column_values.resize(content_uncompressed_size); } catch (...) { std::string msg = fmt::format("Try to alloc {} bytes for pblock column values failed.", @@ -868,34 +869,36 @@ Status Block::serialize(int be_exec_version, PBlock* pblock, buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version()); } *uncompressed_bytes = content_uncompressed_size; + const size_t serialize_bytes = buf - column_values.data(); + *compressed_bytes = serialize_bytes; + column_values.resize(serialize_bytes); // compress if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { SCOPED_RAW_TIMER(&_compress_time_ns); pblock->set_compression_type(compression_type); - pblock->set_uncompressed_size(content_uncompressed_size); + pblock->set_uncompressed_size(serialize_bytes); BlockCompressionCodec* codec; RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); faststring buf_compressed; - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress( - Slice(column_values.data(), content_uncompressed_size), &buf_compressed)); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION( + codec->compress(Slice(column_values.data(), serialize_bytes), &buf_compressed)); size_t compressed_size = buf_compressed.size(); - if (LIKELY(compressed_size < content_uncompressed_size)) { + if (LIKELY(compressed_size < serialize_bytes)) { + // TODO: rethink the logic here may copy again ? pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); pblock->set_compressed(true); *compressed_bytes = compressed_size; } else { pblock->set_column_values(std::move(column_values)); - *compressed_bytes = content_uncompressed_size; } VLOG_ROW << "uncompressed size: " << content_uncompressed_size << ", compressed size: " << compressed_size; } else { pblock->set_column_values(std::move(column_values)); - *compressed_bytes = content_uncompressed_size; } if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits::max()) { return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.", diff --git a/be/src/vec/core/block_spill_writer.cpp b/be/src/vec/core/block_spill_writer.cpp index 86a59e9357..6b5e21c393 100644 --- a/be/src/vec/core/block_spill_writer.cpp +++ b/be/src/vec/core/block_spill_writer.cpp @@ -122,7 +122,7 @@ Status BlockSpillWriter::_write_internal(const Block& block) { SCOPED_TIMER(serialize_timer_); status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock, &uncompressed_bytes, &compressed_bytes, - segment_v2::CompressionTypePB::LZ4); + segment_v2::CompressionTypePB::NO_COMPRESSION); if (!status.ok()) { unlink(file_path_.c_str()); return status; diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h index 06082f0095..7807f2a5d5 100644 --- a/be/src/vec/data_types/data_type.h +++ b/be/src/vec/data_types/data_type.h @@ -55,6 +55,10 @@ class Field; using DataTypePtr = std::shared_ptr; using DataTypes = std::vector; +constexpr auto SERIALIZED_MEM_SIZE_LIMIT = 256; +inline size_t upper_int32(size_t size) { + return (3 + size) / 4.0; +} /** Properties of data type. * Contains methods for serialization/deserialization. diff --git a/be/src/vec/data_types/data_type_decimal.cpp b/be/src/vec/data_types/data_type_decimal.cpp index bec9847d0a..4e4c7ddd81 100644 --- a/be/src/vec/data_types/data_type_decimal.cpp +++ b/be/src/vec/data_types/data_type_decimal.cpp @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -105,35 +106,86 @@ Status DataTypeDecimal::from_string(ReadBuffer& rb, IColumn* column) const { template int64_t DataTypeDecimal::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - return sizeof(uint32_t) + column.size() * sizeof(FieldType); + if (be_exec_version >= 4) { + auto size = sizeof(T) * column.size(); + if (size <= SERIALIZED_MEM_SIZE_LIMIT) { + return sizeof(uint32_t) + size; + } else { + return sizeof(uint32_t) + sizeof(size_t) + + std::max(size, streamvbyte_max_compressedbytes(upper_int32(size))); + } + } else { + return sizeof(uint32_t) + column.size() * sizeof(FieldType); + } } template char* DataTypeDecimal::serialize(const IColumn& column, char* buf, int be_exec_version) const { - // row num - const auto row_num = column.size(); - *reinterpret_cast(buf) = row_num; - buf += sizeof(uint32_t); - // column values - auto ptr = column.convert_to_full_column_if_const(); - const auto* origin_data = assert_cast(*ptr.get()).get_data().data(); - memcpy(buf, origin_data, row_num * sizeof(FieldType)); - buf += row_num * sizeof(FieldType); - return buf; + if (be_exec_version >= 4) { + // row num + const auto mem_size = column.size() * sizeof(T); + *reinterpret_cast(buf) = mem_size; + buf += sizeof(uint32_t); + // column data + auto ptr = column.convert_to_full_column_if_const(); + const auto* origin_data = + assert_cast&>(*ptr.get()).get_data().data(); + if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) { + memcpy(buf, origin_data, mem_size); + return buf + mem_size; + } + + auto encode_size = + streamvbyte_encode(reinterpret_cast(origin_data), + upper_int32(mem_size), (uint8_t*)(buf + sizeof(size_t))); + *reinterpret_cast(buf) = encode_size; + buf += sizeof(size_t); + return buf + encode_size; + } else { + // row num + const auto row_num = column.size(); + *reinterpret_cast(buf) = row_num; + buf += sizeof(uint32_t); + // column values + auto ptr = column.convert_to_full_column_if_const(); + const auto* origin_data = assert_cast(*ptr.get()).get_data().data(); + memcpy(buf, origin_data, row_num * sizeof(FieldType)); + buf += row_num * sizeof(FieldType); + return buf; + } } template const char* DataTypeDecimal::deserialize(const char* buf, IColumn* column, int be_exec_version) const { - // row num - uint32_t row_num = *reinterpret_cast(buf); - buf += sizeof(uint32_t); - // column values - auto& container = assert_cast(column)->get_data(); - container.resize(row_num); - memcpy(container.data(), buf, row_num * sizeof(FieldType)); - buf += row_num * sizeof(FieldType); - return buf; + if (be_exec_version >= 4) { + // row num + uint32_t mem_size = *reinterpret_cast(buf); + buf += sizeof(uint32_t); + // column data + auto& container = assert_cast*>(column)->get_data(); + container.resize(mem_size / sizeof(T)); + if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) { + memcpy(container.data(), buf, mem_size); + return buf + mem_size; + } + + size_t encode_size = *reinterpret_cast(buf); + buf += sizeof(size_t); + streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(container.data()), + upper_int32(mem_size)); + return buf + encode_size; + } else { + // row num + uint32_t row_num = *reinterpret_cast(buf); + buf += sizeof(uint32_t); + // column values + auto& container = assert_cast(column)->get_data(); + container.resize(row_num); + memcpy(container.data(), buf, row_num * sizeof(FieldType)); + buf += row_num * sizeof(FieldType); + return buf; + } } template diff --git a/be/src/vec/data_types/data_type_nullable.cpp b/be/src/vec/data_types/data_type_nullable.cpp index d5db76ab35..1e4ecb1cf2 100644 --- a/be/src/vec/data_types/data_type_nullable.cpp +++ b/be/src/vec/data_types/data_type_nullable.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -102,42 +103,102 @@ Status DataTypeNullable::from_string(ReadBuffer& rb, IColumn* column) const { // : value1 | value2 | ...> int64_t DataTypeNullable::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - int64_t size = sizeof(uint32_t); - size += sizeof(bool) * column.size(); - size += nested_data_type->get_uncompressed_serialized_bytes( - assert_cast(*column.convert_to_full_column_if_const()) - .get_nested_column(), - be_exec_version); - return size; + if (be_exec_version >= 4) { + size_t ret = 0; + if (size_t size = sizeof(bool) * column.size(); size <= SERIALIZED_MEM_SIZE_LIMIT) { + ret += size + sizeof(uint32_t); + } else { + ret += (sizeof(uint32_t) + sizeof(size_t) + + std::max(size, streamvbyte_max_compressedbytes(upper_int32(size)))); + } + ret += nested_data_type->get_uncompressed_serialized_bytes( + assert_cast(*column.convert_to_full_column_if_const()) + .get_nested_column(), + be_exec_version); + return ret; + } else { + int64_t size = sizeof(uint32_t); + size += sizeof(bool) * column.size(); + size += nested_data_type->get_uncompressed_serialized_bytes( + assert_cast(*column.convert_to_full_column_if_const()) + .get_nested_column(), + be_exec_version); + return size; + } } char* DataTypeNullable::serialize(const IColumn& column, char* buf, int be_exec_version) const { - auto ptr = column.convert_to_full_column_if_const(); - const ColumnNullable& col = assert_cast(*ptr.get()); + if (be_exec_version >= 4) { + auto ptr = column.convert_to_full_column_if_const(); + const ColumnNullable& col = assert_cast(*ptr.get()); - // row num - *reinterpret_cast(buf) = column.size(); - buf += sizeof(uint32_t); - // null flags - memcpy(buf, col.get_null_map_data().data(), column.size() * sizeof(bool)); - buf += column.size() * sizeof(bool); - // data values - return nested_data_type->serialize(col.get_nested_column(), buf, be_exec_version); + // row num + auto mem_size = col.size() * sizeof(bool); + *reinterpret_cast(buf) = mem_size; + buf += sizeof(uint32_t); + // null flags + if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) { + memcpy(buf, col.get_null_map_data().data(), mem_size); + buf += mem_size; + } else { + auto encode_size = streamvbyte_encode( + reinterpret_cast(col.get_null_map_data().data()), + upper_int32(mem_size), (uint8_t*)(buf + sizeof(size_t))); + *reinterpret_cast(buf) = encode_size; + buf += (sizeof(size_t) + encode_size); + } + // data values + return nested_data_type->serialize(col.get_nested_column(), buf, be_exec_version); + } else { + auto ptr = column.convert_to_full_column_if_const(); + const ColumnNullable& col = assert_cast(*ptr.get()); + + // row num + *reinterpret_cast(buf) = column.size(); + buf += sizeof(uint32_t); + // null flags + memcpy(buf, col.get_null_map_data().data(), column.size() * sizeof(bool)); + buf += column.size() * sizeof(bool); + // data values + return nested_data_type->serialize(col.get_nested_column(), buf, be_exec_version); + } } const char* DataTypeNullable::deserialize(const char* buf, IColumn* column, int be_exec_version) const { - ColumnNullable* col = assert_cast(column); - // row num - uint32_t row_num = *reinterpret_cast(buf); - buf += sizeof(uint32_t); - // null flags - col->get_null_map_data().resize(row_num); - memcpy(col->get_null_map_data().data(), buf, row_num * sizeof(bool)); - buf += row_num * sizeof(bool); - // data values - IColumn& nested = col->get_nested_column(); - return nested_data_type->deserialize(buf, &nested, be_exec_version); + if (be_exec_version >= 4) { + ColumnNullable* col = assert_cast(column); + // row num + uint32_t mem_size = *reinterpret_cast(buf); + buf += sizeof(uint32_t); + // null flags + col->get_null_map_data().resize(mem_size / sizeof(bool)); + if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) { + memcpy(col->get_null_map_data().data(), buf, mem_size); + buf += mem_size; + } else { + size_t encode_size = *reinterpret_cast(buf); + buf += sizeof(size_t); + streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(col->get_null_map_data().data()), + upper_int32(mem_size)); + buf += encode_size; + } + // data values + IColumn& nested = col->get_nested_column(); + return nested_data_type->deserialize(buf, &nested, be_exec_version); + } else { + ColumnNullable* col = assert_cast(column); + // row num + uint32_t row_num = *reinterpret_cast(buf); + buf += sizeof(uint32_t); + // null flags + col->get_null_map_data().resize(row_num); + memcpy(col->get_null_map_data().data(), buf, row_num * sizeof(bool)); + buf += row_num * sizeof(bool); + // data values + IColumn& nested = col->get_nested_column(); + return nested_data_type->deserialize(buf, &nested, be_exec_version); + } } void DataTypeNullable::to_pb_column_meta(PColumnMeta* col_meta) const { diff --git a/be/src/vec/data_types/data_type_number_base.cpp b/be/src/vec/data_types/data_type_number_base.cpp index bd73fc4adc..a3ac1d80af 100644 --- a/be/src/vec/data_types/data_type_number_base.cpp +++ b/be/src/vec/data_types/data_type_number_base.cpp @@ -22,11 +22,11 @@ #include #include +#include #include #include #include -#include #include "gutil/strings/numbers.h" #include "runtime/large_int_value.h" @@ -162,38 +162,87 @@ std::string DataTypeNumberBase::to_string(const IColumn& column, size_t row_n template int64_t DataTypeNumberBase::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - return sizeof(uint32_t) + column.size() * sizeof(FieldType); + if (be_exec_version >= 4) { + auto size = sizeof(T) * column.size(); + if (size <= SERIALIZED_MEM_SIZE_LIMIT) { + return sizeof(uint32_t) + size; + } else { + return sizeof(uint32_t) + sizeof(size_t) + + std::max(size, streamvbyte_max_compressedbytes(upper_int32(size))); + } + } else { + return sizeof(uint32_t) + column.size() * sizeof(FieldType); + } } template char* DataTypeNumberBase::serialize(const IColumn& column, char* buf, int be_exec_version) const { - // row num - const auto row_num = column.size(); - *reinterpret_cast(buf) = row_num; - buf += sizeof(uint32_t); - // column data - auto ptr = column.convert_to_full_column_if_const(); - const auto* origin_data = assert_cast&>(*ptr.get()).get_data().data(); - memcpy(buf, origin_data, row_num * sizeof(FieldType)); - buf += row_num * sizeof(FieldType); + if (be_exec_version >= 4) { + // row num + const auto mem_size = column.size() * sizeof(T); + *reinterpret_cast(buf) = mem_size; + buf += sizeof(uint32_t); + // column data + auto ptr = column.convert_to_full_column_if_const(); + const auto* origin_data = assert_cast&>(*ptr.get()).get_data().data(); + if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) { + memcpy(buf, origin_data, mem_size); + return buf + mem_size; + } - return buf; + auto encode_size = + streamvbyte_encode(reinterpret_cast(origin_data), + upper_int32(mem_size), (uint8_t*)(buf + sizeof(size_t))); + *reinterpret_cast(buf) = encode_size; + buf += sizeof(size_t); + return buf + encode_size; + } else { + // row num + const auto row_num = column.size(); + *reinterpret_cast(buf) = row_num; + buf += sizeof(uint32_t); + // column data + auto ptr = column.convert_to_full_column_if_const(); + const auto* origin_data = assert_cast&>(*ptr.get()).get_data().data(); + memcpy(buf, origin_data, row_num * sizeof(FieldType)); + buf += row_num * sizeof(FieldType); + + return buf; + } } template const char* DataTypeNumberBase::deserialize(const char* buf, IColumn* column, int be_exec_version) const { - // row num - uint32_t row_num = *reinterpret_cast(buf); - buf += sizeof(uint32_t); - // column data - auto& container = assert_cast*>(column)->get_data(); - container.resize(row_num); - memcpy(container.data(), buf, row_num * sizeof(FieldType)); - buf += row_num * sizeof(FieldType); + if (be_exec_version >= 4) { + // row num + uint32_t mem_size = *reinterpret_cast(buf); + buf += sizeof(uint32_t); + // column data + auto& container = assert_cast*>(column)->get_data(); + container.resize(mem_size / sizeof(T)); + if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) { + memcpy(container.data(), buf, mem_size); + return buf + mem_size; + } - return buf; + size_t encode_size = *reinterpret_cast(buf); + buf += sizeof(size_t); + streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(container.data()), + upper_int32(mem_size)); + return buf + encode_size; + } else { + // row num + uint32_t row_num = *reinterpret_cast(buf); + buf += sizeof(uint32_t); + // column data + auto& container = assert_cast*>(column)->get_data(); + container.resize(row_num); + memcpy(container.data(), buf, row_num * sizeof(FieldType)); + buf += row_num * sizeof(FieldType); + return buf; + } } template diff --git a/be/src/vec/data_types/data_type_string.cpp b/be/src/vec/data_types/data_type_string.cpp index bf23705bb7..a3f180a27c 100644 --- a/be/src/vec/data_types/data_type_string.cpp +++ b/be/src/vec/data_types/data_type_string.cpp @@ -20,11 +20,10 @@ #include "vec/data_types/data_type_string.h" +#include +#include #include -#include -#include - #include "vec/columns/column.h" #include "vec/columns/column_const.h" #include "vec/columns/column_string.h" @@ -78,107 +77,155 @@ bool DataTypeString::equals(const IDataType& rhs) const { // : | (*ptr.get()); + if (be_exec_version >= 4) { + auto ptr = column.convert_to_full_column_if_const(); + const auto& data_column = assert_cast(*ptr.get()); + int64_t size = sizeof(uint32_t) + sizeof(uint64_t); + if (auto offsets_size = data_column.size() * sizeof(IColumn::Offset); + offsets_size <= SERIALIZED_MEM_SIZE_LIMIT) { + size += offsets_size; + } else { + size += sizeof(size_t) + std::max(offsets_size, streamvbyte_max_compressedbytes( + upper_int32(offsets_size))); + } + + if (auto bytes = data_column.get_chars().size(); bytes <= SERIALIZED_MEM_SIZE_LIMIT) { + size += bytes; + } else { + size += sizeof(size_t) + + std::max(bytes, streamvbyte_max_compressedbytes(upper_int32(bytes))); + } + return size; + } else { + auto ptr = column.convert_to_full_column_if_const(); + const auto& data_column = assert_cast(*ptr.get()); + + if (be_exec_version == 0) { + return sizeof(IColumn::Offset) * (column.size() + 1) + sizeof(uint64_t) + + data_column.get_chars().size() + column.size(); + } - if (be_exec_version == 0) { return sizeof(IColumn::Offset) * (column.size() + 1) + sizeof(uint64_t) + - data_column.get_chars().size() + column.size(); + data_column.get_chars().size(); } - - return sizeof(IColumn::Offset) * (column.size() + 1) + sizeof(uint64_t) + - data_column.get_chars().size(); } char* DataTypeString::serialize(const IColumn& column, char* buf, int be_exec_version) const { - auto ptr = column.convert_to_full_column_if_const(); - const auto& data_column = assert_cast(*ptr.get()); + if (be_exec_version >= 4) { + auto ptr = column.convert_to_full_column_if_const(); + const auto& data_column = assert_cast(*ptr.get()); - if (be_exec_version == 0) { + // row num + uint32_t mem_size = data_column.size() * sizeof(IColumn::Offset); + *reinterpret_cast(buf) = mem_size; + buf += sizeof(uint32_t); + // offsets + if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) { + memcpy(buf, data_column.get_offsets().data(), mem_size); + buf += mem_size; + } else { + auto encode_size = streamvbyte_encode( + reinterpret_cast(data_column.get_offsets().data()), + upper_int32(mem_size), (uint8_t*)(buf + sizeof(size_t))); + *reinterpret_cast(buf) = encode_size; + buf += (sizeof(size_t) + encode_size); + } + + // values + uint64_t value_len = data_column.get_chars().size(); + *reinterpret_cast(buf) = value_len; + buf += sizeof(uint64_t); + if (value_len <= SERIALIZED_MEM_SIZE_LIMIT) { + memcpy(buf, data_column.get_chars().data(), value_len); + buf += value_len; + return buf; + } + auto encode_size = streamvbyte_encode( + reinterpret_cast(data_column.get_chars().data()), + upper_int32(value_len), (uint8_t*)(buf + sizeof(size_t))); + *reinterpret_cast(buf) = encode_size; + buf += (sizeof(size_t) + encode_size); + return buf; + } else { + auto ptr = column.convert_to_full_column_if_const(); + const auto& data_column = assert_cast(*ptr.get()); // row num *reinterpret_cast(buf) = column.size(); buf += sizeof(IColumn::Offset); // offsets - for (int i = 0; i < column.size(); i++) { - *reinterpret_cast(buf) = data_column.get_offsets()[i] + i + 1; - buf += sizeof(IColumn::Offset); - } + memcpy(buf, data_column.get_offsets().data(), column.size() * sizeof(IColumn::Offset)); + buf += column.size() * sizeof(IColumn::Offset); // total length - *reinterpret_cast(buf) = data_column.get_chars().size() + column.size(); + uint64_t value_len = data_column.get_chars().size(); + *reinterpret_cast(buf) = value_len; buf += sizeof(uint64_t); // values - for (int i = 0; i < column.size(); i++) { - auto data = data_column.get_data_at(i); - memcpy(buf, data.data, data.size); - buf += data.size; - *buf = '\0'; - buf++; - } + memcpy(buf, data_column.get_chars().data(), value_len); + buf += value_len; + return buf; } - - // row num - *reinterpret_cast(buf) = column.size(); - buf += sizeof(IColumn::Offset); - // offsets - memcpy(buf, data_column.get_offsets().data(), column.size() * sizeof(IColumn::Offset)); - buf += column.size() * sizeof(IColumn::Offset); - // total length - uint64_t value_len = data_column.get_chars().size(); - *reinterpret_cast(buf) = value_len; - buf += sizeof(uint64_t); - // values - memcpy(buf, data_column.get_chars().data(), value_len); - buf += value_len; - - return buf; } const char* DataTypeString::deserialize(const char* buf, IColumn* column, int be_exec_version) const { - ColumnString* column_string = assert_cast(column); - ColumnString::Chars& data = column_string->get_chars(); - ColumnString::Offsets& offsets = column_string->get_offsets(); + if (be_exec_version >= 4) { + ColumnString* column_string = assert_cast(column); + ColumnString::Chars& data = column_string->get_chars(); + ColumnString::Offsets& offsets = column_string->get_offsets(); - if (be_exec_version == 0) { + uint32_t mem_size = *reinterpret_cast(buf); + buf += sizeof(uint32_t); + offsets.resize(mem_size / sizeof(IColumn::Offset)); + // offsets + if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) { + memcpy(offsets.data(), buf, mem_size); + buf += mem_size; + } else { + size_t encode_size = *reinterpret_cast(buf); + buf += sizeof(size_t); + streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(offsets.data()), + upper_int32(mem_size)); + buf += encode_size; + } + // total length + uint64_t value_len = *reinterpret_cast(buf); + buf += sizeof(uint64_t); + data.resize(value_len); + + // offsets + if (value_len <= SERIALIZED_MEM_SIZE_LIMIT) { + memcpy(data.data(), buf, value_len); + buf += value_len; + } else { + size_t encode_size = *reinterpret_cast(buf); + buf += sizeof(size_t); + streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(data.data()), + upper_int32(value_len)); + buf += encode_size; + } + return buf; + } else { + ColumnString* column_string = assert_cast(column); + ColumnString::Chars& data = column_string->get_chars(); + ColumnString::Offsets& offsets = column_string->get_offsets(); // row num IColumn::Offset row_num = *reinterpret_cast(buf); buf += sizeof(IColumn::Offset); // offsets offsets.resize(row_num); - for (int i = 0; i < row_num; i++) { - offsets[i] = *reinterpret_cast(buf) - i - 1; - buf += sizeof(IColumn::Offset); - } + memcpy(offsets.data(), buf, sizeof(IColumn::Offset) * row_num); + buf += sizeof(IColumn::Offset) * row_num; // total length uint64_t value_len = *reinterpret_cast(buf); buf += sizeof(uint64_t); // values - data.resize(value_len - row_num); - for (int i = 0; i < row_num; i++) { - memcpy(data.data() + offsets[i - 1], buf, offsets[i] - offsets[i - 1]); - buf += offsets[i] - offsets[i - 1] + 1; - } + data.resize(value_len); + memcpy(data.data(), buf, value_len); + buf += value_len; return buf; } - - // row num - IColumn::Offset row_num = *reinterpret_cast(buf); - buf += sizeof(IColumn::Offset); - // offsets - offsets.resize(row_num); - memcpy(offsets.data(), buf, sizeof(IColumn::Offset) * row_num); - buf += sizeof(IColumn::Offset) * row_num; - // total length - uint64_t value_len = *reinterpret_cast(buf); - buf += sizeof(uint64_t); - // values - data.resize(value_len); - memcpy(data.data(), buf, value_len); - buf += value_len; - - return buf; } } // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index c19f56fd07..d429b8bbd9 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -66,7 +66,8 @@ Status VWalWriter::write_wal(vectorized::Block* block) { PBlock pblock; size_t uncompressed_bytes = 0, compressed_bytes = 0; RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, &uncompressed_bytes, - &compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); + &compressed_bytes, + segment_v2::CompressionTypePB::NO_COMPRESSION)); RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); return Status::OK(); } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 8c4dc4eb0b..248b0f0dbe 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1707,7 +1707,7 @@ public class Config extends ConfigBase { * Max data version of backends serialize block. */ @ConfField(mutable = false) - public static int max_be_exec_version = 3; + public static int max_be_exec_version = 4; /** * Min data version of backends serialize block. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index bb880136b8..e2ebde3db9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -700,7 +700,7 @@ public class SessionVariable implements Serializable, Writable { public String preferJoinMethod = "broadcast"; @VariableMgr.VarAttr(name = FRAGMENT_TRANSMISSION_COMPRESSION_CODEC) - public String fragmentTransmissionCompressionCodec = "lz4"; + public String fragmentTransmissionCompressionCodec = "none"; /* * the parallel exec instance num for one Fragment in one BE