diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 5e04c99cb8..82ab98d55d 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -29,6 +29,7 @@ #include "olap/utils.h" #include "service/backend_options.h" #include "util/thrift_server.h" +#include "runtime/heartbeat_flags.h" using std::fstream; using std::nothrow; @@ -147,6 +148,11 @@ Status HeartbeatServer::_heartbeat( _master_info->__set_http_port(master_info.http_port); } + if (master_info.__isset.heartbeat_flags) { + HeartbeatFlags* heartbeat_flags = ExecEnv::GetInstance()->heartbeat_flags(); + heartbeat_flags->update(master_info.heartbeat_flags); + } + if (need_report) { LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately"; _olap_engine->report_notify(true); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index b9f8fc7c89..fb806d3dc7 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -36,6 +36,8 @@ #include "runtime/mem_tracker.h" #include "common/resource_tls.h" #include "agent/cgroups_mgr.h" +#include "runtime/exec_env.h" +#include "runtime/heartbeat_flags.h" using std::deque; using std::list; @@ -977,7 +979,7 @@ bool SchemaChangeWithSorting::process( _temp_delta_versions.second), rowset_reader->version_hash(), new_tablet, - rowset_reader->rowset()->rowset_meta()->rowset_type(), + StorageEngine::instance()->default_rowset_type(), &rowset)) { LOG(WARNING) << "failed to sorting internally."; result = false; @@ -1034,7 +1036,7 @@ bool SchemaChangeWithSorting::process( Version(_temp_delta_versions.second, _temp_delta_versions.second), rowset_reader->version_hash(), new_tablet, - rowset_reader->rowset()->rowset_meta()->rowset_type(), + StorageEngine::instance()->default_rowset_type(), &rowset)) { LOG(WARNING) << "failed to sorting internally."; result = false; @@ -1471,7 +1473,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert( writer_context.tablet_id = new_tablet->tablet_id(); writer_context.partition_id = (*base_rowset)->partition_id(); writer_context.tablet_schema_hash = new_tablet->schema_hash(); - writer_context.rowset_type = (*base_rowset)->rowset_meta()->rowset_type(); + writer_context.rowset_type = StorageEngine::instance()->default_rowset_type(); writer_context.rowset_path_prefix = new_tablet->tablet_path(); writer_context.tablet_schema = &(new_tablet->tablet_schema()); writer_context.rowset_state = PREPARED; @@ -1697,7 +1699,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa writer_context.partition_id = new_tablet->partition_id(); writer_context.tablet_schema_hash = new_tablet->schema_hash(); // linked schema change can't change rowset type, therefore we preserve rowset type in schema change now - writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type(); + writer_context.rowset_type = StorageEngine::instance()->default_rowset_type(); writer_context.rowset_path_prefix = new_tablet->tablet_path(); writer_context.tablet_schema = &(new_tablet->tablet_schema()); writer_context.rowset_state = VISIBLE; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 2f89e72131..585674fe46 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -118,7 +118,8 @@ StorageEngine::StorageEngine(const EngineOptions& options) _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)), _memtable_flush_executor(nullptr), _default_rowset_type(ALPHA_ROWSET), - _compaction_rowset_type(ALPHA_ROWSET) { + _compaction_rowset_type(ALPHA_ROWSET), + _heartbeat_flags(nullptr) { if (_s_instance == nullptr) { _s_instance = this; } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index d6289a79ea..a92c47ff94 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -48,6 +48,7 @@ #include "olap/txn_manager.h" #include "olap/task/engine_task.h" #include "olap/rowset/rowset_id_generator.h" +#include "runtime/heartbeat_flags.h" namespace doris { @@ -198,9 +199,23 @@ public: MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor; } - RowsetTypePB default_rowset_type() const { return _default_rowset_type; } + RowsetTypePB default_rowset_type() const { + if (_heartbeat_flags != nullptr && _heartbeat_flags->is_set_default_rowset_type_to_beta()) { + return BETA_ROWSET; + } + return _default_rowset_type; + } - RowsetTypePB compaction_rowset_type() const { return _compaction_rowset_type; } + RowsetTypePB compaction_rowset_type() const { + if (_heartbeat_flags != nullptr && _heartbeat_flags->is_set_default_rowset_type_to_beta()) { + return BETA_ROWSET; + } + return _compaction_rowset_type; + } + + void set_heartbeat_flags(HeartbeatFlags* heartbeat_flags) { + _heartbeat_flags = heartbeat_flags; + } private: @@ -363,6 +378,8 @@ private: // used to control the the process of converting old data RowsetTypePB _compaction_rowset_type; + HeartbeatFlags* _heartbeat_flags; + DISALLOW_COPY_AND_ASSIGN(StorageEngine); }; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 0d6d2f796f..125f039aec 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -60,6 +60,7 @@ class FrontendServiceClient; class TPaloBrokerServiceClient; class TExtDataSourceServiceClient; template class ClientCache; +class HeartbeatFlags; // Execution environment for queries/plan fragments. // Contains all required global structures, and handles to @@ -129,6 +130,7 @@ public: StreamLoadExecutor* stream_load_executor() { return _stream_load_executor; } RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; } + HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; } private: Status _init(const std::vector& store_paths); @@ -178,6 +180,7 @@ private: StreamLoadExecutor* _stream_load_executor = nullptr; RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr; SmallFileMgr* _small_file_mgr = nullptr; + HeartbeatFlags* _heartbeat_flags = nullptr; }; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index a186299471..8b4a28c78b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -61,6 +61,7 @@ #include "gen_cpp/TPaloBrokerService.h" #include "gen_cpp/TExtDataSourceService.h" #include "gen_cpp/HeartbeatService_types.h" +#include "runtime/heartbeat_flags.h" namespace doris { @@ -121,7 +122,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { _init_mem_tracker(); RETURN_IF_ERROR(_load_channel_mgr->init(_mem_tracker->limit())); - + _heartbeat_flags = new HeartbeatFlags(); return Status::OK(); } @@ -229,6 +230,7 @@ void ExecEnv::_destory() { delete _stream_load_executor; delete _routine_load_task_executor; delete _external_scan_context_mgr; + delete _heartbeat_flags; _metrics = nullptr; } diff --git a/be/src/runtime/heartbeat_flags.h b/be/src/runtime/heartbeat_flags.h new file mode 100644 index 0000000000..c89e11e4c9 --- /dev/null +++ b/be/src/runtime/heartbeat_flags.h @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "gen_cpp/HeartbeatService_constants.h" + +namespace doris { + +// This class is for parse control flags from heartbeat message +// between FE and BE. +class HeartbeatFlags { +public: + HeartbeatFlags(uint64_t origin_flags) : _flags(origin_flags) { } + + HeartbeatFlags() : HeartbeatFlags(0) { } + + void update(uint64_t flags) { + _flags = flags; + } + + bool is_set_default_rowset_type_to_beta() { + return _flags & g_HeartbeatService_constants.IS_SET_DEFAULT_ROWSET_TO_BETA_BIT; + } + +private: + std::atomic _flags; +}; + +} diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 1240ef3c34..b9f1d5765e 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -57,6 +57,7 @@ #include "common/resource_tls.h" #include "util/thrift_rpc_helper.h" #include "util/uid_util.h" +#include "runtime/heartbeat_flags.h" static void help(const char*); @@ -166,6 +167,7 @@ int main(int argc, char** argv) { auto exec_env = doris::ExecEnv::GetInstance(); doris::ExecEnv::init(exec_env, paths); exec_env->set_storage_engine(engine); + engine->set_heartbeat_flags(exec_env->heartbeat_flags()); doris::ThriftRpcHelper::setup(exec_env); doris::ThriftServer* be_server = nullptr; diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt index 7bb59687fc..21fbb36567 100644 --- a/be/test/runtime/CMakeLists.txt +++ b/be/test/runtime/CMakeLists.txt @@ -58,6 +58,7 @@ ADD_BE_TEST(user_function_cache_test) ADD_BE_TEST(kafka_consumer_pipe_test) ADD_BE_TEST(routine_load_task_executor_test) ADD_BE_TEST(small_file_mgr_test) +ADD_BE_TEST(heartbeat_flags_test) ADD_BE_TEST(result_queue_mgr_test) ADD_BE_TEST(memory_scratch_sink_test) diff --git a/be/test/runtime/heartbeat_flags_test.cpp b/be/test/runtime/heartbeat_flags_test.cpp new file mode 100644 index 0000000000..19a4c17f69 --- /dev/null +++ b/be/test/runtime/heartbeat_flags_test.cpp @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "runtime/heartbeat_flags.h" + +namespace doris { + +class HeartbeatFlagsTest : public testing::Test { +private: + HeartbeatFlags _flags; +}; + +TEST_F(HeartbeatFlagsTest, normal) { + ASSERT_FALSE(_flags.is_set_default_rowset_type_to_beta()); + _flags.update(1); + ASSERT_TRUE(_flags.is_set_default_rowset_type_to_beta()); + _flags.update(2); + ASSERT_FALSE(_flags.is_set_default_rowset_type_to_beta()); +} + +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/docs/documentation/cn/administrator-guide/variables.md b/docs/documentation/cn/administrator-guide/variables.md index 5b25a53871..efa2bd5a85 100644 --- a/docs/documentation/cn/administrator-guide/variables.md +++ b/docs/documentation/cn/administrator-guide/variables.md @@ -306,3 +306,7 @@ SET forward_to_master = concat('tr', 'u', 'e'); * `wait_timeout` 用于设置空闲连接的连接时长。当一个空闲连接在该时长内与 Doris 没有任何交互,则 Doris 会主动断开这个链接。默认为 8 小时,单位为秒。 + +* `default_rowset_type` + + 用于设置计算节点存储引擎默认的存储格式。当前支持的存储格式包括:alpha/beta。 diff --git a/docs/documentation/en/administrator-guide/variables_EN.md b/docs/documentation/en/administrator-guide/variables_EN.md index 12dddfaf5a..48a2e9866c 100644 --- a/docs/documentation/en/administrator-guide/variables_EN.md +++ b/docs/documentation/en/administrator-guide/variables_EN.md @@ -307,3 +307,7 @@ SET forward_to_master = concat('tr', 'u', 'e'); * `wait_timeout` The length of the connection used to set up an idle connection. When an idle connection does not interact with Doris for that length of time, Doris will actively disconnect the link. The default is 8 hours, in seconds. + +* `default_rowset_type` + + Used for setting the default storage format of Backends storage engine. Valid options: alpha/beta diff --git a/fe/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/src/main/java/org/apache/doris/analysis/SetVar.java index 15a09f02b5..615102aafe 100644 --- a/fe/src/main/java/org/apache/doris/analysis/SetVar.java +++ b/fe/src/main/java/org/apache/doris/analysis/SetVar.java @@ -29,12 +29,10 @@ import org.apache.doris.mysql.privilege.UserResource; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.SqlModeHelper; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.apache.doris.system.HeartbeatFlags; // change one variable. public class SetVar { - private static final Logger LOG = LogManager.getLogger(SetVar.class); private String variable; private Expr value; @@ -135,6 +133,14 @@ public class SetVar { throw new AnalysisException("Invalid resource group, now we support {low, normal, high}."); } } + if (variable.equalsIgnoreCase(SessionVariable.DEFAULT_ROWSET_TYPE)) { + if (type != SetType.GLOBAL) { + throw new AnalysisException("default_rowset_type must be global. use set global"); + } + if (result != null && !HeartbeatFlags.isValidRowsetType(result.getStringValue())) { + throw new AnalysisException("Invalid rowset type, now we support {alpha, beta}."); + } + } } public String toSql() { diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 68ae2a10e1..a8f34aabf2 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -88,6 +88,7 @@ public class SessionVariable implements Serializable, Writable { * Using only the exec_mem_limit variable does not make a good distinction of memory limit between the two parts. */ public static final String LOAD_MEM_LIMIT = "load_mem_limit"; + public static final String DEFAULT_ROWSET_TYPE = "default_rowset_type"; // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) @@ -213,6 +214,10 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = LOAD_MEM_LIMIT) private long loadMemLimit = 0L; + // the default rowset type flag which will be passed to Backends througth heartbeat + @VariableMgr.VarAttr(name = DEFAULT_ROWSET_TYPE) + public static String defaultRowsetType = "alpha"; + public long getMaxExecMemByte() { return maxExecMemByte; } diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java b/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java new file mode 100644 index 0000000000..b37f906f9d --- /dev/null +++ b/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.system; + +import org.apache.doris.analysis.SysVariableDesc; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.VariableMgr; +import org.apache.doris.thrift.HeartbeatServiceConstants; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +// This class is to manage the control flag in heartbeat message between FE and BE. +// The control flag is for FE to control some behaviors of BE. +// Now the flag is represented by 64-bit long type, each bit can be used to control +// one behavior. The first bit is used for set default rowset type to beta flag. +public class HeartbeatFlags { + private static final Logger LOG = LogManager.getLogger(HeartbeatFlags.class); + + public static boolean isValidRowsetType(String rowsetType) { + return rowsetType.equalsIgnoreCase("alpha") || rowsetType.equalsIgnoreCase("beta"); + } + + public long getHeartbeatFlags () { + long heartbeatFlags = 0; + try { + String defaultRowsetType = VariableMgr.getValue(null, new SysVariableDesc(SessionVariable.DEFAULT_ROWSET_TYPE)); + if (defaultRowsetType.equalsIgnoreCase("beta")) { + heartbeatFlags |= HeartbeatServiceConstants.IS_SET_DEFAULT_ROWSET_TO_BETA_BIT; + } + } catch (AnalysisException e) { + LOG.warn("parse default rowset type failed.error:{}", e); + } + + return heartbeatFlags; + } +} diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java index dc614c30dc..bc12f7ba48 100644 --- a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -66,6 +66,7 @@ public class HeartbeatMgr extends MasterDaemon { private final ExecutorService executor; private SystemInfoService nodeMgr; + private HeartbeatFlags heartbeatFlags; private static volatile AtomicReference masterInfo = new AtomicReference(); @@ -73,6 +74,7 @@ public class HeartbeatMgr extends MasterDaemon { super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000); this.nodeMgr = nodeMgr; this.executor = Executors.newCachedThreadPool(); + this.heartbeatFlags = new HeartbeatFlags(); } public void setMaster(int clusterId, String token, long epoch) { @@ -80,6 +82,8 @@ public class HeartbeatMgr extends MasterDaemon { new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port), clusterId, epoch); tMasterInfo.setToken(token); tMasterInfo.setHttp_port(Config.http_port); + long flags = heartbeatFlags.getHeartbeatFlags(); + tMasterInfo.setHeartbeat_flags(flags); masterInfo.set(tMasterInfo); } @@ -216,6 +220,8 @@ public class HeartbeatMgr extends MasterDaemon { TMasterInfo copiedMasterInfo = new TMasterInfo(masterInfo.get()); copiedMasterInfo.setBackend_ip(backend.getHost()); + long flags = heartbeatFlags.getHeartbeatFlags(); + copiedMasterInfo.setHeartbeat_flags(flags); THeartbeatResult result = client.heartbeat(copiedMasterInfo); ok = true; diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 268052d050..bf590e7a48 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -18,6 +18,8 @@ namespace java org.apache.doris.thrift include "Status.thrift" include "Types.thrift" +const i64 IS_SET_DEFAULT_ROWSET_TO_BETA_BIT = 0x01; + struct TMasterInfo { 1: required Types.TNetworkAddress network_address 2: required Types.TClusterId cluster_id @@ -25,6 +27,7 @@ struct TMasterInfo { 4: optional string token 5: optional string backend_ip 6: optional Types.TPort http_port + 7: optional i64 heartbeat_flags } struct TBackendInfo { diff --git a/run-ut.sh b/run-ut.sh index 90f8edde44..4fdb1191f7 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -292,6 +292,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/selection_vector_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test ${DORIS_TEST_BINARY_DIR}/runtime/routine_load_task_executor_test +${DORIS_TEST_BINARY_DIR}/runtime/heartbeat_flags_test # Running agent unittest # Prepare agent testdata