[Segment V2] Add a control framework between FE and BE through heartbeat #2247 (#2364)

The control framework is implemented through heartbeat message. Use uint64_t as flags to control different functions. 
Now add a flag to set the default rowset type to beta.
This commit is contained in:
kangpinghuang
2019-12-12 12:18:32 +08:00
committed by Mingyu Chen
parent 7f2144e7e5
commit c07f37d78c
18 changed files with 214 additions and 11 deletions

View File

@ -29,6 +29,7 @@
#include "olap/utils.h"
#include "service/backend_options.h"
#include "util/thrift_server.h"
#include "runtime/heartbeat_flags.h"
using std::fstream;
using std::nothrow;
@ -147,6 +148,11 @@ Status HeartbeatServer::_heartbeat(
_master_info->__set_http_port(master_info.http_port);
}
if (master_info.__isset.heartbeat_flags) {
HeartbeatFlags* heartbeat_flags = ExecEnv::GetInstance()->heartbeat_flags();
heartbeat_flags->update(master_info.heartbeat_flags);
}
if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_olap_engine->report_notify(true);

View File

@ -36,6 +36,8 @@
#include "runtime/mem_tracker.h"
#include "common/resource_tls.h"
#include "agent/cgroups_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/heartbeat_flags.h"
using std::deque;
using std::list;
@ -977,7 +979,7 @@ bool SchemaChangeWithSorting::process(
_temp_delta_versions.second),
rowset_reader->version_hash(),
new_tablet,
rowset_reader->rowset()->rowset_meta()->rowset_type(),
StorageEngine::instance()->default_rowset_type(),
&rowset)) {
LOG(WARNING) << "failed to sorting internally.";
result = false;
@ -1034,7 +1036,7 @@ bool SchemaChangeWithSorting::process(
Version(_temp_delta_versions.second, _temp_delta_versions.second),
rowset_reader->version_hash(),
new_tablet,
rowset_reader->rowset()->rowset_meta()->rowset_type(),
StorageEngine::instance()->default_rowset_type(),
&rowset)) {
LOG(WARNING) << "failed to sorting internally.";
result = false;
@ -1471,7 +1473,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
writer_context.tablet_id = new_tablet->tablet_id();
writer_context.partition_id = (*base_rowset)->partition_id();
writer_context.tablet_schema_hash = new_tablet->schema_hash();
writer_context.rowset_type = (*base_rowset)->rowset_meta()->rowset_type();
writer_context.rowset_type = StorageEngine::instance()->default_rowset_type();
writer_context.rowset_path_prefix = new_tablet->tablet_path();
writer_context.tablet_schema = &(new_tablet->tablet_schema());
writer_context.rowset_state = PREPARED;
@ -1697,7 +1699,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
writer_context.partition_id = new_tablet->partition_id();
writer_context.tablet_schema_hash = new_tablet->schema_hash();
// linked schema change can't change rowset type, therefore we preserve rowset type in schema change now
writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type();
writer_context.rowset_type = StorageEngine::instance()->default_rowset_type();
writer_context.rowset_path_prefix = new_tablet->tablet_path();
writer_context.tablet_schema = &(new_tablet->tablet_schema());
writer_context.rowset_state = VISIBLE;

View File

@ -118,7 +118,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)),
_memtable_flush_executor(nullptr),
_default_rowset_type(ALPHA_ROWSET),
_compaction_rowset_type(ALPHA_ROWSET) {
_compaction_rowset_type(ALPHA_ROWSET),
_heartbeat_flags(nullptr) {
if (_s_instance == nullptr) {
_s_instance = this;
}

View File

@ -48,6 +48,7 @@
#include "olap/txn_manager.h"
#include "olap/task/engine_task.h"
#include "olap/rowset/rowset_id_generator.h"
#include "runtime/heartbeat_flags.h"
namespace doris {
@ -198,9 +199,23 @@ public:
MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor; }
RowsetTypePB default_rowset_type() const { return _default_rowset_type; }
RowsetTypePB default_rowset_type() const {
if (_heartbeat_flags != nullptr && _heartbeat_flags->is_set_default_rowset_type_to_beta()) {
return BETA_ROWSET;
}
return _default_rowset_type;
}
RowsetTypePB compaction_rowset_type() const { return _compaction_rowset_type; }
RowsetTypePB compaction_rowset_type() const {
if (_heartbeat_flags != nullptr && _heartbeat_flags->is_set_default_rowset_type_to_beta()) {
return BETA_ROWSET;
}
return _compaction_rowset_type;
}
void set_heartbeat_flags(HeartbeatFlags* heartbeat_flags) {
_heartbeat_flags = heartbeat_flags;
}
private:
@ -363,6 +378,8 @@ private:
// used to control the the process of converting old data
RowsetTypePB _compaction_rowset_type;
HeartbeatFlags* _heartbeat_flags;
DISALLOW_COPY_AND_ASSIGN(StorageEngine);
};

View File

@ -60,6 +60,7 @@ class FrontendServiceClient;
class TPaloBrokerServiceClient;
class TExtDataSourceServiceClient;
template<class T> class ClientCache;
class HeartbeatFlags;
// Execution environment for queries/plan fragments.
// Contains all required global structures, and handles to
@ -129,6 +130,7 @@ public:
StreamLoadExecutor* stream_load_executor() { return _stream_load_executor; }
RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; }
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
private:
Status _init(const std::vector<StorePath>& store_paths);
@ -178,6 +180,7 @@ private:
StreamLoadExecutor* _stream_load_executor = nullptr;
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
HeartbeatFlags* _heartbeat_flags = nullptr;
};

