server index

This commit is contained in:
linqiucen 2024-10-22 04:17:42 +00:00 committed by ob-robot
parent aa92749d39
commit 521ef6a5fb
43 changed files with 327 additions and 112 deletions

View File

@ -209,7 +209,7 @@ typedef ObPreciseDateTime ObCreateTime;
typedef uint64_t ObPsStmtId;
const int32_t NOT_CHECK_FLAG = 0;
const int64_t MAX_SERVER_COUNT = 1024;
const int64_t MAX_SERVER_COUNT = 4095;
const uint64_t OB_SERVER_USER_ID = 0;
const int64_t OB_MAX_INDEX_PER_TABLE = 128;
const int64_t OB_MAX_SSTABLE_PER_TABLE = OB_MAX_INDEX_PER_TABLE + 1;
@ -2071,6 +2071,12 @@ OB_INLINE bool is_valid_server_id(const uint64_t server_id)
return (0 < server_id) && (OB_INVALID_ID != server_id);
}
// check whether a server_index is valid
OB_INLINE bool is_valid_server_index(const uint64_t server_index)
{
return (0 < server_index) && (server_index <= MAX_SERVER_COUNT);
}
//check whether an tenant_id is valid
OB_INLINE bool is_valid_tenant_id(const uint64_t tenant_id)
{

View File

@ -635,7 +635,7 @@ void MockTenantModuleEnv::init_gctx_gconf()
GCTX.session_mgr_ = &session_mgr_;
GCTX.scramble_rand_ = &scramble_rand_;
GCTX.locality_manager_ = &locality_manager_;
GCTX.server_id_ = 1;
(void) GCTX.set_server_id(1);
GCTX.rs_rpc_proxy_ = &rs_rpc_proxy_;
GCTX.srv_rpc_proxy_ = &srv_rpc_proxy_;
GCTX.rs_mgr_ = &rs_mgr_;

View File

@ -324,7 +324,7 @@ int ObMPBase::free_session()
LOG_INFO("free session successfully", K(ctx));
conn->is_sess_free_ = true;
if (OB_UNLIKELY(OB_FAIL(sql::ObSQLSessionMgr::is_need_clear_sessid(conn, is_need_clear)))) {
LOG_ERROR("fail to judge need clear", K(ret), "sessid", conn->sessid_, "server_id", GCTX.server_id_);
LOG_ERROR("fail to judge need clear", K(ret), "sessid", conn->sessid_);
} else if (is_need_clear) {
if (OB_FAIL(GCTX.session_mgr_->mark_sessid_unused(conn->sessid_))) {
LOG_WARN("mark session id unused failed", K(ret), "sessid", conn->sessid_);

View File

@ -81,7 +81,7 @@ int ObMPDisconnect::run()
EVENT_INC(SQL_USER_LOGOUTS_CUMULATIVE);
LOG_INFO("free session successfully", "sessid", ctx_.sessid_);
if (OB_UNLIKELY(OB_FAIL(sql::ObSQLSessionMgr::is_need_clear_sessid(&conn, is_need_clear)))) {
LOG_ERROR("fail to judge need clear", K(ret), "sessid", conn.sessid_, "server_id", GCTX.server_id_);
LOG_ERROR("fail to judge need clear", K(ret), "sessid", conn.sessid_);
} else if (is_need_clear) {
if (OB_FAIL(GCTX.session_mgr_->mark_sessid_unused(conn.sessid_))) {
LOG_WARN("mark session id unused failed", K(ret), "sessid", conn.sessid_);

View File

@ -218,7 +218,7 @@ void ObSMConnectionCallback::destroy(ObSMConnection& conn)
} else if (is_need_clear) {
if (OB_FAIL(GCTX.session_mgr_->mark_sessid_unused(conn.sessid_))) {
LOG_ERROR("fail to mark sessid unused", K(ret), K(conn.sessid_),
"proxy_sessid", conn.proxy_sessid_, "server_id", GCTX.server_id_);
"proxy_sessid", conn.proxy_sessid_);
} else {
LOG_INFO("mark session id unused", K(conn.sessid_));
}
@ -231,7 +231,6 @@ void ObSMConnectionCallback::destroy(ObSMConnection& conn)
"sessid", conn.sessid_,
"proxy_sessid", conn.proxy_sessid_,
"tenant_id", conn.tenant_id_,
"server_id", GCTX.server_id_,
"from_proxy", conn.is_proxy_,
"from_java_client", conn.is_java_client_,
"c/s protocol", get_cs_protocol_type_name(conn.get_cs_protocol_type()),
@ -264,8 +263,7 @@ int ObSMConnectionCallback::on_disconnect(observer::ObSMConnection& conn)
sess_info->set_shadow(true);
}
}
LOG_INFO("kill and revert session", K(conn.sessid_),
"proxy_sessid", conn.proxy_sessid_, "server_id", GCTX.server_id_, K(ret));
LOG_INFO("kill and revert session", K(conn.sessid_), "proxy_sessid", conn.proxy_sessid_, K(ret));
return ret;
}

View File

@ -168,8 +168,7 @@ int ObSMHandler::on_disconnect(easy_connection_t *c)
}
}
LOG_INFO("kill and revert session", K(conn->sessid_),
"proxy_sessid", conn->proxy_sessid_, "server_id", GCTX.server_id_,
K(tmp_ret), K(eret));
"proxy_sessid", conn->proxy_sessid_, K(tmp_ret), K(eret));
}
return eret;
}
@ -245,10 +244,9 @@ int ObSMHandler::on_close(easy_connection_t *c)
} else if (is_need_clear) {
if (OB_UNLIKELY(OB_FAIL(gctx_.session_mgr_->mark_sessid_unused(conn->sessid_)))) {
LOG_ERROR("fail to mark sessid unused", K(ret), K(conn->sessid_),
"proxy_sessid", conn->proxy_sessid_, "server_id", GCTX.server_id_);
"proxy_sessid", conn->proxy_sessid_);
} else {
LOG_INFO("mark sessid unused", K(conn->sessid_),
"proxy_sessid", conn->proxy_sessid_, "server_id", GCTX.server_id_);
LOG_INFO("mark sessid unused", K(conn->sessid_), "proxy_sessid", conn->proxy_sessid_);
}
} else {/*do nothing*/}
}
@ -267,7 +265,6 @@ int ObSMHandler::on_close(easy_connection_t *c)
"sessid", conn->sessid_,
"proxy_sessid", conn->proxy_sessid_,
"tenant_id", conn->tenant_id_,
"server_id", gctx_.server_id_,
"from_proxy", conn->is_proxy_,
"from_java_client", conn->is_java_client_,
"c/s protocol", get_cs_protocol_type_name(conn->get_cs_protocol_type()),

View File

