[Fix](smooth-upgrade) fix unnecessarily high version of smooth upgrade (#30283)
fix unnecessarily high version of smooth upgrade
This commit is contained in:
@ -63,11 +63,13 @@ private:
|
||||
* b. array contains/position/countequal functions return nullable in fewer situations.
|
||||
* c. cleared old version of Version 2.
|
||||
* d. unix_timestamp function supports timestamp with float for datetimev2, and changes nullable mode.
|
||||
* e. the right function outputs NULL when the function contains NULL, substr function returns empty if start > str.length.
|
||||
* 4: start from doris 2.1.x
|
||||
* a. change the shuffle serialization/deserialization method
|
||||
* e. change the shuffle serialization/deserialization method
|
||||
* f. the right function outputs NULL when the function contains NULL, substr function returns empty if start > str.length.
|
||||
*/
|
||||
inline const int BeExecVersionManager::max_be_exec_version = 4;
|
||||
inline const int BeExecVersionManager::min_be_exec_version = 0;
|
||||
constexpr inline int BeExecVersionManager::max_be_exec_version = 3;
|
||||
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;
|
||||
|
||||
/// functional
|
||||
constexpr inline int USE_NEW_SERDE = 3; // release on DORIS version 2.1
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -29,7 +29,14 @@ namespace doris {
|
||||
|
||||
WalReader::WalReader(const std::string& file_name) : _file_name(file_name), _offset(0) {}
|
||||
|
||||
WalReader::~WalReader() {}
|
||||
WalReader::~WalReader() = default;
|
||||
|
||||
static Status _deserialize(PBlock& block, const std::string& buf) {
|
||||
if (UNLIKELY(!block.ParseFromString(buf))) {
|
||||
return Status::InternalError("failed to deserialize row");
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status WalReader::init() {
|
||||
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_name, &file_reader));
|
||||
@ -98,13 +105,6 @@ Status WalReader::read_header(std::string& col_ids) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status WalReader::_deserialize(PBlock& block, std::string& buf) {
|
||||
if (UNLIKELY(!block.ParseFromString(buf))) {
|
||||
return Status::InternalError("failed to deserialize row");
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status WalReader::_check_checksum(const char* binary, size_t size, uint32_t checksum) {
|
||||
uint32_t computed_checksum = crc32c::Value(binary, size);
|
||||
if (LIKELY(computed_checksum == checksum)) {
|
||||
|
||||
@ -35,10 +35,8 @@ public:
|
||||
Status read_header(std::string& col_ids);
|
||||
|
||||
private:
|
||||
Status _deserialize(PBlock& block, std::string& buf);
|
||||
Status _check_checksum(const char* binary, size_t size, uint32_t checksum);
|
||||
|
||||
private:
|
||||
std::string _file_name;
|
||||
uint32_t _version = 0;
|
||||
size_t _offset;
|
||||
|
||||
@ -57,19 +57,19 @@ Status WalWriter::finalize() {
|
||||
|
||||
Status WalWriter::append_blocks(const PBlockArray& blocks) {
|
||||
size_t total_size = 0;
|
||||
for (const auto& block : blocks) {
|
||||
total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE;
|
||||
}
|
||||
size_t offset = 0;
|
||||
for (const auto& block : blocks) {
|
||||
uint8_t len_buf[sizeof(uint64_t)];
|
||||
uint64_t block_length = block->ByteSizeLong();
|
||||
total_size += LENGTH_SIZE + block_length + CHECKSUM_SIZE;
|
||||
encode_fixed64_le(len_buf, block_length);
|
||||
RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)}));
|
||||
offset += LENGTH_SIZE;
|
||||
|
||||
std::string content = block->SerializeAsString();
|
||||
RETURN_IF_ERROR(_file_writer->append(content));
|
||||
offset += block_length;
|
||||
|
||||
uint8_t checksum_buf[sizeof(uint32_t)];
|
||||
uint32_t checksum = crc32c::Value(content.data(), block_length);
|
||||
encode_fixed32_le(checksum_buf, checksum);
|
||||
|
||||
@ -23,8 +23,11 @@
|
||||
#include <fmt/format.h>
|
||||
#include <gen_cpp/data.pb.h>
|
||||
#include <streamvbyte.h>
|
||||
#include <string.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <cstring>
|
||||
|
||||
#include "agent/be_exec_version_manager.h"
|
||||
#include "runtime/decimalv2_value.h"
|
||||
#include "util/string_parser.hpp"
|
||||
#include "vec/columns/column.h"
|
||||
@ -77,8 +80,7 @@ void DataTypeDecimal<T>::to_string(const IColumn& column, size_t row_num,
|
||||
auto str = value.to_string(scale);
|
||||
ostr.write(str.data(), str.size());
|
||||
} else {
|
||||
DecimalV2Value value =
|
||||
(DecimalV2Value)assert_cast<const ColumnType&>(*ptr).get_element(row_num);
|
||||
auto value = (DecimalV2Value)assert_cast<const ColumnType&>(*ptr).get_element(row_num);
|
||||
auto str = value.to_string(scale);
|
||||
ostr.write(str.data(), str.size());
|
||||
}
|
||||
@ -104,7 +106,7 @@ Status DataTypeDecimal<T>::from_string(ReadBuffer& rb, IColumn* column) const {
|
||||
template <typename T>
|
||||
int64_t DataTypeDecimal<T>::get_uncompressed_serialized_bytes(const IColumn& column,
|
||||
int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
auto size = sizeof(T) * column.size();
|
||||
if (size <= SERIALIZED_MEM_SIZE_LIMIT) {
|
||||
return sizeof(uint32_t) + size;
|
||||
@ -119,7 +121,7 @@ int64_t DataTypeDecimal<T>::get_uncompressed_serialized_bytes(const IColumn& col
|
||||
|
||||
template <typename T>
|
||||
char* DataTypeDecimal<T>::serialize(const IColumn& column, char* buf, int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
// row num
|
||||
const auto mem_size = column.size() * sizeof(T);
|
||||
*reinterpret_cast<uint32_t*>(buf) = mem_size;
|
||||
@ -156,7 +158,7 @@ char* DataTypeDecimal<T>::serialize(const IColumn& column, char* buf, int be_exe
|
||||
template <typename T>
|
||||
const char* DataTypeDecimal<T>::deserialize(const char* buf, IColumn* column,
|
||||
int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
// row num
|
||||
uint32_t mem_size = *reinterpret_cast<const uint32_t*>(buf);
|
||||
buf += sizeof(uint32_t);
|
||||
|
||||
@ -24,19 +24,20 @@
|
||||
#include <gen_cpp/data.pb.h>
|
||||
#include <glog/logging.h>
|
||||
#include <streamvbyte.h>
|
||||
#include <string.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstring>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "agent/be_exec_version_manager.h"
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/columns/column_const.h"
|
||||
#include "vec/columns/column_nullable.h"
|
||||
#include "vec/columns/columns_number.h"
|
||||
#include "vec/common/assert_cast.h"
|
||||
#include "vec/common/string_buffer.hpp"
|
||||
#include "vec/common/typeid_cast.h"
|
||||
#include "vec/core/field.h"
|
||||
#include "vec/data_types/data_type.h"
|
||||
#include "vec/data_types/data_type_nothing.h"
|
||||
#include "vec/io/reader_buffer.h"
|
||||
|
||||
@ -99,7 +100,7 @@ Status DataTypeNullable::from_string(ReadBuffer& rb, IColumn* column) const {
|
||||
// <values array>: value1 | value2 | ...>
|
||||
int64_t DataTypeNullable::get_uncompressed_serialized_bytes(const IColumn& column,
|
||||
int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
size_t ret = 0;
|
||||
if (size_t size = sizeof(bool) * column.size(); size <= SERIALIZED_MEM_SIZE_LIMIT) {
|
||||
ret += size + sizeof(uint32_t);
|
||||
@ -124,9 +125,9 @@ int64_t DataTypeNullable::get_uncompressed_serialized_bytes(const IColumn& colum
|
||||
}
|
||||
|
||||
char* DataTypeNullable::serialize(const IColumn& column, char* buf, int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
auto ptr = column.convert_to_full_column_if_const();
|
||||
const ColumnNullable& col = assert_cast<const ColumnNullable&>(*ptr.get());
|
||||
const auto& col = assert_cast<const ColumnNullable&>(*ptr.get());
|
||||
|
||||
// row num
|
||||
auto mem_size = col.size() * sizeof(bool);
|
||||
@ -147,7 +148,7 @@ char* DataTypeNullable::serialize(const IColumn& column, char* buf, int be_exec_
|
||||
return nested_data_type->serialize(col.get_nested_column(), buf, be_exec_version);
|
||||
} else {
|
||||
auto ptr = column.convert_to_full_column_if_const();
|
||||
const ColumnNullable& col = assert_cast<const ColumnNullable&>(*ptr.get());
|
||||
const auto& col = assert_cast<const ColumnNullable&>(*ptr.get());
|
||||
|
||||
// row num
|
||||
*reinterpret_cast<uint32_t*>(buf) = column.size();
|
||||
@ -162,8 +163,8 @@ char* DataTypeNullable::serialize(const IColumn& column, char* buf, int be_exec_
|
||||
|
||||
const char* DataTypeNullable::deserialize(const char* buf, IColumn* column,
|
||||
int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
ColumnNullable* col = assert_cast<ColumnNullable*>(column);
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
auto* col = assert_cast<ColumnNullable*>(column);
|
||||
// row num
|
||||
uint32_t mem_size = *reinterpret_cast<const uint32_t*>(buf);
|
||||
buf += sizeof(uint32_t);
|
||||
@ -183,7 +184,7 @@ const char* DataTypeNullable::deserialize(const char* buf, IColumn* column,
|
||||
IColumn& nested = col->get_nested_column();
|
||||
return nested_data_type->deserialize(buf, &nested, be_exec_version);
|
||||
} else {
|
||||
ColumnNullable* col = assert_cast<ColumnNullable*>(column);
|
||||
auto* col = assert_cast<ColumnNullable*>(column);
|
||||
// row num
|
||||
uint32_t row_num = *reinterpret_cast<const uint32_t*>(buf);
|
||||
buf += sizeof(uint32_t);
|
||||
@ -229,7 +230,7 @@ DataTypePtr make_nullable(const DataTypePtr& type) {
|
||||
|
||||
DataTypes make_nullable(const DataTypes& types) {
|
||||
DataTypes nullable_types;
|
||||
for (auto& type : types) {
|
||||
for (const auto& type : types) {
|
||||
nullable_types.push_back(make_nullable(type));
|
||||
}
|
||||
return nullable_types;
|
||||
@ -244,19 +245,15 @@ DataTypePtr remove_nullable(const DataTypePtr& type) {
|
||||
|
||||
DataTypes remove_nullable(const DataTypes& types) {
|
||||
DataTypes no_null_types;
|
||||
for (auto& type : types) {
|
||||
for (const auto& type : types) {
|
||||
no_null_types.push_back(remove_nullable(type));
|
||||
}
|
||||
return no_null_types;
|
||||
}
|
||||
|
||||
bool have_nullable(const DataTypes& types) {
|
||||
for (auto& type : types) {
|
||||
if (type->is_nullable()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return std::any_of(types.begin(), types.end(),
|
||||
[](const DataTypePtr& type) { return type->is_nullable(); });
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -23,11 +23,12 @@
|
||||
#include <fmt/format.h>
|
||||
#include <glog/logging.h>
|
||||
#include <streamvbyte.h>
|
||||
#include <string.h>
|
||||
|
||||
#include <cstring>
|
||||
#include <limits>
|
||||
#include <type_traits>
|
||||
|
||||
#include "agent/be_exec_version_manager.h"
|
||||
#include "gutil/strings/numbers.h"
|
||||
#include "runtime/large_int_value.h"
|
||||
#include "util/mysql_global.h"
|
||||
@ -122,9 +123,9 @@ Field DataTypeNumberBase<T>::get_field(const TExprNode& node) const {
|
||||
}
|
||||
if constexpr (std::is_same_v<TypeId<T>, TypeId<Int128>>) {
|
||||
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
|
||||
__int128_t value = StringParser::string_to_int<__int128>(
|
||||
node.large_int_literal.value.c_str(), node.large_int_literal.value.size(),
|
||||
&parse_result);
|
||||
auto value = StringParser::string_to_int<__int128>(node.large_int_literal.value.c_str(),
|
||||
node.large_int_literal.value.size(),
|
||||
&parse_result);
|
||||
if (parse_result != StringParser::PARSE_SUCCESS) {
|
||||
value = MAX_INT128;
|
||||
}
|
||||
@ -162,7 +163,7 @@ std::string DataTypeNumberBase<T>::to_string(const IColumn& column, size_t row_n
|
||||
template <typename T>
|
||||
int64_t DataTypeNumberBase<T>::get_uncompressed_serialized_bytes(const IColumn& column,
|
||||
int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
auto size = sizeof(T) * column.size();
|
||||
if (size <= SERIALIZED_MEM_SIZE_LIMIT) {
|
||||
return sizeof(uint32_t) + size;
|
||||
@ -178,7 +179,7 @@ int64_t DataTypeNumberBase<T>::get_uncompressed_serialized_bytes(const IColumn&
|
||||
template <typename T>
|
||||
char* DataTypeNumberBase<T>::serialize(const IColumn& column, char* buf,
|
||||
int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
// row num
|
||||
const auto mem_size = column.size() * sizeof(T);
|
||||
*reinterpret_cast<uint32_t*>(buf) = mem_size;
|
||||
@ -215,7 +216,7 @@ char* DataTypeNumberBase<T>::serialize(const IColumn& column, char* buf,
|
||||
template <typename T>
|
||||
const char* DataTypeNumberBase<T>::deserialize(const char* buf, IColumn* column,
|
||||
int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
// row num
|
||||
uint32_t mem_size = *reinterpret_cast<const uint32_t*>(buf);
|
||||
buf += sizeof(uint32_t);
|
||||
|
||||
@ -22,8 +22,10 @@
|
||||
|
||||
#include <lz4/lz4.h>
|
||||
#include <streamvbyte.h>
|
||||
#include <string.h>
|
||||
|
||||
#include <cstring>
|
||||
|
||||
#include "agent/be_exec_version_manager.h"
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/columns/column_const.h"
|
||||
#include "vec/columns/column_string.h"
|
||||
@ -77,7 +79,7 @@ bool DataTypeString::equals(const IDataType& rhs) const {
|
||||
// <value array> : <value1> | <value2 | ...
|
||||
int64_t DataTypeString::get_uncompressed_serialized_bytes(const IColumn& column,
|
||||
int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
auto ptr = column.convert_to_full_column_if_const();
|
||||
const auto& data_column = assert_cast<const ColumnString&>(*ptr.get());
|
||||
int64_t size = sizeof(uint32_t) + sizeof(uint64_t);
|
||||
@ -111,7 +113,7 @@ int64_t DataTypeString::get_uncompressed_serialized_bytes(const IColumn& column,
|
||||
}
|
||||
|
||||
char* DataTypeString::serialize(const IColumn& column, char* buf, int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
auto ptr = column.convert_to_full_column_if_const();
|
||||
const auto& data_column = assert_cast<const ColumnString&>(*ptr.get());
|
||||
|
||||
@ -169,8 +171,8 @@ char* DataTypeString::serialize(const IColumn& column, char* buf, int be_exec_ve
|
||||
|
||||
const char* DataTypeString::deserialize(const char* buf, IColumn* column,
|
||||
int be_exec_version) const {
|
||||
if (be_exec_version >= 4) {
|
||||
ColumnString* column_string = assert_cast<ColumnString*>(column);
|
||||
if (be_exec_version >= USE_NEW_SERDE) {
|
||||
auto* column_string = assert_cast<ColumnString*>(column);
|
||||
ColumnString::Chars& data = column_string->get_chars();
|
||||
ColumnString::Offsets& offsets = column_string->get_offsets();
|
||||
|
||||
@ -206,7 +208,7 @@ const char* DataTypeString::deserialize(const char* buf, IColumn* column,
|
||||
}
|
||||
return buf;
|
||||
} else {
|
||||
ColumnString* column_string = assert_cast<ColumnString*>(column);
|
||||
auto* column_string = assert_cast<ColumnString*>(column);
|
||||
ColumnString::Chars& data = column_string->get_chars();
|
||||
ColumnString::Offsets& offsets = column_string->get_offsets();
|
||||
// row num
|
||||
|
||||
Binary file not shown.
@ -1713,7 +1713,7 @@ public class Config extends ConfigBase {
|
||||
* Max data version of backends serialize block.
|
||||
*/
|
||||
@ConfField(mutable = false)
|
||||
public static int max_be_exec_version = 4;
|
||||
public static int max_be_exec_version = 3;
|
||||
|
||||
/**
|
||||
* Min data version of backends serialize block.
|
||||
|
||||
Reference in New Issue
Block a user