[Fix](smooth-upgrade) Fix incompatibility when upgrade from 2.0 to 2.1 (#32220)
This commit is contained in:
@ -58,8 +58,10 @@ private:
|
||||
* c. change the string hash method in runtime filter
|
||||
* d. elt function return type change to nullable(string)
|
||||
* e. add repeat_max_num in repeat function
|
||||
* 3: start from doris 2.1
|
||||
* 3: start from doris 2.0 (by some mistakes)
|
||||
* a. aggregation function do not serialize bitmap to string.
|
||||
* 4: start from doris 2.1
|
||||
* a. support window funnel mode from 2.0
|
||||
* b. array contains/position/countequal function return nullable in less situations.
|
||||
* c. cleared old version of Version 2.
|
||||
* d. unix_timestamp function support timestamp with float for datetimev2, and change nullable mode.
|
||||
@ -67,10 +69,11 @@ private:
|
||||
* f. shrink some function's nullable mode.
|
||||
* g. do local merge of remote runtime filter
|
||||
*/
|
||||
constexpr inline int BeExecVersionManager::max_be_exec_version = 3;
|
||||
constexpr inline int BeExecVersionManager::max_be_exec_version = 4;
|
||||
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;
|
||||
|
||||
/// functional
|
||||
constexpr inline int USE_NEW_SERDE = 3; // release on DORIS version 2.1
|
||||
constexpr inline int BITMAP_SERDE = 3;
|
||||
constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -32,6 +32,7 @@
|
||||
#include <mutex>
|
||||
#include <ostream>
|
||||
|
||||
#include "agent/be_exec_version_manager.h"
|
||||
#include "common/logging.h"
|
||||
#include "common/object_pool.h"
|
||||
#include "common/status.h"
|
||||
@ -993,7 +994,7 @@ Status IRuntimeFilter::publish(bool publish_local) {
|
||||
} else if (_has_local_target) {
|
||||
RETURN_IF_ERROR(send_to_local(_wrapper));
|
||||
} else if (!publish_local) {
|
||||
if (_is_broadcast_join || _state->be_exec_version < 3) {
|
||||
if (_is_broadcast_join || _state->be_exec_version < USE_NEW_SERDE) {
|
||||
RETURN_IF_ERROR(send_to_remote(this));
|
||||
} else {
|
||||
RETURN_IF_ERROR(do_local_merge());
|
||||
|
||||
@ -25,6 +25,7 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "agent/be_exec_version_manager.h"
|
||||
#include "common/compiler_util.h" // IWYU pragma: keep
|
||||
#include "util/bitmap_value.h"
|
||||
#include "vec/aggregate_functions/aggregate_function.h"
|
||||
@ -159,7 +160,7 @@ public:
|
||||
|
||||
void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
|
||||
const size_t num_rows, Arena* arena) const override {
|
||||
if (version >= 3) {
|
||||
if (version >= BITMAP_SERDE) {
|
||||
auto& col = assert_cast<ColumnBitmap&>(*dst);
|
||||
char place[sizeof(Data)];
|
||||
col.resize(num_rows);
|
||||
@ -177,7 +178,7 @@ public:
|
||||
|
||||
void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset,
|
||||
MutableColumnPtr& dst, const size_t num_rows) const override {
|
||||
if (version >= 3) {
|
||||
if (version >= BITMAP_SERDE) {
|
||||
auto& col = assert_cast<ColumnBitmap&>(*dst);
|
||||
col.resize(num_rows);
|
||||
auto* data = col.get_data().data();
|
||||
@ -191,7 +192,7 @@ public:
|
||||
|
||||
void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
|
||||
Arena* arena) const override {
|
||||
if (version >= 3) {
|
||||
if (version >= BITMAP_SERDE) {
|
||||
auto& col = assert_cast<const ColumnBitmap&>(column);
|
||||
const size_t num_rows = column.size();
|
||||
auto* data = col.get_data().data();
|
||||
@ -209,7 +210,7 @@ public:
|
||||
Arena* arena) const override {
|
||||
DCHECK(end <= column.size() && begin <= end)
|
||||
<< ", begin:" << begin << ", end:" << end << ", column.size():" << column.size();
|
||||
if (version >= 3) {
|
||||
if (version >= BITMAP_SERDE) {
|
||||
auto& col = assert_cast<const ColumnBitmap&>(column);
|
||||
auto* data = col.get_data().data();
|
||||
for (size_t i = begin; i <= end; ++i) {
|
||||
@ -223,7 +224,7 @@ public:
|
||||
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
|
||||
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
|
||||
const size_t num_rows) const override {
|
||||
if (version >= 3) {
|
||||
if (version >= BITMAP_SERDE) {
|
||||
auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
|
||||
auto* data = col.get_data().data();
|
||||
for (size_t i = 0; i != num_rows; ++i) {
|
||||
@ -237,7 +238,7 @@ public:
|
||||
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
|
||||
AggregateDataPtr rhs, const ColumnString* column,
|
||||
Arena* arena, const size_t num_rows) const override {
|
||||
if (version >= 3) {
|
||||
if (version >= BITMAP_SERDE) {
|
||||
auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
|
||||
auto* data = col.get_data().data();
|
||||
for (size_t i = 0; i != num_rows; ++i) {
|
||||
@ -253,7 +254,7 @@ public:
|
||||
|
||||
void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
|
||||
IColumn& to) const override {
|
||||
if (version >= 3) {
|
||||
if (version >= BITMAP_SERDE) {
|
||||
auto& col = assert_cast<ColumnBitmap&>(to);
|
||||
size_t old_size = col.size();
|
||||
col.resize(old_size + 1);
|
||||
@ -264,7 +265,7 @@ public:
|
||||
}
|
||||
|
||||
[[nodiscard]] MutableColumnPtr create_serialize_column() const override {
|
||||
if (version >= 3) {
|
||||
if (version >= BITMAP_SERDE) {
|
||||
return ColumnBitmap::create();
|
||||
} else {
|
||||
return ColumnString::create();
|
||||
@ -272,7 +273,7 @@ public:
|
||||
}
|
||||
|
||||
[[nodiscard]] DataTypePtr get_serialized_type() const override {
|
||||
if (version >= 3) {
|
||||
if (version >= BITMAP_SERDE) {
|
||||
return std::make_shared<DataTypeBitMap>();
|
||||
} else {
|
||||
return IAggregateFunction::get_serialized_type();
|
||||
|
||||
@ -32,6 +32,7 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "agent/be_exec_version_manager.h"
|
||||
#include "common/compiler_util.h"
|
||||
#include "util/binary_cast.hpp"
|
||||
#include "vec/aggregate_functions/aggregate_function.h"
|
||||
@ -269,8 +270,8 @@ public:
|
||||
|
||||
void create(AggregateDataPtr __restrict place) const override {
|
||||
auto data = new (place) WindowFunnelState<DateValueType, NativeType>();
|
||||
/// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version`
|
||||
data->enable_mode = version >= 3;
|
||||
/// support window funnel mode from 2.1. See `BeExecVersionManager::max_be_exec_version`
|
||||
data->enable_mode = version >= USE_NEW_SERDE;
|
||||
}
|
||||
|
||||
String get_name() const override { return "window_funnel"; }
|
||||
|
||||
@ -111,8 +111,8 @@ class SimpleFunctionFactory {
|
||||
using Creator = std::function<FunctionBuilderPtr()>;
|
||||
using FunctionCreators = phmap::flat_hash_map<std::string, Creator>;
|
||||
using FunctionIsVariadic = phmap::flat_hash_set<std::string>;
|
||||
/// @TEMPORARY: for be_exec_version=3
|
||||
constexpr static int NEWEST_VERSION_FUNCTION_SUBSTITUTE = 3;
|
||||
/// @TEMPORARY: for be_exec_version=4
|
||||
constexpr static int NEWEST_VERSION_FUNCTION_SUBSTITUTE = 4;
|
||||
|
||||
public:
|
||||
void register_function(const std::string& name, const Creator& ptr) {
|
||||
|
||||
Binary file not shown.
@ -1762,7 +1762,7 @@ public class Config extends ConfigBase {
|
||||
* Max data version of backends serialize block.
|
||||
*/
|
||||
@ConfField(mutable = false)
|
||||
public static int max_be_exec_version = 3;
|
||||
public static int max_be_exec_version = 4;
|
||||
|
||||
/**
|
||||
* Min data version of backends serialize block.
|
||||
|
||||
Reference in New Issue
Block a user