@ -194,13 +194,13 @@ void ObHeartBeatProcess::check_and_update_server_id_(const uint64_t server_id)
// in upgrade period 4.1 -> 4.2, we need to persist the server_id via heartbeat
const int64_t delay = 0;
const bool repeat = false;
if (0 == GCTX.server_id_) {
GCTX.server_id_ = server_id;
if (0 == GCTX.get_server_id()) {
(void) GCTX.set_server_id(server_id);
LOG_INFO("receive new server id in GCTX", K(server_id));
} else if (server_id != GCTX.server_id_) {
} else if (server_id != GCTX.get_server_id()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("GCTX.server_id_ is not the same as server_id in RS", KR(ret),
K(GCTX.server_id_), K(server_id));
LOG_ERROR("GCTX.get_server_id() is not the same as server_id in RS", KR(ret),
K(GCTX.get_server_id()), K(server_id));
}
if (OB_FAIL(ret)) {
} else if (0 == GCONF.observer_id) {

View File

@ -2987,9 +2987,9 @@ int ObServer::init_global_context()
gctx_.startup_accel_handler_ = &startup_accel_handler_;
gctx_.flashback_scn_ = opts_.flashback_scn_;
gctx_.server_id_ = config_.observer_id;
if (is_valid_server_id(gctx_.server_id_)) {
LOG_INFO("this observer has had a valid server_id", K(gctx_.server_id_));
(void) gctx_.set_server_id(config_.observer_id);
if (is_valid_server_id(gctx_.get_server_id())) {
LOG_INFO("this observer has had a valid server_id", K(gctx_.get_server_id()));
}
if ((PHY_FLASHBACK_MODE == gctx_.startup_mode_ || PHY_FLASHBACK_VERIFY_MODE == gctx_.startup_mode_)
&& 0 >= gctx_.flashback_scn_) {

View File

@ -1692,13 +1692,13 @@ int ObService::set_server_id_(const int64_t server_id)
if (OB_UNLIKELY(!is_valid_server_id(server_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid server_id", KR(ret), K(server_id));
} else if (is_valid_server_id(GCTX.server_id_) || is_valid_server_id(GCONF.observer_id)) {
} else if (is_valid_server_id(GCTX.get_server_id()) || is_valid_server_id(GCONF.observer_id)) {
ret = OB_ERR_UNEXPECTED;
uint64_t server_id_in_gconf = GCONF.observer_id;
LOG_WARN("server_id is only expected to be set once", KR(ret),
K(server_id), K(GCTX.server_id_), K(server_id_in_gconf));
K(server_id), K(GCTX.get_server_id()), K(server_id_in_gconf));
} else {
GCTX.server_id_ = server_id;
(void) GCTX.set_server_id(server_id);
GCONF.observer_id = server_id;
if (OB_ISNULL(GCTX.config_mgr_)) {
ret = OB_ERR_UNEXPECTED;
@ -1822,10 +1822,10 @@ int ObService::prepare_server_for_adding_server(
// If adding server during bootstrap, server is expected to be not empty.
// Just check this server_id same to the server_id set before.
const uint64_t server_id_in_GCONF = GCONF.observer_id;
if (server_id != GCTX.server_id_ || server_id != server_id_in_GCONF) {
if (server_id != GCTX.get_server_id() || server_id != server_id_in_GCONF) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("server_id not same to that set before.", KR(ret),
"server_id_for_adding_server", server_id, K(GCTX.server_id_), K(server_id_in_GCONF));
"server_id_for_adding_server", server_id, K(GCTX.get_server_id()), K(server_id_in_GCONF));
} else {
server_empty = false;
}
@ -2275,9 +2275,9 @@ int ObService::check_server_empty(bool &is_empty)
} else {
uint64_t server_id_in_GCONF = GCONF.observer_id;
if (is_empty) {
if (is_valid_server_id(GCTX.server_id_) || is_valid_server_id(server_id_in_GCONF)) {
if (is_valid_server_id(GCTX.get_server_id()) || is_valid_server_id(server_id_in_GCONF)) {
is_empty = false;
FLOG_WARN("[CHECK_SERVER_EMPTY] server_id exists", K(GCTX.server_id_), K(server_id_in_GCONF));
FLOG_WARN("[CHECK_SERVER_EMPTY] server_id exists", K(GCTX.get_server_id()), K(server_id_in_GCONF));
}
}
if (is_empty) {

View File

@ -739,6 +739,7 @@ int ObServerZoneOpService::add_server_(
bool is_active = false;
const int64_t now = ObTimeUtility::current_time();
ObServerInfoInTable server_info_in_table;
ObArray<uint64_t> server_id_in_cluster;
ObMySQLTransaction trans;
DEBUG_SYNC(BEFORE_ADD_SERVER_TRANS);
if (OB_UNLIKELY(!is_inited_)) {
@ -781,7 +782,13 @@ int ObServerZoneOpService::add_server_(
ret = OB_ENTRY_EXIST;
LOG_WARN("server exists", KR(ret), K(server_info_in_table));
}
if (FAILEDx(server_info_in_table.init(
if (FAILEDx(ObServerTableOperator::get_clusters_server_id(trans, server_id_in_cluster))) {
LOG_WARN("fail to get servers' id in the cluster", KR(ret));
} else if (OB_UNLIKELY(!check_server_index_(server_id, server_id_in_cluster))) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("server index is outdated due to concurrent operations", KR(ret), K(server_id), K(server_id_in_cluster));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "server index is outdated due to concurrent operations, ADD_SERVER is");
} else if (OB_FAIL(server_info_in_table.init(
server,
server_id,
zone,
@ -985,26 +992,68 @@ int ObServerZoneOpService::construct_rs_list_arg(ObRsListArg &rs_list_arg)
int ObServerZoneOpService::fetch_new_server_id_(uint64_t &server_id)
{
int ret = OB_SUCCESS;
ObArray<uint64_t> server_id_in_cluster;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(is_inited_));
} else if (OB_ISNULL(sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid sql proxy", KR(ret), KP(sql_proxy_));
} else if (OB_FAIL(ObServerTableOperator::get_clusters_server_id(*sql_proxy_, server_id_in_cluster))) {
LOG_WARN("fail to get server_ids in the cluster", KR(ret), KP(sql_proxy_));
} else if (OB_UNLIKELY(server_id_in_cluster.count() >= MAX_SERVER_COUNT)) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("server count reaches the limit", KR(ret), K(server_id_in_cluster.count()));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "server count reaches the limit, ADD_SERVER is");
} else {
uint64_t new_max_id = OB_INVALID_ID;
uint64_t candidate_server_id = OB_INVALID_ID;
ObMaxIdFetcher id_fetcher(*sql_proxy_);
if (OB_FAIL(id_fetcher.fetch_new_max_id(
OB_SYS_TENANT_ID,
OB_MAX_USED_SERVER_ID_TYPE,
new_max_id))) {
candidate_server_id))) {
LOG_WARN("fetch_new_max_id failed", KR(ret));
} else {
server_id = new_max_id;
uint64_t new_candidate_server_id = candidate_server_id;
while (!check_server_index_(new_candidate_server_id, server_id_in_cluster)) {
if (new_candidate_server_id % 10 == 0) {
LOG_INFO("[FETCH NEW SERVER ID] periodical log", K(new_candidate_server_id), K(server_id_in_cluster));
}
++new_candidate_server_id;
}
if (new_candidate_server_id != candidate_server_id
&& OB_FAIL(id_fetcher.update_server_max_id(candidate_server_id, new_candidate_server_id))) {
LOG_WARN("fail to update server max id", KR(ret), K(candidate_server_id), K(new_candidate_server_id),
K(server_id_in_cluster));
}
if (OB_SUCC(ret)) {
server_id = new_candidate_server_id;
LOG_INFO("[FETCH NEW SERVER ID] new candidate server id", K(server_id), K(server_id_in_cluster));
}
}
}
return ret;
}
bool ObServerZoneOpService::check_server_index_(
const uint64_t candidate_server_id,
const common::ObIArray<uint64_t> &server_id_in_cluster) const
{
// server_index = server_id % 4096
// server_index cannot be zero and must be unique in the cluster
bool is_good_candidate = true;
const uint64_t candidate_index = ObShareUtil::compute_server_index(candidate_server_id);
if (0 == candidate_index) {
is_good_candidate = false;
} else {
for (int64_t i = 0; i < server_id_in_cluster.count() && is_good_candidate; ++i) {
const uint64_t server_index = ObShareUtil::compute_server_index(server_id_in_cluster.at(i));
if (candidate_index == server_index) {
is_good_candidate = false;
}
}
}
return is_good_candidate;
}
int ObServerZoneOpService::check_server_have_enough_resource_for_delete_server_(
const ObIArray<ObAddr> &servers,
const ObZone &zone)

View File

@ -205,6 +205,9 @@ private:
const obrpc::ObAdminServerArg::AdminServerOp &op);
int check_and_update_service_epoch_(common::ObMySQLTransaction &trans);
int fetch_new_server_id_(uint64_t &server_id);
bool check_server_index_(
const uint64_t candidate_server_id,
const common::ObIArray<uint64_t> &server_id_in_cluster) const;
int check_server_have_enough_resource_for_delete_server_(
const ObIArray<common::ObAddr> &servers,
const common::ObZone &zone);

View File

@ -1753,6 +1753,7 @@ int ObAdminUpgradeCmd::execute(const Bool &upgrade)
int ObAdminRollingUpgradeCmd::execute(const obrpc::ObAdminRollingUpgradeArg &arg)
{
int ret = OB_SUCCESS;
uint64_t max_server_id = 0;
HEAP_VAR(ObAdminSetConfigItem, item) {
obrpc::ObAdminSetConfigArg set_config_arg;
set_config_arg.is_inner_ = true;
@ -1808,6 +1809,14 @@ int ObAdminRollingUpgradeCmd::execute(const obrpc::ObAdminRollingUpgradeArg &arg
}
} // end while
}
if (OB_FAIL(ret) || GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_4_0) {
} else if (OB_FAIL(ObServerTableOperator::get_clusters_max_server_id(max_server_id))) {
LOG_WARN("fail to get max server id", KR(ret));
} else if (OB_UNLIKELY(!is_valid_server_index(max_server_id))) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("max_server_id should be a valid server index", KR(ret), K(max_server_id));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "max server id in the cluster cannot be larget than MAX_SERVER_COUNT, UPGRADE is");
}
// end rolling upgrade, should raise min_observer_version
const char *min_obs_version_name = "min_observer_version";
if (FAILEDx(item.name_.assign(min_obs_version_name))) {

View File

@ -34,18 +34,18 @@ ObDetectableIdGen &ObDetectableIdGen::instance()
int ObDetectableIdGen::generate_detectable_id(ObDetectableId &detectable_id, uint64_t tenant_id)
{
int ret = OB_SUCCESS;
// use server id to ensure that the detectable_id is unique in cluster
uint64_t server_id = GCTX.server_id_;
if (!is_valid_server_id(server_id)) {
// use server index to ensure that the detectable_id is unique in cluster
uint64_t server_index = GCTX.get_server_index();
if (OB_UNLIKELY(!is_valid_server_index(server_index))) {
ret = OB_SERVER_IS_INIT;
LIB_LOG(WARN, "[DM] server id is invalid");
LIB_LOG(WARN, "[DM] server index is invalid", K(server_index));
} else {
detectable_id.first_ = get_detect_sequence_id();
// use timestamp to ensure that the detectable_id is unique during the process
uint64_t timestamp = ObTimeUtility::current_time();
// [ server_id (16bits) ][ timestamp (32bits) ]
// [ server_index (16bits) ][ timestamp (32bits) ]
// only if qps > 2^48 or same server reboots after 2^48 the detectable_id be repeated
detectable_id.second_ = (server_id) << 48 | (timestamp & 0x0000FFFFFFFFFFFF);
detectable_id.second_ = (server_index) << 48 | (timestamp & 0x0000FFFFFFFFFFFF);
detectable_id.tenant_id_ = tenant_id;
}
return ret;

View File

@ -119,8 +119,8 @@ int ObHeartbeatHandler::handle_heartbeat(
} else {
// const uint64_t server_id = hb_request.get_server_id();
const share::RSServerStatus rs_server_status = hb_request.get_rs_server_status();
// if (GCTX.server_id_ != server_id) {
// LOG_INFO("receive new server id", "old server_id_", GCTX.server_id_, "new server_id_", server_id);
// if (GCTX.get_server_id() != server_id) {
// LOG_INFO("receive new server id", "old server_id_", GCTX.get_server_id(), "new server_id_", server_id);
// GCTX.server_id_ = server_id;
// }
if (GCTX.rs_server_status_ != rs_server_status) {
@ -168,7 +168,7 @@ int ObHeartbeatHandler::init_hb_response_(share::ObHBResponse &hb_response)
share::ObServerInfoInTable::ObBuildVersion build_version;
common::ObZone zone;
int64_t test_id = ERRSIM_DISK_ERROR ? 2 : OB_INVALID_ID;
if (test_id == GCTX.server_id_) {
if (test_id == GCTX.get_server_id()) {
server_health_status.reset();
server_health_status.init(ObServerHealthStatus::DATA_DISK_STATUS_ERROR);
}

View File

@ -422,6 +422,34 @@ int ObMaxIdFetcher::fetch_new_max_id(const uint64_t tenant_id,
return ret;
}
int ObMaxIdFetcher::update_server_max_id(const uint64_t max_server_id, const uint64_t next_max_server_id)
{
int ret = OB_SUCCESS;
uint64_t fetched_max_server_id = OB_INVALID_ID;
ObMySQLTransaction trans;
if (OB_FAIL(trans.start(&proxy_, OB_SYS_TENANT_ID, false))) {
LOG_WARN("fail to to start transaction", KR(ret));
} else if (OB_FAIL(fetch_max_id(trans, OB_SYS_TENANT_ID, OB_MAX_USED_SERVER_ID_TYPE, fetched_max_server_id))) {
LOG_WARN("failed to get max id", KR(ret));
} else if (OB_UNLIKELY(max_server_id != fetched_max_server_id)) {
ret = OB_NEED_RETRY;
LOG_WARN("max_server_id has been increased, please retry", KR(ret), K(max_server_id), K(fetched_max_server_id));
} else if (OB_FAIL(update_max_id(trans, OB_SYS_TENANT_ID, OB_MAX_USED_SERVER_ID_TYPE, next_max_server_id))) {
LOG_WARN("failed to update max id", KR(ret), K(next_max_server_id));
}
if (trans.is_started()) {
const bool is_commit = (OB_SUCC(ret));
int temp_ret = OB_SUCCESS;
if (OB_SUCCESS != (temp_ret = trans.end(is_commit))) {
LOG_WARN("failed to end trans", K(is_commit), K(temp_ret));
ret = (OB_SUCCESS == ret) ? temp_ret : ret;
}
}
LOG_INFO("update server max id", KR(ret), K(fetched_max_server_id), K(max_server_id), K(next_max_server_id));
return ret;
}
int ObMaxIdFetcher::update_max_id(ObISQLClient &sql_client, const uint64_t tenant_id,
ObMaxIdType max_id_type, const uint64_t max_id)
{

View File

@ -94,6 +94,7 @@ public:
int fetch_new_max_id(const uint64_t tenant_id, ObMaxIdType id_type,
uint64_t &max_id, const uint64_t initial = UINT64_MAX,
const int64_t size = 1);
int update_server_max_id(const uint64_t max_server_id, const uint64_t next_max_server_id);
// For generate new tablet_ids
int fetch_new_max_ids(const uint64_t tenant_id, ObMaxIdType id_type,
uint64_t &max_id, uint64_t size);

View File

@ -13,9 +13,11 @@
#define USING_LOG_PREFIX SERVER
#include "ob_server_struct.h"
#include "lib/thread_local/ob_tsi_factory.h"
#include "lib/ob_define.h"
#include "share/schema/ob_schema_service.h"
#include "share/ob_web_service_root_addr.h"
#include "share/ob_lease_struct.h"
#include "common/ob_version_def.h"
namespace oceanbase
{
namespace share
@ -45,6 +47,20 @@ share::ServerServiceStatus ObGlobalContext::get_server_service_status() const
return static_cast<share::ServerServiceStatus>(server_status);
}
uint64_t ObGlobalContext::get_server_index() const
{
uint64_t server_index = 0;
uint64_t server_id = ATOMIC_LOAD(&server_id_);
if (OB_UNLIKELY(!is_valid_server_id(server_id))) {
// return 0;
} else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_4_0) {
server_index = ObShareUtil::compute_server_index(server_id);
} else {
server_index = server_id;
}
return server_index;
}
DEF_TO_STRING(ObGlobalContext)
{
int64_t pos = 0;

View File

@ -244,7 +244,6 @@ struct ObGlobalContext
share::ObLocationService *location_service_;
int64_t start_time_;
int64_t *warm_up_start_time_;
uint64_t server_id_;
ObServiceStatus status_;
ObServerMode startup_mode_;
share::RSServerStatus rs_server_status_;
@ -282,6 +281,25 @@ struct ObGlobalContext
bool is_observer() const;
common::ObClusterRole get_cluster_role() const;
share::ServerServiceStatus get_server_service_status() const;
/*
Returns a globally unique, monotonically increasing server ID.
This ID is unique across the lifetime of the cluster and will not be reused.
*/
inline uint64_t get_server_id() const { return ATOMIC_LOAD(&server_id_); }
inline void set_server_id(const uint64_t id) { ATOMIC_SET(&server_id_, id); }
/*
Returns a currently unique server index within the cluster.
This index is unique among current servers in the cluster, but may be reused if a server is removed from the cluster.
When server ID has size limitation, like only 12 bits are allocated for server ID in session_id
to ensure its uniqueness, which implies the server ID cannot be greater than or equal to 4096,
we should consider using server index instead of server ID.
However, using server index requires a guarantee that no remnants of an old server remain after it is deleted.
For example, no sessions from the deleted server should exist anymore. If remnants persist after server deletion,
using server index is not permitted, otherwise, correctness issues may arise.
In such cases, you need to carefully consider how to resolve this problem by yourself.
*/
uint64_t get_server_index() const;
void set_upgrade_stage(obrpc::ObUpgradeStage upgrade_stage) { upgrade_stage_ = upgrade_stage; }
obrpc::ObUpgradeStage get_upgrade_stage() { return upgrade_stage_; }
DECLARE_TO_STRING;
@ -304,6 +322,7 @@ private:
bool has_start_service() const { return 0 < start_service_time_; }
obrpc::ObUpgradeStage upgrade_stage_;
uint64_t server_id_;
};
} // end of namespace share

View File

@ -657,7 +657,7 @@ int ObServerTableOperator::get_start_service_time(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get sql result", K(sql), K(ret));
} else if (OB_FAIL(result->next())) {
LOG_WARN("fail to get next", KR(ret), K(sql));;
LOG_WARN("fail to get next", KR(ret), K(sql));
} else {
EXTRACT_INT_FIELD_MYSQL(*result, "start_service_time", start_service_time, int64_t);
}
@ -679,6 +679,80 @@ int ObServerTableOperator::get(
const bool ONLY_ACTIVE_SERVERS = false;
return get_servers_info_of_zone(sql_proxy, empty_zone, ONLY_ACTIVE_SERVERS, all_servers_info_in_table);
}
int ObServerTableOperator::get_clusters_server_id(common::ObISQLClient &sql_proxy, ObIArray<uint64_t> &server_id_in_cluster)
{
int ret = OB_SUCCESS;
ObSqlString sql;
ObTimeoutCtx ctx;
server_id_in_cluster.reset();
if (OB_FAIL(ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
LOG_WARN("fail to get timeout ctx", K(ret), K(ctx));
} else if (OB_FAIL(sql.assign_fmt("SELECT id FROM %s", OB_ALL_SERVER_TNAME))) {
LOG_WARN("fail to append sql", K(ret));
} else {
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
if (OB_FAIL(sql_proxy.read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
LOG_WARN("fail to execute sql", K(sql), K(ret));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get sql result", K(sql), K(ret));
} else {
while (OB_SUCC(ret)) {
if (OB_FAIL(result->next())) {
if (OB_ITER_END != ret) {
LOG_WARN("result next failed", KR(ret));
} else {
ret = OB_SUCCESS;
break;
}
} else {
uint64_t server_id = 0;
EXTRACT_INT_FIELD_MYSQL(*result, "id", server_id, uint64_t);
if (FAILEDx(server_id_in_cluster.push_back(server_id))) {
LOG_WARN("fail to push back", KR(ret), K(server_id), K(server_id_in_cluster));
}
}
}
}
}
}
return ret;
}
int ObServerTableOperator::get_clusters_max_server_id(uint64_t &max_server_id)
{
int ret = OB_SUCCESS;
ObSqlString sql;
ObTimeoutCtx ctx;
max_server_id = 0;
if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), KP(GCTX.sql_proxy_));
} else if (OB_FAIL(ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
LOG_WARN("fail to get timeout ctx", K(ret), K(ctx));
} else if (OB_FAIL(sql.assign_fmt("SELECT max(id) AS max_id FROM %s", OB_ALL_SERVER_TNAME))) {
LOG_WARN("fail to append sql", K(ret));
} else {
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
if (OB_FAIL(GCTX.sql_proxy_->read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
LOG_WARN("fail to execute sql", K(sql), K(ret));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get sql result", K(sql), K(ret));
} else if (OB_FAIL(result->next())) {
LOG_WARN("fail to get next", KR(ret), K(sql));
} else {
EXTRACT_INT_FIELD_MYSQL(*result, "max_id", max_server_id, uint64_t);
}
}
}
return ret;
}
int ObServerTableOperator::get_servers_info_of_zone(
common::ObISQLClient &sql_proxy,
const ObZone &zone,
@ -738,7 +812,7 @@ int ObServerTableOperator::get(
if (OB_ITER_END == ret) {
ret = OB_SERVER_NOT_IN_WHITE_LIST;
}
LOG_WARN("fail to get next", KR(ret), K(sql));;
LOG_WARN("fail to get next", KR(ret), K(sql));
} else if (OB_FAIL(build_server_status(*result, server_status))) {
LOG_WARN("fail to build server_status",KR(ret));
} else if (OB_FAIL(server_info_in_table.build_server_info_in_table(server_status))) {

View File

@ -167,6 +167,10 @@ public:
common::ObISQLClient &sql_proxy,
const common::ObAddr &server,
ObServerInfoInTable &server_info_in_table);
// get all server's id in the cluster
static int get_clusters_server_id(common::ObISQLClient &sql_proxy, ObIArray<uint64_t> &server_id_in_cluster);
// get the max server_id in the cluster
static int get_clusters_max_server_id(uint64_t &max_server_id);
// insert the new server's info into __all_server table,
// it is only called when we want to add a new server into clusters
//

View File

@ -158,6 +158,9 @@ public:
static const char *replica_type_to_string(const ObReplicaType type);
static ObReplicaType string_to_replica_type(const char *str);
static ObReplicaType string_to_replica_type(const ObString &str);
static inline uint64_t compute_server_index(uint64_t server_id) {
return server_id % (MAX_SERVER_COUNT + 1);
}
private:
static int check_compat_data_version_(
const uint64_t required_data_version,

View File

@ -169,7 +169,7 @@ int ObUnitTableOperator::check_server_empty(const common::ObAddr &server, bool &
LOG_WARN("fail to get units", KR(ret), K(server));
} else if (units.count() > 0) {
is_empty = false;
LOG_DEBUG("server exists in the server list or migrate_from_server list", K(server), K(units));
LOG_TRACE("server exists in the server list or migrate_from_server list", K(server), K(units));
}
return ret;
}

View File

@ -342,16 +342,16 @@ OB_INLINE uint64_t ObDtlChannel::generate_id(uint64_t ch_cnt)
//
int64_t start_id = (common::ObTimeUtility::current_time() / 1000000) << 20;
static volatile uint64_t sequence = start_id;
const uint64_t svr_id = GCTX.server_id_;
const uint64_t server_index = GCTX.get_server_index();
uint64_t ch_id = -1;
if (1 < ch_cnt) {
uint64_t org_ch_id = 0;
do {
org_ch_id = (sequence & 0x0000FFFFFFFFFFFF) | (svr_id << 48);
ch_id = ((ATOMIC_AAF(&sequence, ch_cnt) & 0x0000FFFFFFFFFFFF) | (svr_id << 48));
org_ch_id = (sequence & 0x0000FFFFFFFFFFFF) | (server_index << 48);
ch_id = ((ATOMIC_AAF(&sequence, ch_cnt) & 0x0000FFFFFFFFFFFF) | (server_index << 48));
} while (ch_id < org_ch_id);
} else {
ch_id = ((ATOMIC_AAF(&sequence, 1) & 0x0000FFFFFFFFFFFF) | (svr_id << 48));
ch_id = ((ATOMIC_AAF(&sequence, 1) & 0x0000FFFFFFFFFFFF) | (server_index << 48));
}
return ch_id;
}

View File

@ -460,7 +460,7 @@ int ObExprSysContext::eval_instance(const ObExpr &expr, common::ObDatum &res,
int ret = OB_SUCCESS;
UNUSED(arg1);
UNUSED(arg2);
uint64_t instance_id = GCTX.server_id_;
uint64_t instance_id = GCTX.get_server_id();
//OZ(uint_string(expr, ctx, instance_id, res));
char out_id[256];
sprintf(out_id, "%lu", instance_id);

View File

@ -259,7 +259,7 @@ int ObExprUserEnv::eval_instance(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &re
{
int ret = OB_SUCCESS;
UNUSED(expr);
uint64_t instance_id = GCTX.server_id_;
uint64_t instance_id = GCTX.get_server_id();
number::ObNumber res_nmb;
ObEvalCtx::TempAllocGuard alloc_guard(ctx);
ObIAllocator &calc_alloc = alloc_guard.get_allocator();

View File

@ -42,7 +42,7 @@ uint64_t ObExprUuidShort::generate_uuid_short()
// uuid_short
// | <8> | <32> | <24>
// server_id server_start_time incremented_variable
static volatile uint64_t server_id_and_server_startup_time = ((GCTX.server_id_ & 255) << 56) |
static volatile uint64_t server_id_and_server_startup_time = ((GCTX.get_server_id() & 255) << 56) |
((static_cast<uint64_t>(common::ObTimeUtility::current_time() / 1000000) << 24) &
((static_cast<uint64_t>(1) << 56) - 1));
uint64_t uuid_short = ATOMIC_AAF(&server_id_and_server_startup_time, 1);

View File

@ -699,7 +699,7 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
dfo->get_interrupt_id()))) {
LOG_WARN("fail gen dfo int id", K(ret));
} else {
dfo->set_qc_server_id(GCTX.server_id_);
dfo->set_qc_server_id(GCTX.get_server_index());
dfo->set_parent_dfo_id(parent_dfo->get_dfo_id());
LOG_TRACE("cur dfo dop",
"dfo_id", dfo->get_dfo_id(),

View File

@ -166,7 +166,7 @@ int ObPxCoordOp::init_dfc(ObDfo &dfo, dtl::ObDtlChTotalInfo *ch_info)
K(dfo.get_qc_id()), K(dfo.get_dfo_id()));
} else {
ObDtlDfoKey dfo_key;
dfo_key.set(GCTX.server_id_, dfo.get_px_sequence_id(), dfo.get_qc_id(), dfo.get_dfo_id());
dfo_key.set(GCTX.get_server_index(), dfo.get_px_sequence_id(), dfo.get_qc_id(), dfo.get_dfo_id());
dfc_.set_timeout_ts(phy_plan_ctx->get_timeout_timestamp());
dfc_.set_receive();
dfc_.set_qc_coord();
@ -238,6 +238,7 @@ int ObPxCoordOp::inner_rescan()
int ObPxCoordOp::rescan()
{
int ret = OB_SUCCESS;
const uint64_t server_index = GCTX.get_server_index();
if (NULL == coord_info_.batch_rescan_ctl_
|| batch_rescan_param_version_ != coord_info_.batch_rescan_ctl_->param_version_) {
ObDfo *root_dfo = NULL;
@ -274,12 +275,12 @@ int ObPxCoordOp::rescan()
LOG_WARN("fail to register detectable_id", K(ret));
} else if (OB_FAIL(init_dfo_mgr(
ObDfoInterruptIdGen(interrupt_id_,
(uint32_t)GCTX.server_id_,
(uint32_t)server_index,
(uint32_t)MY_SPEC.qc_id_,
px_sequence_id_),
coord_info_.dfo_mgr_))) {
LOG_WARN("fail parse dfo tree",
"server_id", GCTX.server_id_,
"server_index", server_index,
"qc_id", MY_SPEC.qc_id_,
"execution_id", ctx_.get_my_session()->get_current_execution_id(),
K(ret));
@ -309,6 +310,7 @@ int ObPxCoordOp::rescan()
int ObPxCoordOp::inner_open()
{
int ret = OB_SUCCESS;
const uint64_t server_index = GCTX.get_server_index();
ObDfo *root_dfo = NULL;
ObString cur_query_str = ctx_.get_my_session()->get_current_query_string();
char *buf = reinterpret_cast<char*>(ctx_.get_allocator().alloc(cur_query_str.length() + 1));
@ -321,9 +323,9 @@ int ObPxCoordOp::inner_open()
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObPxReceiveOp::inner_open())) {
} else if (!is_valid_server_id(GCTX.server_id_)) {
} else if (OB_UNLIKELY(!is_valid_server_index(server_index))) {
ret = OB_SERVER_IS_INIT;
LOG_WARN("Server is initializing", K(ret), K(GCTX.server_id_));
LOG_WARN("Server is initializing", K(ret), K(server_index));
} else if (OB_FAIL(post_init_op_ctx())) {
LOG_WARN("init operator context failed", K(ret));
} else if (OB_FAIL(coord_info_.init())) {
@ -336,12 +338,12 @@ int ObPxCoordOp::inner_open()
LOG_WARN("fail to register detectable_id", K(ret));
} else if (OB_FAIL(init_dfo_mgr(
ObDfoInterruptIdGen(interrupt_id_,
(uint32_t)GCTX.server_id_,
(uint32_t)server_index,
(uint32_t)(static_cast<const ObPxCoordSpec*>(&get_spec()))->qc_id_,
px_sequence_id_),
coord_info_.dfo_mgr_))) {
LOG_WARN("fail parse dfo tree",
"server_id", GCTX.server_id_,
"server_index", server_index,
"qc_id", (static_cast<const ObPxCoordSpec*>(&get_spec()))->qc_id_,
"execution_id", ctx_.get_my_session()->get_current_execution_id(),
K(ret));
@ -890,7 +892,7 @@ int ObPxCoordOp::register_interrupt()
{
int ret = OB_SUCCESS;
px_sequence_id_ = GCTX.sql_engine_->get_px_sequence_id();
ObInterruptUtil::generate_query_interrupt_id((uint32_t)GCTX.server_id_,
ObInterruptUtil::generate_query_interrupt_id((uint32_t)GCTX.get_server_index(),
px_sequence_id_,
interrupt_id_);
if (OB_FAIL(SET_INTERRUPTABLE(interrupt_id_))) {

View File

@ -78,10 +78,14 @@ public:
static int interrupt_qc(ObPxSqcMeta &sqc, int code, ObExecContext *exec_ctx);
static int interrupt_qc(ObPxTask &task, int code, ObExecContext *exec_ctx);
// 将server_id、execution_id、qc_id共同组成中断id
static int generate_query_interrupt_id(const uint32_t server_id,
// Suggest using GCTX.get_server_index() instead of GCTX.get_server_id(),
// as it guarantees uniqueness within the cluster and is constrained to a maximum value of MAX_SERVER_COUNT.
static int generate_query_interrupt_id(const uint32_t server_index,
const uint64_t px_sequence_id,
common::ObInterruptibleTaskID &interrupt_id);
static int generate_px_interrupt_id(const uint32_t server_id,
// Suggest using GCTX.get_server_index() instead of GCTX.get_server_id(),
// as it guarantees uniqueness within the cluster and is constrained to a maximum value of MAX_SERVER_COUNT.
static int generate_px_interrupt_id(const uint32_t server_index,
const uint32_t qc_id,
const uint64_t px_sequence_id,
const int64_t dfo_id,

View File

@ -607,8 +607,8 @@ int ObPxTenantTargetMonitorP::process()
const uint64_t tenant_id = arg_.get_tenant_id();
const uint64_t follower_version = arg_.get_version();
// server id of the leader that the follower sync with previously.
const uint64_t prev_leader_server_id = ObPxTenantTargetMonitor::get_server_id(follower_version);
const uint64_t leader_server_id = GCTX.server_id_;
const uint64_t prev_leader_server_index = ObPxTenantTargetMonitor::get_server_index(follower_version);
const uint64_t leader_server_index = GCTX.get_server_index();
bool is_leader;
uint64_t leader_version;
result_.set_tenant_id(tenant_id);
@ -616,7 +616,7 @@ int ObPxTenantTargetMonitorP::process()
LOG_ERROR("get is_leader failed", K(ret), K(tenant_id));
} else if (!is_leader) {
result_.set_status(MONITOR_NOT_MASTER);
} else if (arg_.need_refresh_all_ || prev_leader_server_id != leader_server_id) {
} else if (arg_.need_refresh_all_ || prev_leader_server_index != leader_server_index) {
if (OB_FAIL(OB_PX_TARGET_MGR.reset_leader_statistics(tenant_id))) {
LOG_ERROR("reset leader statistics failed", K(ret));
} else if (OB_FAIL(OB_PX_TARGET_MGR.get_version(tenant_id, leader_version))) {
@ -625,7 +625,7 @@ int ObPxTenantTargetMonitorP::process()
result_.set_status(MONITOR_VERSION_NOT_MATCH);
result_.set_version(leader_version);
LOG_INFO("need refresh all", K(tenant_id), K(arg_.need_refresh_all_),
K(follower_version), K(prev_leader_server_id), K(leader_server_id));
K(follower_version), K(prev_leader_server_index), K(leader_server_index));
}
} else if (OB_FAIL(OB_PX_TARGET_MGR.get_version(tenant_id, leader_version))) {
LOG_WARN("get master_version failed", K(ret), K(tenant_id));

View File

@ -373,6 +373,7 @@ int ObPxTenantTargetMonitor::reset_follower_statistics(uint64_t version)
int ObPxTenantTargetMonitor::reset_leader_statistics()
{
int ret = OB_SUCCESS;
const uint64_t server_index = GCTX.get_server_index();
// write lock before reset map and refresh version.
SpinWLockGuard wlock_guard(spin_lock_);
global_target_usage_.clear();
@ -381,7 +382,7 @@ int ObPxTenantTargetMonitor::reset_leader_statistics()
} else {
version_ = get_new_version();
}
LOG_INFO("reset leader statistics", K(tenant_id_), K(ret), K(version_), K(GCTX.server_id_));
LOG_INFO("reset leader statistics", K(tenant_id_), K(ret), K(version_), K(server_index));
return ret;
}
@ -545,12 +546,12 @@ int ObPxTenantTargetMonitor::get_all_target_info(common::ObIArray<ObPxTargetInfo
uint64_t ObPxTenantTargetMonitor::get_new_version()
{
uint64_t current_time = common::ObTimeUtility::current_time();
uint64_t svr_id = GCTX.server_id_;
uint64_t new_version = ((current_time & 0x0000FFFFFFFFFFFF) | (svr_id << SERVER_ID_SHIFT));
uint64_t server_index = GCTX.get_server_index();
uint64_t new_version = ((current_time & 0x0000FFFFFFFFFFFF) | (server_index << SERVER_ID_SHIFT));
return new_version;
}
uint64_t ObPxTenantTargetMonitor::get_server_id(uint64_t version) {
uint64_t ObPxTenantTargetMonitor::get_server_index(uint64_t version) {
return (version >> SERVER_ID_SHIFT);
}

View File

@ -144,7 +144,7 @@ public:
// for virtual_table iter
int get_all_target_info(common::ObIArray<ObPxTargetInfo> &target_info_array);
static uint64_t get_server_id(uint64_t version);
static uint64_t get_server_index(uint64_t version);
TO_STRING_KV(K_(is_init), K_(tenant_id), K_(server), K_(dummy_cache_leader), K_(role));

View File

@ -249,14 +249,14 @@ int ObP2PDatahubManager::generate_p2p_dh_id(int64_t &p2p_dh_id)
// generate p2p dh id
// | <16> | <28> | 20
// server_id timestamp sequence
if (!is_valid_server_id(GCTX.server_id_)) {
const uint64_t server_index = GCTX.get_server_index();
if (OB_UNLIKELY(!is_valid_server_index(server_index))) {
ret = OB_SERVER_IS_INIT;
LOG_WARN("server id is unexpected", K(ret));
LOG_WARN("server index is unexpected", KR(ret), K(server_index));
} else {
const uint64_t svr_id = GCTX.server_id_;
int64_t ts = (common::ObTimeUtility::current_time() / 1000000) << 20;
int64_t seq_id = ATOMIC_AAF(&p2p_dh_id_, 1);
p2p_dh_id = (ts & 0x0000FFFFFFFFFFFF) | (svr_id << 48) | seq_id;
p2p_dh_id = (ts & 0x0000FFFFFFFFFFFF) | (server_index << 48) | seq_id;
}
return ret;
}

View File

@ -5546,7 +5546,7 @@ int ObLogicalOperator::allocate_partition_join_filter(const ObIArray<JoinFilterI
join_filter_create->set_tablet_id_expr(info.calc_part_id_expr_);
OZ(join_filter_create->compute_property());
OZ(bf_info.init(get_plan()->get_optimizer_context().get_session_info()->get_effective_tenant_id(),
filter_id, GCTX.server_id_,
filter_id, GCTX.get_server_id(),
join_filter_create->is_shared_join_filter(),
info.skip_subpart_,
join_filter_create->get_p2p_sequence_ids().at(0),

View File

@ -3657,17 +3657,17 @@ int ObSysVarEncoder::display_sess_info(ObSQLSessionInfo &sess, const char* curre
void ObSQLSessionInfo::gen_gtt_session_scope_unique_id()
{
static int64_t cur_ts = 0;
int64_t next_ts = ObSQLUtils::combine_server_id(ObSQLUtils::get_next_ts(cur_ts), GCTX.server_id_);
int64_t next_ts = ObSQLUtils::combine_server_id(ObSQLUtils::get_next_ts(cur_ts), GCTX.get_server_id());
gtt_session_scope_unique_id_ = next_ts;
LOG_DEBUG("check temporary table ssid session scope", K(next_ts), K(get_sessid_for_table()), K(GCTX.server_id_), K(lbt()));
LOG_DEBUG("check temporary table ssid session scope", K(next_ts), K(get_sessid_for_table()), K(GCTX.get_server_id()), K(lbt()));
}
void ObSQLSessionInfo::gen_gtt_trans_scope_unique_id()
{
static int64_t cur_ts = 0;
int64_t next_ts = ObSQLUtils::combine_server_id(ObSQLUtils::get_next_ts(cur_ts), GCTX.server_id_);
int64_t next_ts = ObSQLUtils::combine_server_id(ObSQLUtils::get_next_ts(cur_ts), GCTX.get_server_id());
gtt_trans_scope_unique_id_ = next_ts;
LOG_DEBUG("check temporary table ssid trans scope", K(next_ts), K(get_sessid_for_table()), K(GCTX.server_id_), K(lbt()));
LOG_DEBUG("check temporary table ssid trans scope", K(next_ts), K(get_sessid_for_table()), K(GCTX.get_server_id()), K(lbt()));
}
int ObAppInfoEncoder::serialize(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos)

View File

@ -364,10 +364,10 @@ void ObSQLSessionMgr::destroy()
sess_hold_map_.destroy();
}
uint64_t ObSQLSessionMgr::extract_server_id(uint32_t sessid)
uint64_t ObSQLSessionMgr::extract_server_index(uint32_t sessid)
{
uint64_t server_id = sessid >> LOCAL_SEQ_LEN;
return server_id & MAX_SERVER_ID;
uint64_t server_index = sessid >> LOCAL_SEQ_LEN;
return server_index & MAX_SERVER_INDEX;
}
int ObSQLSessionMgr::inc_session_ref(const ObSQLSessionInfo *my_session)
@ -393,7 +393,7 @@ int ObSQLSessionMgr::inc_session_ref(const ObSQLSessionInfo *my_session)
//
//MASK: 1 表示是server自己生成connection id,
// 0 表示是proxy生成的connection id(已废弃,目前仅用于 in_mgr = false 的场景);
//Server Id: 集群中server的id由RS分配,集群内唯一;
//Server index: 集群中server的id由RS分配,集群内唯一,但在server删除后可能会被复用
//Local Seq: 一个server可用连接数,目前单台server最多有INT16_MAX个连接;
//
int ObSQLSessionMgr::create_sessid(uint32_t &sessid, bool in_mgr)
@ -401,29 +401,29 @@ int ObSQLSessionMgr::create_sessid(uint32_t &sessid, bool in_mgr)
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
sessid = 0;
const uint64_t server_id = GCTX.server_id_;
const uint64_t server_index = GCTX.get_server_index();
uint32_t local_seq = 0;
static uint32_t abnormal_seq = 0;//用于server_id == 0是的sessid分配
if (server_id > MAX_SERVER_ID) {
if (server_index > MAX_SERVER_INDEX) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("server id maybe invalid", K(ret), K(server_id));
LOG_ERROR("server index maybe invalid", K(ret), K(server_index));
} else if (!in_mgr) {
sessid = (GETTID() | LOCAL_SESSID_TAG);
sessid |= static_cast<uint32_t>(server_id << LOCAL_SEQ_LEN); // set observer
} else if (0 == server_id) {
sessid |= static_cast<uint32_t>(server_index << LOCAL_SEQ_LEN); // set observer
} else if (0 == server_index) {
local_seq = (ATOMIC_FAA(&abnormal_seq, 1) & MAX_LOCAL_SEQ);
uint32_t max_local_seq = MAX_LOCAL_SEQ;
uint32_t max_server_id = MAX_SERVER_ID;
LOG_WARN("server is initiating", K(server_id), K(local_seq), K(max_local_seq), K(max_server_id));
uint32_t max_server_index = MAX_SERVER_INDEX;
LOG_WARN("server is initiating", K(server_index), K(local_seq), K(max_local_seq), K(max_server_index));
} else if (OB_UNLIKELY(OB_SUCCESS != (ret = tmp_ret = get_avaiable_local_seq(local_seq)))) {
LOG_WARN("fail to get avaiable local_seq", K(local_seq));
} else {/*do nothing*/}
if (OB_SUCC(ret) && in_mgr) {
sessid = local_seq | SERVER_SESSID_TAG;// set observer sessid mark
sessid |= static_cast<uint32_t>(server_id << LOCAL_SEQ_LEN); // set observer
// high bit is reserved for server id
sessid |= static_cast<uint32_t>(1ULL << (LOCAL_SEQ_LEN + SERVER_ID_LEN));
sessid |= static_cast<uint32_t>(server_index << LOCAL_SEQ_LEN); // set observer
// high bit is reserved for server index
sessid |= static_cast<uint32_t>(1ULL << (LOCAL_SEQ_LEN + SERVER_INDEX_LEN));
}
return ret;
}
@ -815,8 +815,8 @@ int ObSQLSessionMgr::mark_sessid_used(uint32_t sess_id)
int ObSQLSessionMgr::mark_sessid_unused(uint32_t sess_id)
{
int ret = OB_SUCCESS;
uint64_t server_id = extract_server_id(sess_id);
if (server_id == 0) {
uint64_t server_index = extract_server_index(sess_id);
if (0 == server_index) {
// 参考:create_sessid方法
// 由于server_id == 0时, 此时的local_seq,是由ATOMIC_FAA(&abnormal_seq, 1)产生,
// 使用ATOMIC_FAA的原因无从考证(原作者的信息描述无任何具体信息),采取保守修改策略
@ -955,8 +955,8 @@ int ObSQLSessionMgr::is_need_clear_sessid(const ObSMConnection *conn, bool &is_n
LOG_WARN("unexpected parameter", K(conn));
} else if (is_server_sessid(conn->sessid_)
&& ObSMConnection::INITIAL_SESSID != conn->sessid_
&& 0 != extract_server_id(conn->sessid_)
&& GCTX.server_id_ == extract_server_id(conn->sessid_)
&& 0 != extract_server_index(conn->sessid_)
&& GCTX.get_server_index() == extract_server_index(conn->sessid_)
&& conn->is_need_clear_sessid_) {
is_need = true;
} else {/*do nothing*/ }

View File

@ -144,7 +144,7 @@ public:
int get_min_active_snapshot_version(share::SCN &snapshot_version);
//used for guarantee the unique sessid when observer generates sessid
static uint64_t extract_server_id(uint32_t sessid);
static uint64_t extract_server_index(uint32_t sessid);
static bool is_server_sessid(uint32_t sessid) {return SERVER_SESSID_TAG & sessid;}
static int is_need_clear_sessid(const observer::ObSMConnection *conn, bool &is_need);
int fetch_first_sessid();
@ -287,10 +287,11 @@ private:
// |Mask|resvd| Server Id | Local Seq = 16 + 2 |
// +----+------------------------------+--------------------------------+
static const uint16_t LOCAL_SEQ_LEN = 18;
static const uint16_t RESERVED_SERVER_ID_LEN = 1;
static const uint16_t SERVER_ID_LEN = 13 - RESERVED_SERVER_ID_LEN;
static const uint16_t RESERVED_SERVER_INDEX_LEN = 1;
static const uint16_t SERVER_INDEX_LEN = 13 - RESERVED_SERVER_INDEX_LEN;
static const uint32_t MAX_LOCAL_SEQ = (1ULL << LOCAL_SEQ_LEN) - 1;
static const uint64_t MAX_SERVER_ID = (1ULL << SERVER_ID_LEN) - 1;
// MAX_SERVER_INDEX cannot be larger than MAX_SERVER_COUNT
static const uint64_t MAX_SERVER_INDEX = (1ULL << SERVER_INDEX_LEN) - 1;
common::ObFixedQueue<void> sessid_sequence_;
uint32_t first_seq_;
uint32_t increment_sessid_;

View File

@ -985,8 +985,8 @@ void ObCOTabletOutputMergeCtx::after_update_tablet_for_major()
int tmp_ret = OB_SUCCESS;
ObBasicObjHandle<ObCompactionReportObj> report_obj_hdl;
if (OB_TMP_FAIL(MTL_SVR_OBJ_MGR.get_obj_handle(GCTX.server_id_, report_obj_hdl))) {
LOG_WARN_RET(tmp_ret, "failed to get report obj handle", "cur_svr_id", GCTX.server_id_);
if (OB_TMP_FAIL(MTL_SVR_OBJ_MGR.get_obj_handle(GCTX.get_server_id(), report_obj_hdl))) {
LOG_WARN_RET(tmp_ret, "failed to get report obj handle", "cur_svr_id", GCTX.get_server_id());
} else if (OB_TMP_FAIL(report_obj_hdl.get_obj()->update_exec_tablet(1, true/*is_finish_task*/))) {
LOG_WARN_RET(tmp_ret, "failed to inc exec tablet", K(get_ls_id()), K(get_tablet_id()));
}

View File

@ -523,7 +523,7 @@ void write_wrong_row(const ObTabletID &tablet_id, const ObDatumRow &row)
ret = OB_E(EventTable::EN_MAKE_DATA_CKM_ERROR_BY_WRITE_WRONG_ROW) ret;
if (OB_FAIL(ret)
&& tablet_id.id() > ObTabletID::MIN_USER_TABLET_ID
&& (OB_CHECKSUM_ERROR == ret || GCTX.server_id_ == -ret)) {
&& (OB_CHECKSUM_ERROR == ret || GCTX.get_server_id() == -ret)) {
ObDatumRow &tmp_row = const_cast<ObDatumRow &>(row);
tmp_row.storage_datums_[tmp_row.get_column_count() - 1].set_int(999);
LOG_ERROR("ERRSIM EN_MAKE_DATA_CKM_ERROR_BY_WRITE_WRONG_ROW", K(ret), K(tablet_id), K(row));

View File

@ -660,8 +660,8 @@ void ObTabletMajorOutputMergeCtx::after_update_tablet_for_major()
int tmp_ret = OB_SUCCESS;
ObBasicObjHandle<ObCompactionReportObj> report_obj_hdl;
if (OB_TMP_FAIL(MTL_SVR_OBJ_MGR.get_obj_handle(GCTX.server_id_, report_obj_hdl))) {
LOG_WARN_RET(tmp_ret, "failed to get report obj handle", "cur_svr_id", GCTX.server_id_);
if (OB_TMP_FAIL(MTL_SVR_OBJ_MGR.get_obj_handle(GCTX.get_server_id(), report_obj_hdl))) {
LOG_WARN_RET(tmp_ret, "failed to get report obj handle", "cur_svr_id", GCTX.get_server_id());
} else if (OB_TMP_FAIL(report_obj_hdl.get_obj()->update_exec_tablet(1, true/*is_finish_task*/))) {
LOG_WARN_RET(tmp_ret, "failed to inc exec tablet", K(get_ls_id()), K(get_tablet_id()));
}

View File

@ -1140,7 +1140,7 @@ int ObTabletMergeTask::process()
}
ret = SPECIFIED_SERVER_STOP_COMPACTION;
if (OB_FAIL(ret)) {
if (-ret == GCTX.server_id_) {
if (-ret == GCTX.get_server_id()) {
STORAGE_LOG(INFO, "ERRSIM SPECIFIED_SERVER_STOP_COMPACTION", K(ret));
return OB_EAGAIN;
} else {

View File

@ -78,7 +78,7 @@ void ObDiskUsageReportTask::runTimerTask()
int ret = OB_SUCCESS;
const ObAddr addr = GCTX.self_addr();
const int64_t self_svr_seq = GCTX.server_id_;
const int64_t self_svr_seq = GCTX.get_server_id();
char addr_buffer[MAX_IP_ADDR_LENGTH] = {};
if (OB_UNLIKELY(!is_inited_)) {

View File

@ -38,7 +38,7 @@ rpc::frame::ObReqTransport mock_transport(nullptr, nullptr);
static int init_dm()
{
int ret = OB_SUCCESS;
GCTX.server_id_ = 10086;
(void) GCTX.set_server_id(4095);
ObAddr self;
self.set_ip_addr("127.0.0.1", 8086);
if (OB_FAIL(dm->init(self, 1))) {