View File

@ -61,6 +61,7 @@
#include "gen_cpp/TPaloBrokerService.h"
#include "gen_cpp/TExtDataSourceService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "runtime/heartbeat_flags.h"
namespace doris {
@ -121,7 +122,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_init_mem_tracker();
RETURN_IF_ERROR(_load_channel_mgr->init(_mem_tracker->limit()));
_heartbeat_flags = new HeartbeatFlags();
return Status::OK();
}
@ -229,6 +230,7 @@ void ExecEnv::_destory() {
delete _stream_load_executor;
delete _routine_load_task_executor;
delete _external_scan_context_mgr;
delete _heartbeat_flags;
_metrics = nullptr;
}

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include "gen_cpp/HeartbeatService_constants.h"
namespace doris {
// This class is for parse control flags from heartbeat message
// between FE and BE.
class HeartbeatFlags {
public:
HeartbeatFlags(uint64_t origin_flags) : _flags(origin_flags) { }
HeartbeatFlags() : HeartbeatFlags(0) { }
void update(uint64_t flags) {
_flags = flags;
}
bool is_set_default_rowset_type_to_beta() {
return _flags & g_HeartbeatService_constants.IS_SET_DEFAULT_ROWSET_TO_BETA_BIT;
}
private:
std::atomic<uint64_t> _flags;
};
}

View File

