server id persistence

This commit is contained in:
linqiucen 2023-05-12 12:41:17 +00:00 committed by ob-robot
parent e806dd89c3
commit ecdc3d2fde
24 changed files with 177 additions and 173 deletions

View File

@ -1827,6 +1827,12 @@ OB_INLINE bool is_valid_idx(const int64_t idx)
return (0 <= idx);
}
// check whether a server_id is valid
OB_INLINE bool is_valid_server_id(const uint64_t server_id)
{
return (0 < server_id) && (OB_INVALID_ID != server_id);
}
//check whether an tenant_id is valid
OB_INLINE bool is_valid_tenant_id(const uint64_t tenant_id)
{

View File

@ -658,7 +658,7 @@ PCODE_DEF(OB_PRE_PROCESS_SERVER, 0x734)
//PCODE_DEF(OB_RECOVER_PG_FILE_REPLY, 0x737): not used on 4.0
PCODE_DEF(OB_RENEW_IN_ZONE_HB, 0x738)
//PCODE_DEF(OB_MOVE_REPLICA_CONTROL, 0x739) // not supported on 4.0
PCODE_DEF(OB_PRE_BOOTSTRAP_CREATE_SERVER_WORKING_DIR, 0x73A)
// PCODE_DEF(OB_PRE_BOOTSTRAP_CREATE_SERVER_WORKING_DIR, 0x73A)
//PCODE_DEF(OB_OBS_DISCONNECT_CLUSTER, 0x73B) // 4.0 not supported
PCODE_DEF(OB_DUMP_TX_DATA_MEMTABLE, 0x73C)
PCODE_DEF(OB_ADMIN_REFRESH_IO_CALIBRATION, 0x73D)

View File

@ -27,7 +27,6 @@
#include "observer/ob_server_schema_updater.h"
#include "observer/ob_server.h"
#include "observer/omt/ob_tenant_config_mgr.h"
#include "observer/ob_heartbeat_handler.h"
#include "common/ob_timeout_ctx.h"
#include "storage/slog/ob_storage_logger_manager.h"
@ -136,11 +135,61 @@ int ObHeartBeatProcess::init_lease_request(ObLeaseRequest &lease_request)
return ret;
}
void ObHeartBeatProcess::check_and_update_server_id_(const uint64_t server_id)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(inited_));
} else if (OB_UNLIKELY(!is_valid_server_id(server_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid server_id", KR(ret), K(server_id));
} else {
// once server_id is confirmed, it cannnot be changed
// in 4.1, server_id persistance is not supported, observer can only get its server_id via heartbeat
// in 4.2, server_id is persisted when the server is added into the cluster
// in upgrade period 4.1 -> 4.2, we need to persist the server_id via heartbeat
const int64_t delay = 0;
const bool repeat = false;
if (0 == GCTX.server_id_) {
GCTX.server_id_ = server_id;
LOG_INFO("receive new server id in GCTX", K(server_id));
} else if (server_id != GCTX.server_id_) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("GCTX.server_id_ is not the same as server_id in RS", KR(ret),
K(GCTX.server_id_), K(server_id));
}
if (OB_FAIL(ret)) {
} else if (0 == GCONF.server_id) {
GCONF.server_id = server_id;
LOG_INFO("receive new server id in GCONF", K(server_id));
if (OB_SUCCESS != (tmp_ret = TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, server_id_persist_task_, delay, repeat))) {
server_id_persist_task_.enable_need_retry_flag();
LOG_WARN("schedule server_id persist task failed", K(tmp_ret));
} else {
server_id_persist_task_.disable_need_retry_flag();
}
} else if (server_id != GCONF.server_id) {
ret = OB_ERR_UNEXPECTED;
uint64_t server_id_in_GCONF = GCONF.server_id;
LOG_ERROR("GCONF.server_id is not the same as server_id in RS", KR(ret),
K(server_id_in_GCONF), K(server_id));
}
if (server_id_persist_task_.is_need_retry()) {
if (OB_SUCCESS != (tmp_ret = TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, server_id_persist_task_, delay, repeat))) {
LOG_WARN("schedule server_id persist task failed", K(tmp_ret));
} else {
server_id_persist_task_.disable_need_retry_flag();
}
}
}
}
//pay attention to concurrency control
int ObHeartBeatProcess::do_heartbeat_event(const ObLeaseResponse &lease_response)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
@ -153,28 +202,9 @@ int ObHeartBeatProcess::do_heartbeat_event(const ObLeaseResponse &lease_response
LITERAL_K(ObLeaseResponse::LEASE_VERSION), KR(ret));
} else {
LOG_DEBUG("get lease_response", K(lease_response));
if (OB_INVALID_ID != lease_response.server_id_) {
if (GCTX.server_id_ != lease_response.server_id_) {
LOG_INFO("receive new server id",
"old_id", GCTX.server_id_,
"new_id", lease_response.server_id_);
GCTX.server_id_ = lease_response.server_id_;
GCONF.server_id = lease_response.server_id_;
const int64_t delay = 0;
const bool repeat = false;
if (OB_SUCCESS != (tmp_ret = TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, server_id_persist_task_, delay, repeat))) {
server_id_persist_task_.enable_need_retry_flag();
LOG_WARN("schedule server_id persist task failed", K(tmp_ret));
} else {
server_id_persist_task_.disable_need_retry_flag();
}
}
}
int tmp_ret = OB_SUCCESS;
(void) check_and_update_server_id_(lease_response.server_id_);
if (!ObHeartbeatHandler::is_rs_epoch_id_valid()) {
///// if the new heartbeat service has not started, this heartbeat is responsible for
//// update server_id_ and rs_server_status_
if (RSS_INVALID != lease_response.rs_server_status_) {
if (GCTX.rs_server_status_ != lease_response.rs_server_status_) {
LOG_INFO("receive new server status recorded in rs",
@ -195,13 +225,13 @@ int ObHeartBeatProcess::do_heartbeat_event(const ObLeaseResponse &lease_response
"refresh_schema_info", lease_response.refresh_schema_info_, K(schema_ret));
}
const int64_t delay = 0;
const bool repeat = false;
// while rootservice startup, lease_info_version may be set to 0.
if (lease_response.lease_info_version_ > 0) {
newest_lease_info_version_ = lease_response.lease_info_version_;
}
bool is_exist = false;
const int64_t delay = 0;
const bool repeat = false;
if (OB_FAIL(TG_TASK_EXIST(lib::TGDefIDs::ObHeartbeat, update_task_, is_exist))) {
LOG_WARN("check exist failed", KR(ret));
} else if (is_exist) {
@ -213,14 +243,6 @@ int ObHeartBeatProcess::do_heartbeat_event(const ObLeaseResponse &lease_response
if (OB_SUCCESS != (tmp_ret = OTC_MGR.got_versions(lease_response.tenant_config_version_))) {
LOG_WARN("tenant got versions failed", K(tmp_ret));
}
if (server_id_persist_task_.is_need_retry()) {
if (OB_SUCCESS != (tmp_ret = TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, server_id_persist_task_, delay, repeat))) {
LOG_WARN("schedule server_id persist task failed", K(tmp_ret));
} else {
server_id_persist_task_.disable_need_retry_flag();
}
}
}
return ret;
}

View File

@ -68,6 +68,7 @@ private:
int try_reload_config(const int64_t config_version);
int try_reload_time_zone_info(const int64_t time_zone_info_version);
private:
void check_and_update_server_id_(const uint64_t server_id);
bool inited_;
ObZoneLeaseInfoUpdateTask update_task_;
share::ObZoneLeaseInfo zone_lease_info_;

View File

@ -1982,18 +1982,6 @@ int ObPreProcessServerP::process()
return ret;
}
int ObPreBootstrapCreateServerWorkingDirP::process()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(gctx_.ob_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("observer is null", K(ret));
} else {
ret = OB_NOT_SUPPORTED;
}
return ret;
}
int ObHandlePartTransCtxP::process()
{

View File

@ -195,7 +195,6 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_GET_TENANT_REFRESHED_SCHEMA_VERSION, ObGetTenantSc
OB_DEFINE_PROCESSOR_OBADMIN(Srv, OB_UPDATE_TENANT_MEMORY, ObUpdateTenantMemoryP);
OB_DEFINE_PROCESSOR_S(Srv, OB_RENEW_IN_ZONE_HB, ObRenewInZoneHbP);
OB_DEFINE_PROCESSOR_S(Srv, OB_PRE_PROCESS_SERVER, ObPreProcessServerP);
OB_DEFINE_PROCESSOR_S(Srv, OB_PRE_BOOTSTRAP_CREATE_SERVER_WORKING_DIR, ObPreBootstrapCreateServerWorkingDirP);
OB_DEFINE_PROCESSOR_S(Srv, OB_HANDLE_PART_TRANS_CTX, ObHandlePartTransCtxP);
OB_DEFINE_PROCESSOR_S(Srv, OB_SERVER_FLUSH_OPT_STAT_MONITORING_INFO, ObFlushLocalOptStatMonitoringInfoP);
OB_DEFINE_PROCESSOR_S(Srv, OB_SET_MEMBER_LIST, ObRpcSetMemberListP);

View File

@ -2215,6 +2215,9 @@ int ObServer::init_global_context()
gctx_.flashback_scn_ = opts_.flashback_scn_;
gctx_.server_id_ = config_.server_id;
if (is_valid_server_id(gctx_.server_id_)) {
LOG_INFO("this observer has had a valid server_id", K(gctx_.server_id_));
}
if ((PHY_FLASHBACK_MODE == gctx_.startup_mode_ || PHY_FLASHBACK_VERIFY_MODE == gctx_.startup_mode_)
&& 0 >= gctx_.flashback_scn_) {
ret = OB_INVALID_ARGUMENT;

View File

@ -1399,13 +1399,11 @@ int ObService::bootstrap(const obrpc::ObBootstrapArg &arg)
bool server_empty = false;
ObCheckServerEmptyArg new_arg;
new_arg.mode_ = ObCheckServerEmptyArg::BOOTSTRAP;
// when OFS mode, this server dir hasn't been created, skip log scan
const bool wait_log_scan = true;
if (OB_FAIL(check_server_empty(new_arg, wait_log_scan, server_empty))) {
if (OB_FAIL(check_server_empty(server_empty))) {
BOOTSTRAP_LOG(WARN, "check_server_empty failed", K(ret), K(new_arg));
} else if (!server_empty) {
ret = OB_ERR_SYS;
BOOTSTRAP_LOG(WARN, "observer is not empty", K(ret));
BOOTSTRAP_LOG(WARN, "this observer is not empty", KR(ret), K(GCTX.self_addr()));
} else if (OB_FAIL(pre_bootstrap.prepare_bootstrap(master_rs))) {
BOOTSTRAP_LOG(ERROR, "failed to prepare boot strap", K(rs_list), K(ret));
} else {
@ -1478,10 +1476,8 @@ int ObService::is_empty_server(const obrpc::ObCheckServerEmptyArg &arg, obrpc::B
KR(ret), K(arg), K(sys_data_version));
} else {
bool server_empty = false;
// server dir must be created when 1) local mode, 2) OFS bootstrap this server
const bool wait_log_scan = ObCheckServerEmptyArg::BOOTSTRAP == arg.mode_;
if (OB_FAIL(check_server_empty(arg, wait_log_scan, server_empty))) {
LOG_WARN("check_server_empty failed", K(ret), K(arg));
if (OB_FAIL(check_server_empty(server_empty))) {
LOG_WARN("check_server_empty failed", K(ret));
} else {
is_empty = server_empty;
}
@ -1497,6 +1493,9 @@ int ObService::check_server_for_adding_server(
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(inited_));
} else if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", KR(ret), K(arg));
} else if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, sys_tenant_data_version))) {
LOG_WARN("fail to get sys tenant data version", KR(ret));
} else if (arg.get_sys_tenant_data_version() > 0
@ -1506,19 +1505,28 @@ int ObService::check_server_for_adding_server(
KR(ret), K(arg), K(sys_tenant_data_version), K(arg.get_sys_tenant_data_version()));
} else {
bool server_empty = false;
ObCheckServerEmptyArg check_server_empty_arg;
check_server_empty_arg.mode_ = ObCheckServerEmptyArg::ADD_SERVER;
const bool wait_log_scan = ObCheckServerEmptyArg::BOOTSTRAP == check_server_empty_arg.mode_;
if (OB_FAIL(check_server_empty(check_server_empty_arg, wait_log_scan, server_empty))) {
LOG_WARN("check_server_empty failed", KR(ret), K(check_server_empty_arg), K(wait_log_scan));
if (OB_FAIL(check_server_empty(server_empty))) {
LOG_WARN("check_server_empty failed", KR(ret));
} else {
char build_version[common::OB_SERVER_VERSION_LENGTH] = {0};
ObServerInfoInTable::ObBuildVersion build_version_string;
ObZone zone;
int64_t sql_port = GCONF.mysql_port;
get_package_and_svn(build_version, sizeof(build_version));
if (OB_FAIL(zone.assign(GCONF.zone.str()))) {
if (OB_SUCC(ret) && server_empty) {
uint64_t server_id = arg.get_server_id();
GCTX.server_id_ = server_id;
GCONF.server_id = server_id;
if (OB_ISNULL(GCTX.config_mgr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("GCTX.config_mgr_ is null", KR(ret));
} else if (OB_FAIL(GCTX.config_mgr_->dump2file())) {
LOG_ERROR("fail to execute dump2file, this server cannot be added, "
"please clear it and try again", KR(ret));
}
}
if (FAILEDx(zone.assign(GCONF.zone.str()))) {
LOG_WARN("fail to assign zone", KR(ret), K(GCONF.zone.str()));
} else if (OB_FAIL(build_version_string.assign(build_version))) {
LOG_WARN("fail to assign build version", KR(ret), K(build_version));
@ -1532,7 +1540,7 @@ int ObService::check_server_for_adding_server(
} else {}
}
}
LOG_INFO("generate result", KR(ret), K(arg), K(result));
FLOG_INFO("[CHECK_SERVER_EMPTY] generate result", KR(ret), K(arg), K(result));
return ret;
}
@ -1624,59 +1632,24 @@ int ObService::get_partition_count(obrpc::ObGetPartitionCountResult &result)
}
int ObService::check_server_empty(const ObCheckServerEmptyArg &arg, const bool wait_log_scan, bool &is_empty)
int ObService::check_server_empty(bool &is_empty)
{
// **TODO (linqiucen.lqc): if rs_epoch has been already valid, this server is not empty
int ret = OB_SUCCESS;
is_empty = true;
UNUSED(wait_log_scan);
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
if (ObCheckServerEmptyArg::BOOTSTRAP == arg.mode_) {
// "is_valid_heartbeat" is valid:
// 1. RS is between "start_service" and "full_service", the server has not added to RS;
// 2. RS is in "full_service" and the server has added to RS;
// 3. To avoid misjudgment in scenario 1 while add server, this check is skipped here
if (lease_state_mgr_.is_valid_heartbeat()) {
LOG_WARN("server already in rootservice lease");
is_empty = false;
}
}
// wait log scan finish
//
// For 4.0, it is not necessary to wait log scan finished.
//
// if (is_empty && wait_log_scan) {
// const int64_t WAIT_LOG_SCAN_TIME_US = 2 * 1000 * 1000; // only wait 2s for empty server
// const int64_t SLEEP_INTERVAL_US = 500;
// const int64_t start_time_us = ObTimeUtility::current_time();
// int64_t end_time_us = start_time_us;
// int64_t timeout_ts = THIS_WORKER.get_timeout_ts();
// if (INT64_MAX == THIS_WORKER.get_timeout_ts()) {
// timeout_ts = start_time_us + WAIT_LOG_SCAN_TIME_US;
// }
// while (!stopped_ && !gctx_.par_ser_->is_scan_disk_finished()) {
// end_time_us = ObTimeUtility::current_time();
// if (end_time_us > timeout_ts) {
// LOG_WARN("wait log scan finish timeout", K(timeout_ts), LITERAL_K(WAIT_LOG_SCAN_TIME_US));
// is_empty = false;
// break;
// }
// ob_usleep(static_cast<int32_t>(std::min(timeout_ts - end_time_us, SLEEP_INTERVAL_US)));
// }
// }
uint64_t server_id_in_GCONF = GCONF.server_id;
if (is_empty) {
// if (!gctx_.par_ser_->is_empty()) {
// LOG_WARN("partition service is not empty");
// is_empty = false;
// }
if (is_valid_server_id(GCTX.server_id_) || is_valid_server_id(server_id_in_GCONF)) {
is_empty = false;
FLOG_WARN("[CHECK_SERVER_EMPTY] server_id exists", K(GCTX.server_id_), K(server_id_in_GCONF));
}
}
if (is_empty) {
if (!OBSERVER.is_log_dir_empty()) {
LOG_WARN("log dir is not empty");
FLOG_WARN("[CHECK_SERVER_EMPTY] log dir is not empty");
is_empty = false;
}
}

View File

@ -255,7 +255,7 @@ private:
const bool need_checksum);
int register_self();
int check_server_empty(const obrpc::ObCheckServerEmptyArg &arg, const bool wait_log_scan, bool &server_empty);
int check_server_empty(bool &server_empty);
int handle_server_freeze_req_(const obrpc::ObMinorFreezeArg &arg);
int handle_tenant_freeze_req_(const obrpc::ObMinorFreezeArg &arg);

View File

@ -92,7 +92,6 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator *xlator) {
RPC_PROCESSOR(ObRenewInZoneHbP, gctx_);
RPC_PROCESSOR(ObPreProcessServerP, gctx_);
RPC_PROCESSOR(ObRpcBroadcastRsListP, gctx_);
RPC_PROCESSOR(ObPreBootstrapCreateServerWorkingDirP, gctx_);
RPC_PROCESSOR(ObRpcBuildDDLSingleReplicaRequestP, gctx_);
RPC_PROCESSOR(ObRpcFetchTabletAutoincSeqCacheP, gctx_);
RPC_PROCESSOR(ObRpcBatchGetTabletAutoincSeqP, gctx_);

View File

@ -2232,6 +2232,7 @@ int ObRootService::renew_lease(const ObLeaseRequest &lease_request, ObLeaseRespo
int temp_ret = OB_SUCCESS;
int64_t lease_info_version = 0;
bool is_stopped = false;
lease_response.rs_server_status_ = RSS_INVALID;
if (is_full_service()) {
if (OB_FAIL(zone_manager_.get_lease_info_version(lease_info_version))) {
LOG_WARN("get_lease_info_version failed", K(ret));
@ -2243,6 +2244,8 @@ int ObRootService::renew_lease(const ObLeaseRequest &lease_request, ObLeaseRespo
if (!ObHeartbeatService::is_service_enabled()) {
if (FAILEDx(server_manager_.is_server_stopped(lease_request.server_, is_stopped))) {
LOG_WARN("check_server_stopped failed", KR(ret), "server", lease_request.server_);
} else {
lease_response.rs_server_status_ = is_stopped ? RSS_IS_STOPPED : RSS_IS_WORKING;
}
}
}
@ -2253,12 +2256,6 @@ int ObRootService::renew_lease(const ObLeaseRequest &lease_request, ObLeaseRespo
lease_response.server_id_ = server_id;
lease_response.force_frozen_status_ = to_alive;
lease_response.baseline_schema_version_ = baseline_schema_version_;
// set observer stopped after has no leader
if (is_full_service()) {
lease_response.rs_server_status_ = is_stopped ? RSS_IS_STOPPED : RSS_IS_WORKING;
} else {
lease_response.rs_server_status_ = RSS_INVALID;
}
(void)OTC_MGR.get_lease_response(lease_response);
// after split schema, the schema_version is not used, but for the legality detection, set schema_version to sys's schema_version

View File

@ -1770,7 +1770,7 @@ int ObServerManager::get_server_id(const ObZone &zone, const ObAddr &server, uin
} else {} // zone not match
}
if (OB_SUCC(ret) && OB_INVALID_ID == server_id) {
if (OB_SUCC(ret) && !is_valid_server_id(server_id)) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("get invalid server_id", K(ret), K(server_id), K(server_id));
}

View File

@ -83,17 +83,36 @@ int ObServerZoneOpService::add_servers(const ObIArray<ObAddr> &servers, const Ob
LOG_WARN("rpc_proxy_ is null", KR(ret), KP(rpc_proxy_));
} else if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx));
} else if (OB_FAIL(rpc_arg.init(
ObCheckServerForAddingServerArg::ADD_SERVER,
sys_tenant_data_version))) {
LOG_WARN("fail to init rpc arg", KR(ret), K(sys_tenant_data_version));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < servers.count(); ++i) {
const ObAddr &addr = servers.at(i);
int64_t timeout = ctx.get_timeout();
uint64_t server_id = OB_INVALID_ID;
const int64_t ERR_MSG_BUF_LEN = OB_MAX_SERVER_ADDR_SIZE + 100;
char non_empty_server_err_msg[ERR_MSG_BUF_LEN] = "";
int64_t pos = 0;
rpc_arg.reset();
if (OB_UNLIKELY(timeout <= 0)) {
ret = OB_TIMEOUT;
LOG_WARN("ctx time out", KR(ret), K(timeout));
} else if (OB_FAIL(databuff_printf(
non_empty_server_err_msg,
ERR_MSG_BUF_LEN,
pos,
"add non-empty server %s",
to_cstring(addr)))) {
LOG_WARN("fail to execute databuff_printf", KR(ret), K(addr));
} else if (OB_FAIL(fetch_new_server_id_(server_id))) {
// fetch a new server id and insert the server into __all_server table
LOG_WARN("fail to fetch new server id", KR(ret));
} else if (OB_UNLIKELY(!is_valid_server_id(server_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("server id is invalid", KR(ret), K(server_id));
} else if (OB_FAIL(rpc_arg.init(
ObCheckServerForAddingServerArg::ADD_SERVER,
sys_tenant_data_version,
server_id))) {
LOG_WARN("fail to init rpc arg", KR(ret), K(sys_tenant_data_version), K(server_id));
} else if (OB_FAIL(rpc_proxy_->to(addr)
.timeout(timeout)
.check_server_for_adding_server(rpc_arg, rpc_result))) {
@ -101,16 +120,17 @@ int ObServerZoneOpService::add_servers(const ObIArray<ObAddr> &servers, const Ob
} else if (!rpc_result.get_is_server_empty()) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("adding non-empty server is not allowed", KR(ret));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "add non-empty server");
LOG_USER_ERROR(OB_OP_NOT_ALLOW, non_empty_server_err_msg);
} else if (OB_FAIL(zone_checking_for_adding_server_(zone, rpc_result.get_zone(), picked_zone))) {
LOG_WARN("zone checking for adding server is failed", KR(ret), K(zone), K(rpc_result.get_zone()));
} else if (OB_FAIL(add_server_(
addr,
server_id,
picked_zone,
rpc_result.get_sql_port(),
rpc_result.get_build_version()))) {
LOG_WARN("add_server failed", "server", addr, "zone", picked_zone, "sql_port",
rpc_result.get_sql_port(), "build_version", rpc_result.get_build_version(), KR(ret));
LOG_WARN("add_server failed", KR(ret), K(addr), K(server_id), K(picked_zone), "sql_port",
rpc_result.get_sql_port(), "build_version", rpc_result.get_build_version());
} else {}
}
}
@ -363,13 +383,13 @@ int ObServerZoneOpService::zone_checking_for_adding_server_(
}
int ObServerZoneOpService::add_server_(
const ObAddr &server,
const uint64_t server_id,
const ObZone &zone,
const int64_t sql_port,
const ObServerInfoInTable::ObBuildVersion &build_version)
{
int ret = OB_SUCCESS;
bool is_active = false;
uint64_t server_id = OB_INVALID_ID;
const int64_t now = ObTimeUtility::current_time();
ObServerInfoInTable server_info_in_table;
ObMySQLTransaction trans;
@ -377,12 +397,12 @@ int ObServerZoneOpService::add_server_(
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(is_inited_));
} else if (OB_UNLIKELY(!server.is_valid()
|| !is_valid_server_id(server_id)
|| zone.is_empty()
|| sql_port <= 0
|| build_version.is_empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(server), K(zone),
K(sql_port), K(build_version));
LOG_WARN("invalid argument", KR(ret), K(server), K(server_id), K(zone), K(sql_port), K(build_version));
} else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(server_change_callback_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql_proxy_ or server_change_callback_ is null", KR(ret),
@ -408,13 +428,7 @@ int ObServerZoneOpService::add_server_(
ret = OB_ENTRY_EXIST;
LOG_WARN("server exists", KR(ret), K(server_info_in_table));
}
if (FAILEDx(fetch_new_server_id_(server_id))) {
// fetch a new server id and insert the server into __all_server table
LOG_WARN("fail to fetch new server id", KR(ret));
} else if (OB_UNLIKELY(OB_INVALID_ID == server_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("server id is invalid", KR(ret), K(server_id));
} else if (OB_FAIL(server_info_in_table.init(
if (FAILEDx(server_info_in_table.init(
server,
server_id,
zone,

View File

@ -166,6 +166,7 @@ private:
ObZone &picked_zone);
int add_server_(
const common::ObAddr &server,
const uint64_t server_id,
const common::ObZone &zone,
const int64_t sql_port,
const share::ObServerInfoInTable::ObBuildVersion &build_version);

View File

@ -41,7 +41,7 @@ int ObDetectableIdGen::generate_detectable_id(ObDetectableId &detectable_id, uin
int ret = OB_SUCCESS;
// use server id to ensure that the detectable_id is unique in cluster
uint64_t server_id = GCTX.server_id_;
if (0 == server_id) {
if (!is_valid_server_id(server_id)) {
ret = OB_SERVER_IS_INIT;
LIB_LOG(WARN, "[DM] server id is invalid");
} else {

View File

@ -108,7 +108,7 @@ int ObHBRequest::init(
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!server.is_valid()
|| OB_INVALID_ID == server_id
|| !is_valid_server_id(server_id)
|| !rs_addr.is_valid()
|| palf::INVALID_PROPOSAL_ID == epoch_id)) {
ret = OB_INVALID_ARGUMENT;
@ -135,7 +135,7 @@ int ObHBRequest::assign(const ObHBRequest &other)
bool ObHBRequest::is_valid() const
{
return server_.is_valid()
&& OB_INVALID_ID != server_id_
&& is_valid_server_id(server_id_)
&& rs_addr_.is_valid()
&& rs_server_status_ > RSS_INVALID
&& rs_server_status_ < RSS_MAX

View File

@ -5882,20 +5882,39 @@ OB_SERIALIZE_MEMBER((ObLabelSeComponentDDLArg, ObDDLArg), ddl_type_, schema_, po
OB_SERIALIZE_MEMBER((ObLabelSeLabelDDLArg, ObDDLArg), ddl_type_, schema_, policy_name_);
OB_SERIALIZE_MEMBER((ObLabelSeUserLevelDDLArg, ObDDLArg), ddl_type_, level_schema_, policy_name_);
OB_SERIALIZE_MEMBER(ObCheckServerEmptyArg, mode_, sys_data_version_);
OB_SERIALIZE_MEMBER(ObCheckServerForAddingServerArg, mode_, sys_tenant_data_version_);
int ObCheckServerForAddingServerArg::init(const Mode &mode, const uint64_t sys_tenant_data_version)
OB_SERIALIZE_MEMBER(ObCheckServerForAddingServerArg, mode_, sys_tenant_data_version_, server_id_);
int ObCheckServerForAddingServerArg::init(
const Mode &mode,
const uint64_t sys_tenant_data_version,
const uint64_t server_id)
{
int ret = OB_SUCCESS;
mode_ = mode;
sys_tenant_data_version_ = sys_tenant_data_version;
if (0 == sys_tenant_data_version || !is_valid_server_id(server_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", KR(ret), K(mode), K(sys_tenant_data_version), K(server_id));
} else {
mode_ = mode;
sys_tenant_data_version_ = sys_tenant_data_version;
server_id_ = server_id;
}
return ret;
}
int ObCheckServerForAddingServerArg::assign(const ObCheckServerForAddingServerArg &other) {
int ret = OB_SUCCESS;
mode_ = other.mode_;
sys_tenant_data_version_ = other.sys_tenant_data_version_;
server_id_ = other.server_id_;
return ret;
}
bool ObCheckServerForAddingServerArg::is_valid() const
{
return 0 != sys_tenant_data_version_ && is_valid_server_id(server_id_);
}
void ObCheckServerForAddingServerArg::reset()
{
sys_tenant_data_version_ = 0;
server_id_ = OB_INVALID_ID;
}
OB_SERIALIZE_MEMBER(
ObCheckServerForAddingServerResult,
is_server_empty_,
@ -6376,18 +6395,6 @@ void CheckLeaderRpcIndex::reset()
tenant_id_ = OB_INVALID_TENANT_ID;
}
bool ObPreBootstrapCreateServerWorkingDirArg::is_valid() const
{
return server_id_ > 0;
}
void ObPreBootstrapCreateServerWorkingDirArg::reset()
{
server_id_ = 0;
}
OB_SERIALIZE_MEMBER(ObPreBootstrapCreateServerWorkingDirArg, server_id_);
bool ObBatchCheckRes::is_valid() const
{
return results_.count() > 0 && index_.is_valid();

View File

@ -7548,10 +7548,12 @@ public:
ADD_SERVER
};
ObCheckServerForAddingServerArg(): mode_(ADD_SERVER), sys_tenant_data_version_(0) {}
TO_STRING_KV(K_(mode), K_(sys_tenant_data_version));
int init(const Mode &mode, const uint64_t sys_tenant_data_version);
ObCheckServerForAddingServerArg(): mode_(ADD_SERVER), sys_tenant_data_version_(0), server_id_(OB_INVALID_ID) {}
TO_STRING_KV(K_(mode), K_(sys_tenant_data_version), K_(server_id));
int init(const Mode &mode, const uint64_t sys_tenant_data_version, const uint64_t server_id);
int assign(const ObCheckServerForAddingServerArg &other);
bool is_valid() const;
void reset();
Mode get_mode() const
{
return mode_;
@ -7560,9 +7562,14 @@ public:
{
return sys_tenant_data_version_;
}
uint64_t get_server_id() const
{
return server_id_;
}
private:
Mode mode_;
uint64_t sys_tenant_data_version_;
uint64_t server_id_;
};
struct ObCheckServerForAddingServerResult
{
@ -7784,18 +7791,6 @@ public:
K_(ml_pk_index), K_(pkey_info_start_index));
};
struct ObPreBootstrapCreateServerWorkingDirArg
{
OB_UNIS_VERSION(1);
public:
uint64_t server_id_;
ObPreBootstrapCreateServerWorkingDirArg() : server_id_(OB_INVALID_ID) {}
~ObPreBootstrapCreateServerWorkingDirArg() { reset(); };
bool is_valid() const;
void reset();
TO_STRING_KV(K_(server_id));
};
struct ObBatchCheckRes
{
OB_UNIS_VERSION(1);

View File

@ -82,7 +82,7 @@ bool ObServerStatus::is_status_valid() const
bool ObServerStatus::is_valid() const
{
return OB_INVALID_ID != id_ && server_.is_valid() && is_status_valid()
return is_valid_server_id(id_) && server_.is_valid() && is_status_valid()
&& register_time_ >= 0 && last_hb_time_ >= 0 && block_migrate_in_time_ >= 0
&& stop_time_ >= 0 && start_service_time_ >= 0;
}

View File

@ -54,7 +54,7 @@ int ObServerInfoInTable::init(
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!server.is_valid()
|| OB_INVALID_ID == server_id
|| !is_valid_server_id(server_id)
|| zone.is_empty()
|| sql_port <= 0
|| status >= ObServerStatus::OB_DISPLAY_MAX
@ -109,7 +109,7 @@ int ObServerInfoInTable::assign(const ObServerInfoInTable &other)
bool ObServerInfoInTable::is_valid() const
{
return server_.is_valid()
&& OB_INVALID_ID != server_id_
&& is_valid_server_id(server_id_)
&& !zone_.is_empty()
&& sql_port_ > 0
&& status_ < ObServerStatus::OB_DISPLAY_MAX

View File

@ -156,7 +156,6 @@ public:
RPC_S(PR5 update_tenant_memory, OB_UPDATE_TENANT_MEMORY, (obrpc::ObTenantMemoryArg));
RPC_S(PR5 renew_in_zone_hb, OB_RENEW_IN_ZONE_HB, (share::ObInZoneHbRequest), share::ObInZoneHbResponse);
RPC_S(PR5 pre_process_server_status, OB_PRE_PROCESS_SERVER, (obrpc::ObPreProcessServerArg));
RPC_S(PR5 pre_bootstrap_create_server_working_dir, OB_PRE_BOOTSTRAP_CREATE_SERVER_WORKING_DIR, (ObPreBootstrapCreateServerWorkingDirArg));
RPC_S(PR5 handle_part_trans_ctx, OB_HANDLE_PART_TRANS_CTX, (obrpc::ObTrxToolArg), ObTrxToolRes);
RPC_S(PR5 flush_local_opt_stat_monitoring_info, obrpc::OB_SERVER_FLUSH_OPT_STAT_MONITORING_INFO, (obrpc::ObFlushOptStatArg));
RPC_AP(PR5 set_member_list, OB_SET_MEMBER_LIST, (obrpc::ObSetMemberListArgV2), obrpc::ObSetMemberListResult);

View File

@ -1426,9 +1426,9 @@ DEF_STR(_load_tde_encrypt_engine, OB_CLUSTER_PARAMETER, "NONE",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_STR(local_ip, OB_CLUSTER_PARAMETER, "", "the IP address of the machine on which the ObServer will be installed",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::READONLY));
DEF_INT(server_id, OB_CLUSTER_PARAMETER, "0", "[1, 65536]",
DEF_INT(server_id, OB_CLUSTER_PARAMETER, "0", "[1, 18446744073709551615]",
"the unique id that been assigned by rootservice for each observer in cluster, "
"default: 0 (invalid id), Range: [1, 65536]",
"default: 0 (invalid id), Range: [1, 18446744073709551615]",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::READONLY));
DEF_INT(_pipelined_table_function_memory_limit, OB_TENANT_PARAMETER, "524288000", "[1024,18446744073709551615]",
"pipeline table function result set memory size limit. default 524288000 (500M), Range: [1024,18446744073709551615]",

View File

@ -352,7 +352,7 @@ int ObPxCoordOp::inner_open()
int ret = OB_SUCCESS;
ObDfo *root_dfo = NULL;
if (OB_FAIL(ObPxReceiveOp::inner_open())) {
} else if (GCTX.server_id_ <= 0) {
} else if (!is_valid_server_id(GCTX.server_id_)) {
ret = OB_SERVER_IS_INIT;
LOG_WARN("Server is initializing", K(ret), K(GCTX.server_id_));
} else if (OB_FAIL(post_init_op_ctx())) {

View File

@ -275,7 +275,7 @@ int ObP2PDatahubManager::generate_p2p_dh_id(int64_t &p2p_dh_id)
// generate p2p dh id
// | <16> | <28> | 20
// server_id timestamp sequence
if (0 >= GCTX.server_id_) {
if (!is_valid_server_id(GCTX.server_id_)) {
ret = OB_SERVER_IS_INIT;
LOG_WARN("server id is unexpected", K(ret));
} else {