diff --git a/be/src/agent/be_exec_version_manager.h b/be/src/agent/be_exec_version_manager.h new file mode 100644 index 0000000000..f3e7a1c38c --- /dev/null +++ b/be/src/agent/be_exec_version_manager.h @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +namespace doris { + +class BeExecVersionManager { +public: + BeExecVersionManager() = delete; + + static bool check_be_exec_version(int be_exec_version) { + if (be_exec_version > max_be_exec_version || be_exec_version < min_be_exec_version) { + LOG(WARNING) << fmt::format( + "Received be_exec_version is not supported, be_exec_version={}, " + "min_be_exec_version={}, max_be_exec_version={}, maybe due to FE version not " + "match " + "with BE.", + be_exec_version, min_be_exec_version, max_be_exec_version); + return false; + } + return true; + } + + static int get_newest_version() { return max_be_exec_version; } + +private: + static const int max_be_exec_version; + static const int min_be_exec_version; +}; + +// When we have some breaking change for execute engine, we should update be_exec_version. +// 0: not contain be_exec_version. +// 1: start from doris 1.2 +// a. remove ColumnString terminating zero. +// b. runtime filter use new hash method. +inline const int BeExecVersionManager::max_be_exec_version = 1; +inline const int BeExecVersionManager::min_be_exec_version = 0; + +} // namespace doris diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index f1895dc561..5a412733eb 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -19,10 +19,6 @@ #include -#include -#include -#include - #include "common/status.h" #include "gen_cpp/HeartbeatService.h" #include "gen_cpp/Status_types.h" @@ -33,23 +29,8 @@ #include "util/thrift_server.h" #include "util/time.h" -using std::fstream; -using std::nothrow; -using std::string; -using std::vector; -using apache::thrift::TProcessor; - namespace doris { -// When we have some breaking change for execute engine, we should update be_exec_version. -// 0: not contain be_exec_version. -// 1: remove ColumnString's terminating zero. -const int HeartbeatServer::max_be_exec_version = 1; -const int HeartbeatServer::min_be_exec_version = 0; - -// For support rolling upgrade, we send data as second newest version. -int HeartbeatServer::be_exec_version = max_be_exec_version - 1; - HeartbeatServer::HeartbeatServer(TMasterInfo* master_info) : _master_info(master_info), _fe_epoch(0) { _olap_engine = StorageEngine::instance(); @@ -171,13 +152,6 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { _master_info->__set_backend_id(master_info.backend_id); } - if (master_info.__isset.be_exec_version && check_be_exec_version(master_info.be_exec_version) && - be_exec_version != master_info.be_exec_version) { - LOG(INFO) << fmt::format("be_exec_version changed from {} to {}", be_exec_version, - master_info.be_exec_version); - be_exec_version = master_info.be_exec_version; - } - if (need_report) { LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately"; _olap_engine->notify_listeners(); @@ -189,7 +163,7 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port, ThriftServer** thrift_server, uint32_t worker_thread_num, TMasterInfo* local_master_info) { - HeartbeatServer* heartbeat_server = new (nothrow) HeartbeatServer(local_master_info); + HeartbeatServer* heartbeat_server = new HeartbeatServer(local_master_info); if (heartbeat_server == nullptr) { return Status::InternalError("Get heartbeat server failed"); } @@ -197,10 +171,10 @@ Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port, heartbeat_server->init_cluster_id(); std::shared_ptr handler(heartbeat_server); - std::shared_ptr server_processor(new HeartbeatServiceProcessor(handler)); - string server_name("heartbeat"); + std::shared_ptr server_processor( + new HeartbeatServiceProcessor(handler)); *thrift_server = - new ThriftServer(server_name, server_processor, server_port, worker_thread_num); + new ThriftServer("heartbeat", server_processor, server_port, worker_thread_num); return Status::OK(); } } // namespace doris diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h index f002b8cbf0..73bfbba7fa 100644 --- a/be/src/agent/heartbeat_server.h +++ b/be/src/agent/heartbeat_server.h @@ -47,23 +47,6 @@ public: // * heartbeat_result: The result of heartbeat set void heartbeat(THeartbeatResult& heartbeat_result, const TMasterInfo& master_info) override; - static const int max_be_exec_version; - static const int min_be_exec_version; - static int be_exec_version; - - static bool check_be_exec_version(int be_exec_version) { - if (be_exec_version > max_be_exec_version || be_exec_version < min_be_exec_version) { - LOG(WARNING) << fmt::format( - "Received be_exec_version is not supported, be_exec_version={}, " - "min_be_exec_version={}, max_be_exec_version={}, maybe due to FE version not " - "match " - "with BE.", - be_exec_version, min_be_exec_version, max_be_exec_version); - return false; - } - return true; - } - private: Status _heartbeat(const TMasterInfo& master_info); diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h index 545c40cc5b..ec3b24434e 100644 --- a/be/src/exprs/bloomfilter_predicate.h +++ b/be/src/exprs/bloomfilter_predicate.h @@ -197,6 +197,8 @@ public: virtual void insert_fixed_len(const char* data, const int* offsets, int number) = 0; + virtual void insert_fixed_len(const char* data) = 0; + virtual uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, uint16_t* offsets, int number) = 0; @@ -222,6 +224,10 @@ struct CommonFindOp { } } + void insert_single(BloomFilterAdaptor& bloom_filter, const char* data) const { + bloom_filter.add_element(*((T*)data)); + } + uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data, const uint8* nullmap, uint16_t* offsets, int number) const { uint16_t new_size = 0; @@ -272,6 +278,10 @@ struct StringFindOp { LOG(FATAL) << "StringFindOp does not support insert_batch"; } + void insert_single(BloomFilterAdaptor& bloom_filter, const char* data) const { + LOG(FATAL) << "StringFindOp does not support insert_single"; + } + uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data, const uint8* nullmap, uint16_t* offsets, int number) const { LOG(FATAL) << "StringFindOp does not support find_batch_olap_engine"; @@ -412,6 +422,11 @@ public: dummy.insert_batch(*_bloom_filter, data, offsets, number); } + void insert_fixed_len(const char* data) override { + DCHECK(_bloom_filter != nullptr); + dummy.insert_single(*_bloom_filter, data); + } + uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, uint16_t* offsets, int number) override { return dummy.find_batch_olap_engine(*_bloom_filter, data, nullmap, offsets, number); diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 77a476d095..b8700be923 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -18,6 +18,7 @@ #include "runtime_filter.h" #include +#include #include "common/object_pool.h" #include "common/status.h" @@ -403,16 +404,20 @@ class RuntimePredicateWrapper { public: RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, const RuntimeFilterParams* params) - : _pool(pool), + : _state(state), + _pool(pool), _column_return_type(params->column_return_type), _filter_type(params->filter_type), _fragment_instance_id(params->fragment_instance_id), _filter_id(params->filter_id) {} // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge - RuntimePredicateWrapper(ObjectPool* pool, RuntimeFilterType type, UniqueId fragment_instance_id, + RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, PrimitiveType column_type, + RuntimeFilterType type, UniqueId fragment_instance_id, uint32_t filter_id) - : _pool(pool), + : _state(state), + _pool(pool), + _column_return_type(column_type), _filter_type(type), _fragment_instance_id(fragment_instance_id), _filter_id(filter_id) {} @@ -452,14 +457,29 @@ public: << "Can not change to bloom filter because of runtime filter type is " << to_string(_filter_type); _is_bloomfilter = true; + insert_to_bloom_filter(_bloomfilter_func.get()); + // release in filter + _hybrid_set.reset(create_set(_column_return_type)); + } + + void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { if (_hybrid_set->size() > 0) { + bool use_batch = _state->enable_vectorized_exec() && + IRuntimeFilter::enable_use_batch(_state->be_exec_version(), + _column_return_type); auto it = _hybrid_set->begin(); - while (it->has_next()) { - _bloomfilter_func->insert(it->get_value()); - it->next(); + + if (use_batch) { + while (it->has_next()) { + bloom_filter->insert_fixed_len((char*)it->get_value()); + it->next(); + } + } else { + while (it->has_next()) { + bloom_filter->insert(it->get_value()); + it->next(); + } } - // release in filter - _hybrid_set.reset(create_set(_column_return_type)); } } @@ -561,7 +581,7 @@ public: } void insert_batch(const vectorized::ColumnPtr column, const std::vector& rows) { - if (IRuntimeFilter::enable_use_batch(_column_return_type)) { + if (IRuntimeFilter::enable_use_batch(_state->be_exec_version(), _column_return_type)) { insert_fixed_len(column->get_raw_data().data, rows.data(), rows.size()); } else { for (int index : rows) { @@ -676,12 +696,7 @@ public: << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " << *(wrapper->get_ignored_in_filter_msg()); - auto it = wrapper->_hybrid_set->begin(); - while (it->has_next()) { - auto value = it->get_value(); - _bloomfilter_func->insert(value); - it->next(); - } + wrapper->insert_to_bloom_filter(_bloomfilter_func.get()); // bloom filter merge bloom filter } else { _bloomfilter_func->merge(wrapper->_bloomfilter_func.get()); @@ -1043,6 +1058,7 @@ public: } private: + RuntimeState* _state; ObjectPool* _pool; PrimitiveType _column_return_type; // column type RuntimeFilterType _filter_type; @@ -1254,14 +1270,16 @@ Status IRuntimeFilter::serialize(PPublishFilterRequest* request, void** data, in return serialize_impl(request, data, len); } -Status IRuntimeFilter::create_wrapper(const MergeRuntimeFilterParams* param, ObjectPool* pool, +Status IRuntimeFilter::create_wrapper(RuntimeState* state, const MergeRuntimeFilterParams* param, + ObjectPool* pool, std::unique_ptr* wrapper) { - return _create_wrapper(param, pool, wrapper); + return _create_wrapper(state, param, pool, wrapper); } -Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParams* param, ObjectPool* pool, +Status IRuntimeFilter::create_wrapper(RuntimeState* state, const UpdateRuntimeFilterParams* param, + ObjectPool* pool, std::unique_ptr* wrapper) { - return _create_wrapper(param, pool, wrapper); + return _create_wrapper(state, param, pool, wrapper); } void IRuntimeFilter::change_to_bloom_filter() { @@ -1273,10 +1291,14 @@ void IRuntimeFilter::change_to_bloom_filter() { } template -Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool, +Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, ObjectPool* pool, std::unique_ptr* wrapper) { int filter_type = param->request->filter_type(); - wrapper->reset(new RuntimePredicateWrapper(pool, get_type(filter_type), + PrimitiveType column_type = PrimitiveType::INVALID_TYPE; + if (param->request->has_in_filter()) { + column_type = to_primitive_type(param->request->in_filter().column_type()); + } + wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type, get_type(filter_type), UniqueId(param->request->fragment_id()), param->request->filter_id())); @@ -1642,7 +1664,7 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { set_ignored_msg(*msg); } std::unique_ptr wrapper; - RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _pool, &wrapper)); + RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool, &wrapper)); auto origin_type = _wrapper->get_real_type(); RETURN_IF_ERROR(_wrapper->merge(wrapper.get())); if (origin_type != _wrapper->get_real_type()) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 42584d64f9..14ed389531 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -17,11 +17,6 @@ #pragma once -#include -#include -#include -#include - #include "exprs/expr_context.h" #include "util/runtime_profile.h" #include "util/time.h" @@ -213,9 +208,11 @@ public: // for ut const RuntimePredicateWrapper* get_wrapper(); - static Status create_wrapper(const MergeRuntimeFilterParams* param, ObjectPool* pool, + static Status create_wrapper(RuntimeState* state, const MergeRuntimeFilterParams* param, + ObjectPool* pool, std::unique_ptr* wrapper); - static Status create_wrapper(const UpdateRuntimeFilterParams* param, ObjectPool* pool, + static Status create_wrapper(RuntimeState* state, const UpdateRuntimeFilterParams* param, + ObjectPool* pool, std::unique_ptr* wrapper); void change_to_bloom_filter(); Status update_filter(const UpdateRuntimeFilterParams* param); @@ -245,8 +242,8 @@ public: void ready_for_publish(); - static bool enable_use_batch(PrimitiveType type) { - return is_int_or_bool(type) || is_float_or_double(type); + static bool enable_use_batch(int be_exec_version, PrimitiveType type) { + return be_exec_version > 0 && (is_int_or_bool(type) || is_float_or_double(type)); } protected: @@ -258,7 +255,7 @@ protected: Status serialize_impl(T* request, void** data, int* len); template - static Status _create_wrapper(const T* param, ObjectPool* pool, + static Status _create_wrapper(RuntimeState* state, const T* param, ObjectPool* pool, std::unique_ptr* wrapper); RuntimeState* _state; diff --git a/be/src/olap/bloom_filter_predicate.cpp b/be/src/olap/bloom_filter_predicate.cpp index 8d281cb06a..82688d4f0c 100644 --- a/be/src/olap/bloom_filter_predicate.cpp +++ b/be/src/olap/bloom_filter_predicate.cpp @@ -41,26 +41,26 @@ namespace doris { ColumnPredicate* BloomFilterColumnPredicateFactory::create_column_predicate( uint32_t column_id, const std::shared_ptr& bloom_filter, - FieldType type) { + FieldType type, int be_exec_version) { std::shared_ptr filter; switch (type) { -#define M(NAME) \ - case OLAP_FIELD_##NAME: { \ - filter.reset(create_bloom_filter(NAME)); \ - filter->light_copy(bloom_filter.get()); \ - return new BloomFilterColumnPredicate(column_id, filter); \ +#define M(NAME) \ + case OLAP_FIELD_##NAME: { \ + filter.reset(create_bloom_filter(NAME)); \ + filter->light_copy(bloom_filter.get()); \ + return new BloomFilterColumnPredicate(column_id, filter, be_exec_version); \ } APPLY_FOR_PRIMTYPE(M) #undef M case OLAP_FIELD_TYPE_DECIMAL: { filter.reset(create_bloom_filter(TYPE_DECIMALV2)); filter->light_copy(bloom_filter.get()); - return new BloomFilterColumnPredicate(column_id, filter); + return new BloomFilterColumnPredicate(column_id, filter, be_exec_version); } case OLAP_FIELD_TYPE_BOOL: { filter.reset(create_bloom_filter(TYPE_BOOLEAN)); filter->light_copy(bloom_filter.get()); - return new BloomFilterColumnPredicate(column_id, filter); + return new BloomFilterColumnPredicate(column_id, filter, be_exec_version); } default: return nullptr; diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h index dcced4edef..8c2af710fa 100644 --- a/be/src/olap/bloom_filter_predicate.h +++ b/be/src/olap/bloom_filter_predicate.h @@ -17,8 +17,6 @@ #pragma once -#include - #include "exprs/bloomfilter_predicate.h" #include "exprs/runtime_filter.h" #include "olap/column_predicate.h" @@ -37,10 +35,12 @@ public: using SpecificFilter = BloomFilterFunc; BloomFilterColumnPredicate(uint32_t column_id, - const std::shared_ptr& filter) + const std::shared_ptr& filter, + int be_exec_version) : ColumnPredicate(column_id), _filter(filter), - _specific_filter(static_cast(_filter.get())) {} + _specific_filter(static_cast(_filter.get())), + _be_exec_version(be_exec_version) {} ~BloomFilterColumnPredicate() override = default; PredicateType type() const override { return PredicateType::BF; } @@ -81,7 +81,7 @@ private: new_size += _specific_filter->find_uint32_t(dict_col->get_hash_value(idx)); } } - } else if (IRuntimeFilter::enable_use_batch(T)) { + } else if (IRuntimeFilter::enable_use_batch(_be_exec_version, T)) { new_size = _specific_filter->find_fixed_len_olap_engine( (char*)reinterpret_cast*>(&column) ->get_data() @@ -125,6 +125,7 @@ private: mutable uint64_t _passed_rows = 0; mutable bool _always_true = false; mutable bool _has_calculate_filter = false; + int _be_exec_version; }; template @@ -178,7 +179,8 @@ uint16_t BloomFilterColumnPredicate::evaluate(const vectorized::IColumn& colu class BloomFilterColumnPredicateFactory { public: static ColumnPredicate* create_column_predicate( - uint32_t column_id, const std::shared_ptr& filter, FieldType type); + uint32_t column_id, const std::shared_ptr& filter, FieldType type, + int be_exec_version); }; } //namespace doris diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 2f629a807e..b1a1154346 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -21,11 +21,7 @@ #include "common/status.h" #include "olap/bloom_filter_predicate.h" -#include "olap/collect_iterator.h" -#include "olap/comparison_predicate.h" -#include "olap/in_list_predicate.h" #include "olap/like_column_predicate.h" -#include "olap/null_predicate.h" #include "olap/olap_common.h" #include "olap/predicate_creator.h" #include "olap/row.h" @@ -33,9 +29,6 @@ #include "olap/schema.h" #include "olap/tablet.h" #include "runtime/mem_pool.h" -#include "util/mem_util.hpp" -#include "util/string_util.h" -#include "vec/data_types/data_type_decimal.h" namespace doris { @@ -212,7 +205,6 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params, _reader_context.is_upper_keys_included = &_is_upper_keys_included; _reader_context.delete_handler = &_delete_handler; _reader_context.stats = &_stats; - _reader_context.runtime_state = read_params.runtime_state; _reader_context.use_page_cache = read_params.use_page_cache; _reader_context.sequence_id_idx = _sequence_col_idx; _reader_context.batch_size = _batch_size; @@ -236,6 +228,7 @@ Status TabletReader::_init_params(const ReaderParams& read_params) { _reader_type = read_params.reader_type; _tablet = read_params.tablet; _tablet_schema = read_params.tablet_schema; + _reader_context.runtime_state = read_params.runtime_state; _init_conditions_param(read_params); @@ -470,8 +463,9 @@ ColumnPredicate* TabletReader::_parse_to_predicate( return nullptr; } const TabletColumn& column = _tablet_schema->column(index); - return BloomFilterColumnPredicateFactory::create_column_predicate(index, bloom_filter.second, - column.type()); + return BloomFilterColumnPredicateFactory::create_column_predicate( + index, bloom_filter.second, column.type(), + _reader_context.runtime_state->be_exec_version()); } ColumnPredicate* TabletReader::_parse_to_predicate(const FunctionFilter& function_filter) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 511979d2a8..f70b789661 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -25,11 +25,8 @@ #include "agent/cgroups_mgr.h" #include "common/object_pool.h" -#include "common/resource_tls.h" #include "common/signal_handler.h" -#include "gen_cpp/DataSinks_types.h" #include "gen_cpp/FrontendService.h" -#include "gen_cpp/HeartbeatService.h" #include "gen_cpp/PaloInternalService_types.h" #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/QueryPlanExtra_types.h" @@ -662,11 +659,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi exec_state->set_need_wait_execution_trigger(); } + RETURN_IF_ERROR(exec_state->prepare(params)); + std::shared_ptr handler; - _runtimefilter_controller.add_entity(params, &handler); + _runtimefilter_controller.add_entity(params, &handler, exec_state->executor()->runtime_state()); exec_state->set_merge_controller_handler(handler); - RETURN_IF_ERROR(exec_state->prepare(params)); { std::lock_guard lock(_lock); _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state)); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 0ed812737f..9bc95e2de1 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -19,13 +19,11 @@ #include -#include "client_cache.h" #include "exprs/bloomfilter_predicate.h" #include "exprs/runtime_filter.h" #include "gen_cpp/internal_service.pb.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" -#include "runtime/plan_fragment_executor.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "service/brpc.h" @@ -147,7 +145,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cntVal->runtime_filter_desc = *runtime_filter_desc; cntVal->target_info = *target_info; cntVal->pool.reset(new ObjectPool()); - cntVal->filter = cntVal->pool->add(new IRuntimeFilter(nullptr, cntVal->pool.get())); + cntVal->filter = cntVal->pool->add(new IRuntimeFilter(_state, cntVal->pool.get())); std::string filter_id = std::to_string(runtime_filter_desc->filter_id); // LOG(INFO) << "entity filter id:" << filter_id; @@ -201,7 +199,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ MergeRuntimeFilterParams params(request, attach_data); ObjectPool* pool = iter->second->pool.get(); RuntimeFilterWrapperHolder holder; - RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(¶ms, pool, holder.getHandle())); + RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, ¶ms, pool, holder.getHandle())); RETURN_IF_ERROR(cntVal->filter->merge_from(holder.getHandle()->get())); cntVal->arrive_id.insert(UniqueId(request->fragment_id()).to_string()); merged_size = cntVal->arrive_id.size(); @@ -278,7 +276,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ Status RuntimeFilterMergeController::add_entity( const TExecPlanFragmentParams& params, - std::shared_ptr* handle) { + std::shared_ptr* handle, RuntimeState* state) { if (!params.params.__isset.runtime_filter_params || params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) { return Status::OK(); @@ -296,7 +294,7 @@ Status RuntimeFilterMergeController::add_entity( if (iter == _filter_controller_map[shard].end()) { *handle = std::shared_ptr( - new RuntimeFilterMergeControllerEntity(), entity_closer); + new RuntimeFilterMergeControllerEntity(state), entity_closer); _filter_controller_map[shard][query_id_str] = *handle; const TRuntimeFilterParams& filter_params = params.params.runtime_filter_params; RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index a3b3add74b..e090be6f4f 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -29,7 +29,7 @@ #include "exprs/runtime_filter.h" #include "gen_cpp/PaloInternalService_types.h" #include "gen_cpp/PlanNodes_types.h" -#include "util/time.h" +#include "runtime/runtime_state.h" #include "util/uid_util.h" namespace butil { @@ -110,7 +110,8 @@ private: // the class is destroyed with the last fragment_exec. class RuntimeFilterMergeControllerEntity { public: - RuntimeFilterMergeControllerEntity() : _query_id(0, 0), _fragment_instance_id(0, 0) {} + RuntimeFilterMergeControllerEntity(RuntimeState* state) + : _query_id(0, 0), _fragment_instance_id(0, 0), _state(state) {} ~RuntimeFilterMergeControllerEntity() = default; Status init(UniqueId query_id, UniqueId fragment_instance_id, @@ -150,6 +151,7 @@ private: // TODO: convert filter id to i32 // filter-id -> val std::map> _filter_map; + RuntimeState* _state; }; // RuntimeFilterMergeController has a map query-id -> entity @@ -163,7 +165,8 @@ public: // If a query-id -> entity already exists // add_entity will return a exists entity Status add_entity(const TExecPlanFragmentParams& params, - std::shared_ptr* handle); + std::shared_ptr* handle, + RuntimeState* state); // thread safe // increate a reference count // if a query-id is not exist diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 71831667d0..83f651725c 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -20,20 +20,13 @@ #pragma once -#include #include -#include -#include -#include -#include -#include #include "cctz/time_zone.h" #include "common/global_types.h" #include "common/object_pool.h" #include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions #include "gen_cpp/Types_types.h" // for TUniqueId -#include "runtime/mem_pool.h" #include "runtime/query_fragments_ctx.h" #include "runtime/thread_resource_mgr.h" #include "util/runtime_profile.h" @@ -329,6 +322,13 @@ public: bool enable_vectorized_exec() const { return _query_options.enable_vectorized_engine; } + int be_exec_version() const { + if (!_query_options.__isset.be_exec_version) { + return 0; + } + return _query_options.be_exec_version; + } + bool trim_tailing_spaces_for_external_table_query() const { return _query_options.trim_tailing_spaces_for_external_table_query; } @@ -341,7 +341,7 @@ public: return _query_options.enable_enable_exchange_node_parallel_merge; } - segment_v2::CompressionTypePB fragement_transmission_compression_type() { + segment_v2::CompressionTypePB fragement_transmission_compression_type() const { if (_query_options.__isset.fragment_transmission_compression_codec) { if (_query_options.fragment_transmission_compression_codec == "lz4") { return segment_v2::CompressionTypePB::LZ4; diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.cpp b/be/src/vec/aggregate_functions/aggregate_function_sort.cpp index b0566b829f..8d14621709 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sort.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_sort.cpp @@ -17,23 +17,21 @@ #include "vec/aggregate_functions/aggregate_function_sort.h" -#include "vec/aggregate_functions/aggregate_function_combinator.h" #include "vec/aggregate_functions/aggregate_function_simple_factory.h" -#include "vec/common/typeid_cast.h" -#include "vec/data_types/data_type_nullable.h" namespace doris::vectorized { AggregateFunctionPtr transform_to_sort_agg_function(const AggregateFunctionPtr& nested_function, const DataTypes& arguments, - const SortDescription& sort_desc) { + const SortDescription& sort_desc, + RuntimeState* state) { DCHECK(nested_function != nullptr); if (nested_function == nullptr) { return nullptr; } - return std::make_shared>(nested_function, - arguments, sort_desc); + return std::make_shared>( + nested_function, arguments, sort_desc, state); }; } // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h index 201bd5df62..1f19996f04 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sort.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h @@ -20,16 +20,9 @@ #include #include +#include "runtime/runtime_state.h" #include "vec/aggregate_functions/aggregate_function.h" -#include "vec/aggregate_functions/key_holder_helpers.h" #include "vec/columns/column.h" -#include "vec/columns/column_const.h" -#include "vec/common/aggregation_common.h" -#include "vec/common/assert_cast.h" -#include "vec/common/field_visitors.h" -#include "vec/common/hash_table/hash_set.h" -#include "vec/common/hash_table/hash_table.h" -#include "vec/common/sip_hash.h" #include "vec/core/sort_block.h" #include "vec/core/sort_description.h" #include "vec/io/io_helper.h" @@ -57,11 +50,11 @@ struct AggregateFunctionSortData { } } - void serialize(BufferWritable& buf) const { + void serialize(const RuntimeState* state, BufferWritable& buf) const { PBlock pblock; size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; - block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes, + block.serialize(state->be_exec_version(), &pblock, &uncompressed_bytes, &compressed_bytes, segment_v2::CompressionTypePB::SNAPPY); write_string_binary(pblock.SerializeAsString(), buf); @@ -99,6 +92,7 @@ private: DataTypes _arguments; const SortDescription& _sort_desc; Block _block; + const RuntimeState* _state; AggregateDataPtr get_nested_place(AggregateDataPtr __restrict place) const noexcept { return place + prefix_size; @@ -110,12 +104,13 @@ private: public: AggregateFunctionSort(const AggregateFunctionPtr& nested_func, const DataTypes& arguments, - const SortDescription& sort_desc) + const SortDescription& sort_desc, const RuntimeState* state) : IAggregateFunctionDataHelper( arguments, nested_func->get_parameters()), _nested_func(nested_func), _arguments(arguments), - _sort_desc(sort_desc) { + _sort_desc(sort_desc), + _state(state) { for (const auto& type : _arguments) { _block.insert({type, ""}); } @@ -132,7 +127,7 @@ public: } void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - this->data(place).serialize(buf); + this->data(place).serialize(_state, buf); } void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, @@ -179,5 +174,6 @@ public: AggregateFunctionPtr transform_to_sort_agg_function(const AggregateFunctionPtr& nested_function, const DataTypes& arguments, - const SortDescription& sort_desc); + const SortDescription& sort_desc, + RuntimeState* state); } // namespace doris::vectorized diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 80acd85fc8..c74bf45a8f 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -23,7 +23,7 @@ #include #include -#include "agent/heartbeat_server.h" +#include "agent/be_exec_version_manager.h" #include "common/status.h" #include "runtime/descriptors.h" #include "runtime/row_batch.h" @@ -65,7 +65,8 @@ Block::Block(const std::vector& slots, size_t block_size) { } Block::Block(const PBlock& pblock) { - CHECK(HeartbeatServer::check_be_exec_version(pblock.be_exec_version())); + int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0; + CHECK(BeExecVersionManager::check_be_exec_version(be_exec_version)); const char* buf = nullptr; std::string compression_scratch; @@ -692,11 +693,11 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee return Status::OK(); } -Status Block::serialize(PBlock* pblock, +Status Block::serialize(int be_exec_version, PBlock* pblock, /*std::string* compressed_buffer,*/ size_t* uncompressed_bytes, size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data) const { - pblock->set_be_exec_version(HeartbeatServer::be_exec_version); + pblock->set_be_exec_version(be_exec_version); // calc uncompressed size for allocation size_t content_uncompressed_size = 0; diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index faf668ae49..6565e9b9d2 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -268,8 +268,8 @@ public: } // serialize block to PBlock - Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes, - segment_v2::CompressionTypePB compression_type, + Status serialize(int be_exec_version, PBlock* pblock, size_t* uncompressed_bytes, + size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data = false) const; // serialize block to PRowbatch diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp index 6ba96eb4f0..1a8d400ab4 100644 --- a/be/src/vec/exprs/vbloom_predicate.cpp +++ b/be/src/vec/exprs/vbloom_predicate.cpp @@ -17,9 +17,6 @@ #include "vec/exprs/vbloom_predicate.h" -#include -#include - #include "common/status.h" #include "vec/data_types/data_type_nullable.h" @@ -42,6 +39,8 @@ Status VBloomPredicate::prepare(RuntimeState* state, const RowDescriptor& desc, auto column = child->data_type()->create_column(); argument_template.emplace_back(std::move(column), child->data_type(), child->expr_name()); } + + _be_exec_version = state->be_exec_version(); return Status::OK(); } @@ -79,7 +78,7 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result const StringValue v(ele.data, ele.size); ptr[i] = _filter->find(reinterpret_cast(&v)); } - } else if (type.is_int_or_uint() || type.is_float()) { + } else if (_be_exec_version > 0 && (type.is_int_or_uint() || type.is_float())) { if (argument_column->is_nullable()) { auto column_nested = reinterpret_cast(argument_column.get()) ->get_nested_column_ptr(); diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h index ad0f15820e..dd1218bb63 100644 --- a/be/src/vec/exprs/vbloom_predicate.h +++ b/be/src/vec/exprs/vbloom_predicate.h @@ -24,7 +24,7 @@ namespace doris::vectorized { class VBloomPredicate final : public VExpr { public: VBloomPredicate(const TExprNode& node); - ~VBloomPredicate() = default; + ~VBloomPredicate() override = default; doris::Status execute(VExprContext* context, doris::vectorized::Block* block, int* result_column_id) override; doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc, @@ -44,5 +44,6 @@ public: private: std::shared_ptr _filter; std::string _expr_name; + int _be_exec_version; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp index 28433f7b85..a326c2348d 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -24,10 +24,8 @@ #include "vec/aggregate_functions/aggregate_function_rpc.h" #include "vec/aggregate_functions/aggregate_function_simple_factory.h" #include "vec/aggregate_functions/aggregate_function_sort.h" -#include "vec/columns/column_nullable.h" #include "vec/core/materialize_block.h" #include "vec/data_types/data_type_factory.hpp" -#include "vec/data_types/data_type_nullable.h" #include "vec/exprs/vexpr.h" namespace doris::vectorized { @@ -123,7 +121,7 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc, M if (!_sort_description.empty()) { _function = transform_to_sort_agg_function(_function, _argument_types_with_sort, - _sort_description); + _sort_description, state); } _expr_name = fmt::format("{}({})", _fn.name.function_name, child_expr_name); return Status::OK(); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index e92929d0d9..0ca087ea59 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -600,8 +600,9 @@ Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int num_rece SCOPED_TIMER(_serialize_batch_timer); dest->Clear(); size_t uncompressed_bytes = 0, compressed_bytes = 0; - RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, - _compression_type, _transfer_large_data_by_brpc)); + RETURN_IF_ERROR(src->serialize(_state->be_exec_version(), dest, &uncompressed_bytes, + &compressed_bytes, _compression_type, + _transfer_large_data_by_brpc)); COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers); COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); COUNTER_UPDATE(_compress_timer, src->get_compress_time()); diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 3f4dfb5e72..2a1bd946b6 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -269,7 +269,8 @@ void VNodeChannel::try_send_block(RuntimeState* state) { if (block.rows() > 0) { SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); size_t uncompressed_bytes = 0, compressed_bytes = 0; - Status st = block.serialize(request.mutable_block(), &uncompressed_bytes, &compressed_bytes, + Status st = block.serialize(state->be_exec_version(), request.mutable_block(), + &uncompressed_bytes, &compressed_bytes, state->fragement_transmission_compression_type(), _parent->_transfer_large_data_by_brpc); if (!st.ok()) { diff --git a/be/test/olap/bloom_filter_column_predicate_test.cpp b/be/test/olap/bloom_filter_column_predicate_test.cpp index cc98c90698..2db411c654 100644 --- a/be/test/olap/bloom_filter_column_predicate_test.cpp +++ b/be/test/olap/bloom_filter_column_predicate_test.cpp @@ -19,17 +19,15 @@ #include #include +#include "agent/be_exec_version_manager.h" #include "exprs/create_predicate_function.h" #include "olap/bloom_filter_predicate.h" #include "olap/column_predicate.h" -#include "olap/field.h" #include "olap/row_block2.h" #include "runtime/mem_pool.h" -#include "runtime/string_value.hpp" #include "vec/columns/column_nullable.h" #include "vec/columns/columns_number.h" #include "vec/columns/predicate_column.h" -#include "vec/core/block.h" using namespace doris::vectorized; @@ -90,7 +88,7 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) { value = 6.1; bloom_filter->insert(reinterpret_cast(&value)); ColumnPredicate* pred = BloomFilterColumnPredicateFactory::create_column_predicate( - 0, bloom_filter, OLAP_FIELD_TYPE_FLOAT); + 0, bloom_filter, OLAP_FIELD_TYPE_FLOAT, BeExecVersionManager::get_newest_version()); // for ColumnBlock no null init_row_block(tablet_schema, size); @@ -145,7 +143,7 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN_VEC) { bloom_filter->insert_fixed_len((char*)values, offsets, 3); ColumnPredicate* pred = BloomFilterColumnPredicateFactory::create_column_predicate( - 0, bloom_filter, OLAP_FIELD_TYPE_FLOAT); + 0, bloom_filter, OLAP_FIELD_TYPE_FLOAT, BeExecVersionManager::get_newest_version()); auto* col_data = reinterpret_cast(_mem_pool->allocate(size * sizeof(float))); // for vectorized::Block no null diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index 5f222f5327..26fe4dfcf1 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -23,6 +23,7 @@ #include #include +#include "agent/be_exec_version_manager.h" #include "exec/schema_scanner.h" #include "gen_cpp/data.pb.h" #include "runtime/row_batch.h" @@ -186,7 +187,8 @@ void block_to_pb( segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; - Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes, compression_type); + Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, + &uncompressed_bytes, &compressed_bytes, compression_type); EXPECT_TRUE(st.ok()); EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); EXPECT_EQ(compressed_bytes, pblock->column_values().size()); diff --git a/be/test/vec/core/column_complex_test.cpp b/be/test/vec/core/column_complex_test.cpp index ae04b0725e..87e5998fa3 100644 --- a/be/test/vec/core/column_complex_test.cpp +++ b/be/test/vec/core/column_complex_test.cpp @@ -22,6 +22,7 @@ #include #include +#include "agent/be_exec_version_manager.h" #include "agent/heartbeat_server.h" #include "vec/core/block.h" #include "vec/data_types/data_type_bitmap.h" @@ -66,14 +67,15 @@ public: void check_serialize_and_deserialize(MutableColumnPtr& col) { auto column = assert_cast(col.get()); auto size = _bitmap_type.get_uncompressed_serialized_bytes( - *column, HeartbeatServer::max_be_exec_version); + *column, BeExecVersionManager::get_newest_version()); std::unique_ptr buf = std::make_unique(size); - auto result = - _bitmap_type.serialize(*column, buf.get(), HeartbeatServer::max_be_exec_version); + auto result = _bitmap_type.serialize(*column, buf.get(), + BeExecVersionManager::get_newest_version()); ASSERT_EQ(result, buf.get() + size); auto column2 = _bitmap_type.create_column(); - _bitmap_type.deserialize(buf.get(), column2.get(), HeartbeatServer::max_be_exec_version); + _bitmap_type.deserialize(buf.get(), column2.get(), + BeExecVersionManager::get_newest_version()); check_bitmap_column(*column, *column2.get()); } diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index f3a591bcd8..66ae4c75d7 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -2217,15 +2217,15 @@ Specifically, for example, there are 2 BEs in the cluster, one of which can supp The default value is `max_be_exec_version`. If there are special needs, we can manually set the format version to lower, but it should not be lower than `min_be_exec_version`. -Note that we should always keep the value of this variable between `HeartbeatServer::min_be_exec_version` and `HeartbeatServer::max_be_exec_version` for all BEs. (That is to say, if a cluster that has completed the update needs to be downgraded, it should ensure the order of downgrading FE and then downgrading BE, or manually lower the variable in the settings and downgrade BE) +Note that we should always keep the value of this variable between `BeExecVersionManager::min_be_exec_version` and `BeExecVersionManager::max_be_exec_version` for all BEs. (That is to say, if a cluster that has completed the update needs to be downgraded, it should ensure the order of downgrading FE and then downgrading BE, or manually lower the variable in the settings and downgrade BE) ### `max_be_exec_version` -The latest data version currently supported, cannot be modified, and should be consistent with the `HeartbeatServer::max_be_exec_version` in the BE of the matching version. +The latest data version currently supported, cannot be modified, and should be consistent with the `BeExecVersionManager::max_be_exec_version` in the BE of the matching version. ### `min_be_exec_version` -The oldest data version currently supported, which cannot be modified, should be consistent with the `HeartbeatServer::min_be_exec_version` in the BE of the matching version. +The oldest data version currently supported, which cannot be modified, should be consistent with the `BeExecVersionManager::min_be_exec_version` in the BE of the matching version. ### `max_query_profile_num` diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index 69dfbe325b..302c5fbdbe 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -2272,15 +2272,15 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清 默认值为`max_be_exec_version`,如果有特殊需要,我们可以手动设置将格式版本降低,但不应低于`min_be_exec_version`。 -需要注意的是,我们应该始终保持该变量的值处于**所有**BE的`HeartbeatServer::min_data_version`和`HeartbeatServer::max_data_version`之间。(也就是说如果一个已经完成更新的集群如果需要降级,应该保证先降级FE再降级BE的顺序,或者手动在设置中将该变量调低再降级BE) +需要注意的是,我们应该始终保持该变量的值处于**所有**BE的`BeExecVersionManager::min_be_exec_version`和`BeExecVersionManager::max_be_exec_version`之间。(也就是说如果一个已经完成更新的集群如果需要降级,应该保证先降级FE再降级BE的顺序,或者手动在设置中将该变量调低再降级BE) ### `max_be_exec_version` -目前支持的最新数据版本,不可修改,应与配套版本的BE中的`HeartbeatServer::max_data_version`一致。 +目前支持的最新数据版本,不可修改,应与配套版本的BE中的`BeExecVersionManager::max_be_exec_version`一致。 ### `min_be_exec_version` -目前支持的最旧数据版本,不可修改,应与配套版本的BE中的`HeartbeatServer::min_data_version`一致。 +目前支持的最旧数据版本,不可修改,应与配套版本的BE中的`BeExecVersionManager::min_be_exec_version`一致。 ### `max_query_profile_num` diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index c47699142b..d8d801d057 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -250,6 +250,7 @@ public class StreamLoadPlanner { // for stream load, we use exec_mem_limit to limit the memory usage of load channel. queryOptions.setLoadMemLimit(taskInfo.getMemLimit()); queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load); + queryOptions.setBeExecVersion(Config.be_exec_version); params.setQueryOptions(queryOptions); TQueryGlobals queryGlobals = new TQueryGlobals(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 7ac58111aa..8b23940ff4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -313,6 +313,7 @@ public class Coordinator { private void initQueryOptions(ConnectContext context) { this.queryOptions = context.getSessionVariable().toThrift(); this.queryOptions.setEnableVectorizedEngine(VectorizedUtil.isVectorized()); + this.queryOptions.setBeExecVersion(Config.be_exec_version); } public long getJobId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 95d5d501ef..d91581b02c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.TimeUtils; @@ -1170,6 +1171,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setIsReportSuccess(enableProfile); tResult.setCodegenLevel(codegenLevel); tResult.setEnableVectorizedEngine(enableVectorizedEngine); + tResult.setBeExecVersion(Config.be_exec_version); tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary); tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 3b35dc7864..234ebf3eec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -88,7 +88,6 @@ public class HeartbeatMgr extends MasterDaemon { tMasterInfo.setHttpPort(Config.http_port); long flags = heartbeatFlags.getHeartbeatFlags(); tMasterInfo.setHeartbeatFlags(flags); - tMasterInfo.setBeExecVersion(Config.be_exec_version); masterInfo.set(tMasterInfo); } diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 45e3e1bf53..9fe2f66d4c 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -33,7 +33,6 @@ struct TMasterInfo { 6: optional Types.TPort http_port 7: optional i64 heartbeat_flags 8: optional i64 backend_id - 9: optional i32 be_exec_version } struct TBackendInfo { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 6464f50a75..d7b794b95d 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -177,6 +177,8 @@ struct TQueryOptions { 49: optional bool skip_delete_predicate = false 50: optional bool enable_new_shuffle_hash_method + + 51: optional i32 be_exec_version = 0 }