@ -57,6 +57,7 @@
#include "common/resource_tls.h"
#include "util/thrift_rpc_helper.h"
#include "util/uid_util.h"
#include "runtime/heartbeat_flags.h"
static void help(const char*);
@ -166,6 +167,7 @@ int main(int argc, char** argv) {
auto exec_env = doris::ExecEnv::GetInstance();
doris::ExecEnv::init(exec_env, paths);
exec_env->set_storage_engine(engine);
engine->set_heartbeat_flags(exec_env->heartbeat_flags());
doris::ThriftRpcHelper::setup(exec_env);
doris::ThriftServer* be_server = nullptr;

View File

@ -58,6 +58,7 @@ ADD_BE_TEST(user_function_cache_test)
ADD_BE_TEST(kafka_consumer_pipe_test)
ADD_BE_TEST(routine_load_task_executor_test)
ADD_BE_TEST(small_file_mgr_test)
ADD_BE_TEST(heartbeat_flags_test)
ADD_BE_TEST(result_queue_mgr_test)
ADD_BE_TEST(memory_scratch_sink_test)

View File

@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include "runtime/heartbeat_flags.h"
namespace doris {
class HeartbeatFlagsTest : public testing::Test {
private:
HeartbeatFlags _flags;
};
TEST_F(HeartbeatFlagsTest, normal) {
ASSERT_FALSE(_flags.is_set_default_rowset_type_to_beta());
_flags.update(1);
ASSERT_TRUE(_flags.is_set_default_rowset_type_to_beta());
_flags.update(2);
ASSERT_FALSE(_flags.is_set_default_rowset_type_to_beta());
}
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -306,3 +306,7 @@ SET forward_to_master = concat('tr', 'u', 'e');
* `wait_timeout`
用于设置空闲连接的连接时长。当一个空闲连接在该时长内与 Doris 没有任何交互,则 Doris 会主动断开这个链接。默认为 8 小时,单位为秒。
* `default_rowset_type`
用于设置计算节点存储引擎默认的存储格式。当前支持的存储格式包括:alpha/beta。

View File

@ -307,3 +307,7 @@ SET forward_to_master = concat('tr', 'u', 'e');
* `wait_timeout`
The length of the connection used to set up an idle connection. When an idle connection does not interact with Doris for that length of time, Doris will actively disconnect the link. The default is 8 hours, in seconds.
* `default_rowset_type`
Used for setting the default storage format of Backends storage engine. Valid options: alpha/beta

View File

@ -29,12 +29,10 @@ import org.apache.doris.mysql.privilege.UserResource;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.doris.system.HeartbeatFlags;
// change one variable.
public class SetVar {
private static final Logger LOG = LogManager.getLogger(SetVar.class);
private String variable;
private Expr value;
@ -135,6 +133,14 @@ public class SetVar {
throw new AnalysisException("Invalid resource group, now we support {low, normal, high}.");
}
}
if (variable.equalsIgnoreCase(SessionVariable.DEFAULT_ROWSET_TYPE)) {
if (type != SetType.GLOBAL) {
throw new AnalysisException("default_rowset_type must be global. use set global");
}
if (result != null && !HeartbeatFlags.isValidRowsetType(result.getStringValue())) {
throw new AnalysisException("Invalid rowset type, now we support {alpha, beta}.");
}
}
}
public String toSql() {

View File

@ -88,6 +88,7 @@ public class SessionVariable implements Serializable, Writable {
* Using only the exec_mem_limit variable does not make a good distinction of memory limit between the two parts.
*/
public static final String LOAD_MEM_LIMIT = "load_mem_limit";
public static final String DEFAULT_ROWSET_TYPE = "default_rowset_type";
// max memory used on every backend.
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
@ -213,6 +214,10 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = LOAD_MEM_LIMIT)
private long loadMemLimit = 0L;
// the default rowset type flag which will be passed to Backends througth heartbeat
@VariableMgr.VarAttr(name = DEFAULT_ROWSET_TYPE)
public static String defaultRowsetType = "alpha";
public long getMaxExecMemByte() {
return maxExecMemByte;
}

View File

@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.system;
import org.apache.doris.analysis.SysVariableDesc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.thrift.HeartbeatServiceConstants;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
// This class is to manage the control flag in heartbeat message between FE and BE.
// The control flag is for FE to control some behaviors of BE.
// Now the flag is represented by 64-bit long type, each bit can be used to control
// one behavior. The first bit is used for set default rowset type to beta flag.
public class HeartbeatFlags {
private static final Logger LOG = LogManager.getLogger(HeartbeatFlags.class);
public static boolean isValidRowsetType(String rowsetType) {
return rowsetType.equalsIgnoreCase("alpha") || rowsetType.equalsIgnoreCase("beta");
}
public long getHeartbeatFlags () {
long heartbeatFlags = 0;
try {
String defaultRowsetType = VariableMgr.getValue(null, new SysVariableDesc(SessionVariable.DEFAULT_ROWSET_TYPE));
if (defaultRowsetType.equalsIgnoreCase("beta")) {
heartbeatFlags |= HeartbeatServiceConstants.IS_SET_DEFAULT_ROWSET_TO_BETA_BIT;
}
} catch (AnalysisException e) {
LOG.warn("parse default rowset type failed.error:{}", e);
}
return heartbeatFlags;
}
}

View File

@ -66,6 +66,7 @@ public class HeartbeatMgr extends MasterDaemon {
private final ExecutorService executor;
private SystemInfoService nodeMgr;
private HeartbeatFlags heartbeatFlags;
private static volatile AtomicReference<TMasterInfo> masterInfo = new AtomicReference<TMasterInfo>();
@ -73,6 +74,7 @@ public class HeartbeatMgr extends MasterDaemon {
super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000);
this.nodeMgr = nodeMgr;
this.executor = Executors.newCachedThreadPool();
this.heartbeatFlags = new HeartbeatFlags();
}
public void setMaster(int clusterId, String token, long epoch) {
@ -80,6 +82,8 @@ public class HeartbeatMgr extends MasterDaemon {
new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port), clusterId, epoch);
tMasterInfo.setToken(token);
tMasterInfo.setHttp_port(Config.http_port);
long flags = heartbeatFlags.getHeartbeatFlags();
tMasterInfo.setHeartbeat_flags(flags);
masterInfo.set(tMasterInfo);
}
@ -216,6 +220,8 @@ public class HeartbeatMgr extends MasterDaemon {
TMasterInfo copiedMasterInfo = new TMasterInfo(masterInfo.get());
copiedMasterInfo.setBackend_ip(backend.getHost());
long flags = heartbeatFlags.getHeartbeatFlags();
copiedMasterInfo.setHeartbeat_flags(flags);
THeartbeatResult result = client.heartbeat(copiedMasterInfo);
ok = true;

View File

@ -18,6 +18,8 @@ namespace java org.apache.doris.thrift
include "Status.thrift"
include "Types.thrift"
const i64 IS_SET_DEFAULT_ROWSET_TO_BETA_BIT = 0x01;
struct TMasterInfo {
1: required Types.TNetworkAddress network_address
2: required Types.TClusterId cluster_id
@ -25,6 +27,7 @@ struct TMasterInfo {
4: optional string token
5: optional string backend_ip
6: optional Types.TPort http_port
7: optional i64 heartbeat_flags
}
struct TBackendInfo {

View File

@ -292,6 +292,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/selection_vector_test
# Running routine load test
${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test
${DORIS_TEST_BINARY_DIR}/runtime/routine_load_task_executor_test
${DORIS_TEST_BINARY_DIR}/runtime/heartbeat_flags_test
# Running agent unittest
# Prepare agent testdata