diff --git a/be/src/agent/be_exec_version_manager.h b/be/src/agent/be_exec_version_manager.h index ad8cd007c0..963437b3d3 100644 --- a/be/src/agent/be_exec_version_manager.h +++ b/be/src/agent/be_exec_version_manager.h @@ -63,11 +63,13 @@ private: * b. array contains/position/countequal function return nullable in less situations. * c. cleared old version of Version 2. * d. unix_timestamp function support timestamp with float for datetimev2, and change nullable mode. - * e. the right function outputs NULL when the function contains NULL, substr function returns empty if start > str.length. - * 4: start from doris 2.1.x - * a. change shuffle serialize/deserialize way + * e. change shuffle serialize/deserialize way + * f. the right function outputs NULL when the function contains NULL, substr function returns empty if start > str.length. */ -inline const int BeExecVersionManager::max_be_exec_version = 4; -inline const int BeExecVersionManager::min_be_exec_version = 0; +constexpr inline int BeExecVersionManager::max_be_exec_version = 3; +constexpr inline int BeExecVersionManager::min_be_exec_version = 0; + +/// functional +constexpr inline int USE_NEW_SERDE = 3; // release on DORIS version 2.1 } // namespace doris diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp index 36a4c15aa2..5b65467b6c 100644 --- a/be/src/olap/wal/wal_reader.cpp +++ b/be/src/olap/wal/wal_reader.cpp @@ -29,7 +29,14 @@ namespace doris { WalReader::WalReader(const std::string& file_name) : _file_name(file_name), _offset(0) {} -WalReader::~WalReader() {} +WalReader::~WalReader() = default; + +static Status _deserialize(PBlock& block, const std::string& buf) { + if (UNLIKELY(!block.ParseFromString(buf))) { + return Status::InternalError("failed to deserialize row"); + } + return Status::OK(); +} Status WalReader::init() { RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_name, &file_reader)); @@ -98,13 +105,6 @@ Status WalReader::read_header(std::string& col_ids) { return Status::OK(); } -Status WalReader::_deserialize(PBlock& block, std::string& buf) { - if (UNLIKELY(!block.ParseFromString(buf))) { - return Status::InternalError("failed to deserialize row"); - } - return Status::OK(); -} - Status WalReader::_check_checksum(const char* binary, size_t size, uint32_t checksum) { uint32_t computed_checksum = crc32c::Value(binary, size); if (LIKELY(computed_checksum == checksum)) { diff --git a/be/src/olap/wal/wal_reader.h b/be/src/olap/wal/wal_reader.h index 7c852ee18f..1f26a7598f 100644 --- a/be/src/olap/wal/wal_reader.h +++ b/be/src/olap/wal/wal_reader.h @@ -35,10 +35,8 @@ public: Status read_header(std::string& col_ids); private: - Status _deserialize(PBlock& block, std::string& buf); Status _check_checksum(const char* binary, size_t size, uint32_t checksum); -private: std::string _file_name; uint32_t _version = 0; size_t _offset; diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp index 8edc6b3b86..62b1352a57 100644 --- a/be/src/olap/wal/wal_writer.cpp +++ b/be/src/olap/wal/wal_writer.cpp @@ -57,19 +57,19 @@ Status WalWriter::finalize() { Status WalWriter::append_blocks(const PBlockArray& blocks) { size_t total_size = 0; - for (const auto& block : blocks) { - total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE; - } size_t offset = 0; for (const auto& block : blocks) { uint8_t len_buf[sizeof(uint64_t)]; uint64_t block_length = block->ByteSizeLong(); + total_size += LENGTH_SIZE + block_length + CHECKSUM_SIZE; encode_fixed64_le(len_buf, block_length); RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)})); offset += LENGTH_SIZE; + std::string content = block->SerializeAsString(); RETURN_IF_ERROR(_file_writer->append(content)); offset += block_length; + uint8_t checksum_buf[sizeof(uint32_t)]; uint32_t checksum = crc32c::Value(content.data(), block_length); encode_fixed32_le(checksum_buf, checksum); diff --git a/be/src/vec/data_types/data_type_decimal.cpp b/be/src/vec/data_types/data_type_decimal.cpp index 0573a17729..b44b116d6f 100644 --- a/be/src/vec/data_types/data_type_decimal.cpp +++ b/be/src/vec/data_types/data_type_decimal.cpp @@ -23,8 +23,11 @@ #include #include #include -#include +#include +#include + +#include "agent/be_exec_version_manager.h" #include "runtime/decimalv2_value.h" #include "util/string_parser.hpp" #include "vec/columns/column.h" @@ -77,8 +80,7 @@ void DataTypeDecimal::to_string(const IColumn& column, size_t row_num, auto str = value.to_string(scale); ostr.write(str.data(), str.size()); } else { - DecimalV2Value value = - (DecimalV2Value)assert_cast(*ptr).get_element(row_num); + auto value = (DecimalV2Value)assert_cast(*ptr).get_element(row_num); auto str = value.to_string(scale); ostr.write(str.data(), str.size()); } @@ -104,7 +106,7 @@ Status DataTypeDecimal::from_string(ReadBuffer& rb, IColumn* column) const { template int64_t DataTypeDecimal::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - if (be_exec_version >= 4) { + if (be_exec_version >= USE_NEW_SERDE) { auto size = sizeof(T) * column.size(); if (size <= SERIALIZED_MEM_SIZE_LIMIT) { return sizeof(uint32_t) + size; @@ -119,7 +121,7 @@ int64_t DataTypeDecimal::get_uncompressed_serialized_bytes(const IColumn& col template char* DataTypeDecimal::serialize(const IColumn& column, char* buf, int be_exec_version) const { - if (be_exec_version >= 4) { + if (be_exec_version >= USE_NEW_SERDE) { // row num const auto mem_size = column.size() * sizeof(T); *reinterpret_cast(buf) = mem_size; @@ -156,7 +158,7 @@ char* DataTypeDecimal::serialize(const IColumn& column, char* buf, int be_exe template const char* DataTypeDecimal::deserialize(const char* buf, IColumn* column, int be_exec_version) const { - if (be_exec_version >= 4) { + if (be_exec_version >= USE_NEW_SERDE) { // row num uint32_t mem_size = *reinterpret_cast(buf); buf += sizeof(uint32_t); diff --git a/be/src/vec/data_types/data_type_nullable.cpp b/be/src/vec/data_types/data_type_nullable.cpp index f160a1f323..b10244c852 100644 --- a/be/src/vec/data_types/data_type_nullable.cpp +++ b/be/src/vec/data_types/data_type_nullable.cpp @@ -24,19 +24,20 @@ #include #include #include -#include +#include +#include #include -#include +#include "agent/be_exec_version_manager.h" #include "vec/columns/column.h" #include "vec/columns/column_const.h" #include "vec/columns/column_nullable.h" #include "vec/columns/columns_number.h" #include "vec/common/assert_cast.h" #include "vec/common/string_buffer.hpp" -#include "vec/common/typeid_cast.h" #include "vec/core/field.h" +#include "vec/data_types/data_type.h" #include "vec/data_types/data_type_nothing.h" #include "vec/io/reader_buffer.h" @@ -99,7 +100,7 @@ Status DataTypeNullable::from_string(ReadBuffer& rb, IColumn* column) const { // : value1 | value2 | ...> int64_t DataTypeNullable::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - if (be_exec_version >= 4) { + if (be_exec_version >= USE_NEW_SERDE) { size_t ret = 0; if (size_t size = sizeof(bool) * column.size(); size <= SERIALIZED_MEM_SIZE_LIMIT) { ret += size + sizeof(uint32_t); @@ -124,9 +125,9 @@ int64_t DataTypeNullable::get_uncompressed_serialized_bytes(const IColumn& colum } char* DataTypeNullable::serialize(const IColumn& column, char* buf, int be_exec_version) const { - if (be_exec_version >= 4) { + if (be_exec_version >= USE_NEW_SERDE) { auto ptr = column.convert_to_full_column_if_const(); - const ColumnNullable& col = assert_cast(*ptr.get()); + const auto& col = assert_cast(*ptr.get()); // row num auto mem_size = col.size() * sizeof(bool); @@ -147,7 +148,7 @@ char* DataTypeNullable::serialize(const IColumn& column, char* buf, int be_exec_ return nested_data_type->serialize(col.get_nested_column(), buf, be_exec_version); } else { auto ptr = column.convert_to_full_column_if_const(); - const ColumnNullable& col = assert_cast(*ptr.get()); + const auto& col = assert_cast(*ptr.get()); // row num *reinterpret_cast(buf) = column.size(); @@ -162,8 +163,8 @@ char* DataTypeNullable::serialize(const IColumn& column, char* buf, int be_exec_ const char* DataTypeNullable::deserialize(const char* buf, IColumn* column, int be_exec_version) const { - if (be_exec_version >= 4) { - ColumnNullable* col = assert_cast(column); + if (be_exec_version >= USE_NEW_SERDE) { + auto* col = assert_cast(column); // row num uint32_t mem_size = *reinterpret_cast(buf); buf += sizeof(uint32_t); @@ -183,7 +184,7 @@ const char* DataTypeNullable::deserialize(const char* buf, IColumn* column, IColumn& nested = col->get_nested_column(); return nested_data_type->deserialize(buf, &nested, be_exec_version); } else { - ColumnNullable* col = assert_cast(column); + auto* col = assert_cast(column); // row num uint32_t row_num = *reinterpret_cast(buf); buf += sizeof(uint32_t); @@ -229,7 +230,7 @@ DataTypePtr make_nullable(const DataTypePtr& type) { DataTypes make_nullable(const DataTypes& types) { DataTypes nullable_types; - for (auto& type : types) { + for (const auto& type : types) { nullable_types.push_back(make_nullable(type)); } return nullable_types; @@ -244,19 +245,15 @@ DataTypePtr remove_nullable(const DataTypePtr& type) { DataTypes remove_nullable(const DataTypes& types) { DataTypes no_null_types; - for (auto& type : types) { + for (const auto& type : types) { no_null_types.push_back(remove_nullable(type)); } return no_null_types; } bool have_nullable(const DataTypes& types) { - for (auto& type : types) { - if (type->is_nullable()) { - return true; - } - } - return false; + return std::any_of(types.begin(), types.end(), + [](const DataTypePtr& type) { return type->is_nullable(); }); } } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_number_base.cpp b/be/src/vec/data_types/data_type_number_base.cpp index a3ac1d80af..662b8e1873 100644 --- a/be/src/vec/data_types/data_type_number_base.cpp +++ b/be/src/vec/data_types/data_type_number_base.cpp @@ -23,11 +23,12 @@ #include #include #include -#include +#include #include #include +#include "agent/be_exec_version_manager.h" #include "gutil/strings/numbers.h" #include "runtime/large_int_value.h" #include "util/mysql_global.h" @@ -122,9 +123,9 @@ Field DataTypeNumberBase::get_field(const TExprNode& node) const { } if constexpr (std::is_same_v, TypeId>) { StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS; - __int128_t value = StringParser::string_to_int<__int128>( - node.large_int_literal.value.c_str(), node.large_int_literal.value.size(), - &parse_result); + auto value = StringParser::string_to_int<__int128>(node.large_int_literal.value.c_str(), + node.large_int_literal.value.size(), + &parse_result); if (parse_result != StringParser::PARSE_SUCCESS) { value = MAX_INT128; } @@ -162,7 +163,7 @@ std::string DataTypeNumberBase::to_string(const IColumn& column, size_t row_n template int64_t DataTypeNumberBase::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - if (be_exec_version >= 4) { + if (be_exec_version >= USE_NEW_SERDE) { auto size = sizeof(T) * column.size(); if (size <= SERIALIZED_MEM_SIZE_LIMIT) { return sizeof(uint32_t) + size; @@ -178,7 +179,7 @@ int64_t DataTypeNumberBase::get_uncompressed_serialized_bytes(const IColumn& template char* DataTypeNumberBase::serialize(const IColumn& column, char* buf, int be_exec_version) const { - if (be_exec_version >= 4) { + if (be_exec_version >= USE_NEW_SERDE) { // row num const auto mem_size = column.size() * sizeof(T); *reinterpret_cast(buf) = mem_size; @@ -215,7 +216,7 @@ char* DataTypeNumberBase::serialize(const IColumn& column, char* buf, template const char* DataTypeNumberBase::deserialize(const char* buf, IColumn* column, int be_exec_version) const { - if (be_exec_version >= 4) { + if (be_exec_version >= USE_NEW_SERDE) { // row num uint32_t mem_size = *reinterpret_cast(buf); buf += sizeof(uint32_t); diff --git a/be/src/vec/data_types/data_type_string.cpp b/be/src/vec/data_types/data_type_string.cpp index a3f180a27c..16440675b2 100644 --- a/be/src/vec/data_types/data_type_string.cpp +++ b/be/src/vec/data_types/data_type_string.cpp @@ -22,8 +22,10 @@ #include #include -#include +#include + +#include "agent/be_exec_version_manager.h" #include "vec/columns/column.h" #include "vec/columns/column_const.h" #include "vec/columns/column_string.h" @@ -77,7 +79,7 @@ bool DataTypeString::equals(const IDataType& rhs) const { // : | = 4) { + if (be_exec_version >= USE_NEW_SERDE) { auto ptr = column.convert_to_full_column_if_const(); const auto& data_column = assert_cast(*ptr.get()); int64_t size = sizeof(uint32_t) + sizeof(uint64_t); @@ -111,7 +113,7 @@ int64_t DataTypeString::get_uncompressed_serialized_bytes(const IColumn& column, } char* DataTypeString::serialize(const IColumn& column, char* buf, int be_exec_version) const { - if (be_exec_version >= 4) { + if (be_exec_version >= USE_NEW_SERDE) { auto ptr = column.convert_to_full_column_if_const(); const auto& data_column = assert_cast(*ptr.get()); @@ -169,8 +171,8 @@ char* DataTypeString::serialize(const IColumn& column, char* buf, int be_exec_ve const char* DataTypeString::deserialize(const char* buf, IColumn* column, int be_exec_version) const { - if (be_exec_version >= 4) { - ColumnString* column_string = assert_cast(column); + if (be_exec_version >= USE_NEW_SERDE) { + auto* column_string = assert_cast(column); ColumnString::Chars& data = column_string->get_chars(); ColumnString::Offsets& offsets = column_string->get_offsets(); @@ -206,7 +208,7 @@ const char* DataTypeString::deserialize(const char* buf, IColumn* column, } return buf; } else { - ColumnString* column_string = assert_cast(column); + auto* column_string = assert_cast(column); ColumnString::Chars& data = column_string->get_chars(); ColumnString::Offsets& offsets = column_string->get_offsets(); // row num diff --git a/be/test/exec/test_data/wal_scanner/wal b/be/test/exec/test_data/wal_scanner/wal index 2c5fe90963..ddce750894 100644 Binary files a/be/test/exec/test_data/wal_scanner/wal and b/be/test/exec/test_data/wal_scanner/wal differ diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 00d3ebb8a9..27065b1393 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1713,7 +1713,7 @@ public class Config extends ConfigBase { * Max data version of backends serialize block. */ @ConfField(mutable = false) - public static int max_be_exec_version = 4; + public static int max_be_exec_version = 3; /** * Min data version of backends serialize block.