From 521ef6a5fbe05660aa2f4a7dec95c52e4a752349 Mon Sep 17 00:00:00 2001 From: linqiucen Date: Tue, 22 Oct 2024 04:17:42 +0000 Subject: [PATCH] server index --- deps/oblib/src/lib/ob_define.h | 8 +- mittest/mtlenv/mock_tenant_module_env.h | 2 +- src/observer/mysql/obmp_base.cpp | 2 +- src/observer/mysql/obmp_disconnect.cpp | 2 +- src/observer/mysql/obsm_conn_callback.cpp | 6 +- src/observer/mysql/obsm_handler.cpp | 9 +-- src/observer/ob_heartbeat.cpp | 10 +-- src/observer/ob_server.cpp | 6 +- src/observer/ob_service.cpp | 14 ++-- src/rootserver/ob_server_zone_op_service.cpp | 57 +++++++++++++- src/rootserver/ob_server_zone_op_service.h | 3 + src/rootserver/ob_system_admin_util.cpp | 9 +++ src/share/detect/ob_detect_manager.cpp | 12 +-- src/share/ob_heartbeat_handler.cpp | 6 +- src/share/ob_max_id_fetcher.cpp | 28 +++++++ src/share/ob_max_id_fetcher.h | 1 + src/share/ob_server_struct.cpp | 16 ++++ src/share/ob_server_struct.h | 21 ++++- src/share/ob_server_table_operator.cpp | 78 ++++++++++++++++++- src/share/ob_server_table_operator.h | 4 + src/share/ob_share_util.h | 3 + src/share/ob_unit_table_operator.cpp | 2 +- src/sql/dtl/ob_dtl_channel.h | 8 +- src/sql/engine/expr/ob_expr_sys_context.cpp | 2 +- src/sql/engine/expr/ob_expr_userenv.cpp | 2 +- src/sql/engine/expr/ob_expr_uuid_short.cpp | 2 +- src/sql/engine/px/ob_dfo_mgr.cpp | 2 +- src/sql/engine/px/ob_px_coord_op.cpp | 18 +++-- src/sql/engine/px/ob_px_interruption.h | 8 +- src/sql/engine/px/ob_px_rpc_processor.cpp | 8 +- .../engine/px/ob_px_tenant_target_monitor.cpp | 9 ++- .../engine/px/ob_px_tenant_target_monitor.h | 2 +- .../engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp | 8 +- src/sql/optimizer/ob_logical_operator.cpp | 2 +- src/sql/session/ob_sql_session_info.cpp | 8 +- src/sql/session/ob_sql_session_mgr.cpp | 36 ++++----- src/sql/session/ob_sql_session_mgr.h | 9 ++- src/storage/column_store/ob_co_merge_ctx.cpp | 4 +- .../compaction/ob_partition_merger.cpp | 2 +- .../compaction/ob_tablet_merge_ctx.cpp | 4 +- .../compaction/ob_tablet_merge_task.cpp | 2 +- src/storage/ob_disk_usage_reporter.cpp | 2 +- .../share/detect/test_ob_detect_manager.cpp | 2 +- 43 files changed, 327 insertions(+), 112 deletions(-) diff --git a/deps/oblib/src/lib/ob_define.h b/deps/oblib/src/lib/ob_define.h index 9446b0039..79cdf8693 100644 --- a/deps/oblib/src/lib/ob_define.h +++ b/deps/oblib/src/lib/ob_define.h @@ -209,7 +209,7 @@ typedef ObPreciseDateTime ObCreateTime; typedef uint64_t ObPsStmtId; const int32_t NOT_CHECK_FLAG = 0; -const int64_t MAX_SERVER_COUNT = 1024; +const int64_t MAX_SERVER_COUNT = 4095; const uint64_t OB_SERVER_USER_ID = 0; const int64_t OB_MAX_INDEX_PER_TABLE = 128; const int64_t OB_MAX_SSTABLE_PER_TABLE = OB_MAX_INDEX_PER_TABLE + 1; @@ -2071,6 +2071,12 @@ OB_INLINE bool is_valid_server_id(const uint64_t server_id) return (0 < server_id) && (OB_INVALID_ID != server_id); } +// check whether a server_index is valid +OB_INLINE bool is_valid_server_index(const uint64_t server_index) +{ + return (0 < server_index) && (server_index <= MAX_SERVER_COUNT); +} + //check whether an tenant_id is valid OB_INLINE bool is_valid_tenant_id(const uint64_t tenant_id) { diff --git a/mittest/mtlenv/mock_tenant_module_env.h b/mittest/mtlenv/mock_tenant_module_env.h index efc6a1c7f..9842db3ea 100644 --- a/mittest/mtlenv/mock_tenant_module_env.h +++ b/mittest/mtlenv/mock_tenant_module_env.h @@ -635,7 +635,7 @@ void MockTenantModuleEnv::init_gctx_gconf() GCTX.session_mgr_ = &session_mgr_; GCTX.scramble_rand_ = &scramble_rand_; GCTX.locality_manager_ = &locality_manager_; - GCTX.server_id_ = 1; + (void) GCTX.set_server_id(1); GCTX.rs_rpc_proxy_ = &rs_rpc_proxy_; GCTX.srv_rpc_proxy_ = &srv_rpc_proxy_; GCTX.rs_mgr_ = &rs_mgr_; diff --git a/src/observer/mysql/obmp_base.cpp b/src/observer/mysql/obmp_base.cpp index 58397a763..9cb528441 100644 --- a/src/observer/mysql/obmp_base.cpp +++ b/src/observer/mysql/obmp_base.cpp @@ -324,7 +324,7 @@ int ObMPBase::free_session() LOG_INFO("free session successfully", K(ctx)); conn->is_sess_free_ = true; if (OB_UNLIKELY(OB_FAIL(sql::ObSQLSessionMgr::is_need_clear_sessid(conn, is_need_clear)))) { - LOG_ERROR("fail to judge need clear", K(ret), "sessid", conn->sessid_, "server_id", GCTX.server_id_); + LOG_ERROR("fail to judge need clear", K(ret), "sessid", conn->sessid_); } else if (is_need_clear) { if (OB_FAIL(GCTX.session_mgr_->mark_sessid_unused(conn->sessid_))) { LOG_WARN("mark session id unused failed", K(ret), "sessid", conn->sessid_); diff --git a/src/observer/mysql/obmp_disconnect.cpp b/src/observer/mysql/obmp_disconnect.cpp index 2a005b8f1..5cac0dada 100644 --- a/src/observer/mysql/obmp_disconnect.cpp +++ b/src/observer/mysql/obmp_disconnect.cpp @@ -81,7 +81,7 @@ int ObMPDisconnect::run() EVENT_INC(SQL_USER_LOGOUTS_CUMULATIVE); LOG_INFO("free session successfully", "sessid", ctx_.sessid_); if (OB_UNLIKELY(OB_FAIL(sql::ObSQLSessionMgr::is_need_clear_sessid(&conn, is_need_clear)))) { - LOG_ERROR("fail to judge need clear", K(ret), "sessid", conn.sessid_, "server_id", GCTX.server_id_); + LOG_ERROR("fail to judge need clear", K(ret), "sessid", conn.sessid_); } else if (is_need_clear) { if (OB_FAIL(GCTX.session_mgr_->mark_sessid_unused(conn.sessid_))) { LOG_WARN("mark session id unused failed", K(ret), "sessid", conn.sessid_); diff --git a/src/observer/mysql/obsm_conn_callback.cpp b/src/observer/mysql/obsm_conn_callback.cpp index 171ceabbe..5821aceaf 100644 --- a/src/observer/mysql/obsm_conn_callback.cpp +++ b/src/observer/mysql/obsm_conn_callback.cpp @@ -218,7 +218,7 @@ void ObSMConnectionCallback::destroy(ObSMConnection& conn) } else if (is_need_clear) { if (OB_FAIL(GCTX.session_mgr_->mark_sessid_unused(conn.sessid_))) { LOG_ERROR("fail to mark sessid unused", K(ret), K(conn.sessid_), - "proxy_sessid", conn.proxy_sessid_, "server_id", GCTX.server_id_); + "proxy_sessid", conn.proxy_sessid_); } else { LOG_INFO("mark session id unused", K(conn.sessid_)); } @@ -231,7 +231,6 @@ void ObSMConnectionCallback::destroy(ObSMConnection& conn) "sessid", conn.sessid_, "proxy_sessid", conn.proxy_sessid_, "tenant_id", conn.tenant_id_, - "server_id", GCTX.server_id_, "from_proxy", conn.is_proxy_, "from_java_client", conn.is_java_client_, "c/s protocol", get_cs_protocol_type_name(conn.get_cs_protocol_type()), @@ -264,8 +263,7 @@ int ObSMConnectionCallback::on_disconnect(observer::ObSMConnection& conn) sess_info->set_shadow(true); } } - LOG_INFO("kill and revert session", K(conn.sessid_), - "proxy_sessid", conn.proxy_sessid_, "server_id", GCTX.server_id_, K(ret)); + LOG_INFO("kill and revert session", K(conn.sessid_), "proxy_sessid", conn.proxy_sessid_, K(ret)); return ret; } diff --git a/src/observer/mysql/obsm_handler.cpp b/src/observer/mysql/obsm_handler.cpp index 131870175..43e044e93 100644 --- a/src/observer/mysql/obsm_handler.cpp +++ b/src/observer/mysql/obsm_handler.cpp @@ -168,8 +168,7 @@ int ObSMHandler::on_disconnect(easy_connection_t *c) } } LOG_INFO("kill and revert session", K(conn->sessid_), - "proxy_sessid", conn->proxy_sessid_, "server_id", GCTX.server_id_, - K(tmp_ret), K(eret)); + "proxy_sessid", conn->proxy_sessid_, K(tmp_ret), K(eret)); } return eret; } @@ -245,10 +244,9 @@ int ObSMHandler::on_close(easy_connection_t *c) } else if (is_need_clear) { if (OB_UNLIKELY(OB_FAIL(gctx_.session_mgr_->mark_sessid_unused(conn->sessid_)))) { LOG_ERROR("fail to mark sessid unused", K(ret), K(conn->sessid_), - "proxy_sessid", conn->proxy_sessid_, "server_id", GCTX.server_id_); + "proxy_sessid", conn->proxy_sessid_); } else { - LOG_INFO("mark sessid unused", K(conn->sessid_), - "proxy_sessid", conn->proxy_sessid_, "server_id", GCTX.server_id_); + LOG_INFO("mark sessid unused", K(conn->sessid_), "proxy_sessid", conn->proxy_sessid_); } } else {/*do nothing*/} } @@ -267,7 +265,6 @@ int ObSMHandler::on_close(easy_connection_t *c) "sessid", conn->sessid_, "proxy_sessid", conn->proxy_sessid_, "tenant_id", conn->tenant_id_, - "server_id", gctx_.server_id_, "from_proxy", conn->is_proxy_, "from_java_client", conn->is_java_client_, "c/s protocol", get_cs_protocol_type_name(conn->get_cs_protocol_type()), diff --git a/src/observer/ob_heartbeat.cpp b/src/observer/ob_heartbeat.cpp index 86b05db29..29845f390 100644 --- a/src/observer/ob_heartbeat.cpp +++ b/src/observer/ob_heartbeat.cpp @@ -194,13 +194,13 @@ void ObHeartBeatProcess::check_and_update_server_id_(const uint64_t server_id) // in upgrade period 4.1 -> 4.2, we need to persist the server_id via heartbeat const int64_t delay = 0; const bool repeat = false; - if (0 == GCTX.server_id_) { - GCTX.server_id_ = server_id; + if (0 == GCTX.get_server_id()) { + (void) GCTX.set_server_id(server_id); LOG_INFO("receive new server id in GCTX", K(server_id)); - } else if (server_id != GCTX.server_id_) { + } else if (server_id != GCTX.get_server_id()) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("GCTX.server_id_ is not the same as server_id in RS", KR(ret), - K(GCTX.server_id_), K(server_id)); + LOG_ERROR("GCTX.get_server_id() is not the same as server_id in RS", KR(ret), + K(GCTX.get_server_id()), K(server_id)); } if (OB_FAIL(ret)) { } else if (0 == GCONF.observer_id) { diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 241e4b694..cc25fd467 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -2987,9 +2987,9 @@ int ObServer::init_global_context() gctx_.startup_accel_handler_ = &startup_accel_handler_; gctx_.flashback_scn_ = opts_.flashback_scn_; - gctx_.server_id_ = config_.observer_id; - if (is_valid_server_id(gctx_.server_id_)) { - LOG_INFO("this observer has had a valid server_id", K(gctx_.server_id_)); + (void) gctx_.set_server_id(config_.observer_id); + if (is_valid_server_id(gctx_.get_server_id())) { + LOG_INFO("this observer has had a valid server_id", K(gctx_.get_server_id())); } if ((PHY_FLASHBACK_MODE == gctx_.startup_mode_ || PHY_FLASHBACK_VERIFY_MODE == gctx_.startup_mode_) && 0 >= gctx_.flashback_scn_) { diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 6fd8e6ccc..e8155a8f3 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -1692,13 +1692,13 @@ int ObService::set_server_id_(const int64_t server_id) if (OB_UNLIKELY(!is_valid_server_id(server_id))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid server_id", KR(ret), K(server_id)); - } else if (is_valid_server_id(GCTX.server_id_) || is_valid_server_id(GCONF.observer_id)) { + } else if (is_valid_server_id(GCTX.get_server_id()) || is_valid_server_id(GCONF.observer_id)) { ret = OB_ERR_UNEXPECTED; uint64_t server_id_in_gconf = GCONF.observer_id; LOG_WARN("server_id is only expected to be set once", KR(ret), - K(server_id), K(GCTX.server_id_), K(server_id_in_gconf)); + K(server_id), K(GCTX.get_server_id()), K(server_id_in_gconf)); } else { - GCTX.server_id_ = server_id; + (void) GCTX.set_server_id(server_id); GCONF.observer_id = server_id; if (OB_ISNULL(GCTX.config_mgr_)) { ret = OB_ERR_UNEXPECTED; @@ -1822,10 +1822,10 @@ int ObService::prepare_server_for_adding_server( // If adding server during bootstrap, server is expected to be not empty. // Just check this server_id same to the server_id set before. const uint64_t server_id_in_GCONF = GCONF.observer_id; - if (server_id != GCTX.server_id_ || server_id != server_id_in_GCONF) { + if (server_id != GCTX.get_server_id() || server_id != server_id_in_GCONF) { ret = OB_ERR_UNEXPECTED; LOG_WARN("server_id not same to that set before.", KR(ret), - "server_id_for_adding_server", server_id, K(GCTX.server_id_), K(server_id_in_GCONF)); + "server_id_for_adding_server", server_id, K(GCTX.get_server_id()), K(server_id_in_GCONF)); } else { server_empty = false; } @@ -2275,9 +2275,9 @@ int ObService::check_server_empty(bool &is_empty) } else { uint64_t server_id_in_GCONF = GCONF.observer_id; if (is_empty) { - if (is_valid_server_id(GCTX.server_id_) || is_valid_server_id(server_id_in_GCONF)) { + if (is_valid_server_id(GCTX.get_server_id()) || is_valid_server_id(server_id_in_GCONF)) { is_empty = false; - FLOG_WARN("[CHECK_SERVER_EMPTY] server_id exists", K(GCTX.server_id_), K(server_id_in_GCONF)); + FLOG_WARN("[CHECK_SERVER_EMPTY] server_id exists", K(GCTX.get_server_id()), K(server_id_in_GCONF)); } } if (is_empty) { diff --git a/src/rootserver/ob_server_zone_op_service.cpp b/src/rootserver/ob_server_zone_op_service.cpp index fff611985..8305c29e5 100644 --- a/src/rootserver/ob_server_zone_op_service.cpp +++ b/src/rootserver/ob_server_zone_op_service.cpp @@ -739,6 +739,7 @@ int ObServerZoneOpService::add_server_( bool is_active = false; const int64_t now = ObTimeUtility::current_time(); ObServerInfoInTable server_info_in_table; + ObArray server_id_in_cluster; ObMySQLTransaction trans; DEBUG_SYNC(BEFORE_ADD_SERVER_TRANS); if (OB_UNLIKELY(!is_inited_)) { @@ -781,7 +782,13 @@ int ObServerZoneOpService::add_server_( ret = OB_ENTRY_EXIST; LOG_WARN("server exists", KR(ret), K(server_info_in_table)); } - if (FAILEDx(server_info_in_table.init( + if (FAILEDx(ObServerTableOperator::get_clusters_server_id(trans, server_id_in_cluster))) { + LOG_WARN("fail to get servers' id in the cluster", KR(ret)); + } else if (OB_UNLIKELY(!check_server_index_(server_id, server_id_in_cluster))) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("server index is outdated due to concurrent operations", KR(ret), K(server_id), K(server_id_in_cluster)); + LOG_USER_ERROR(OB_OP_NOT_ALLOW, "server index is outdated due to concurrent operations, ADD_SERVER is"); + } else if (OB_FAIL(server_info_in_table.init( server, server_id, zone, @@ -985,26 +992,68 @@ int ObServerZoneOpService::construct_rs_list_arg(ObRsListArg &rs_list_arg) int ObServerZoneOpService::fetch_new_server_id_(uint64_t &server_id) { int ret = OB_SUCCESS; + ObArray server_id_in_cluster; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret), K(is_inited_)); } else if (OB_ISNULL(sql_proxy_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid sql proxy", KR(ret), KP(sql_proxy_)); + } else if (OB_FAIL(ObServerTableOperator::get_clusters_server_id(*sql_proxy_, server_id_in_cluster))) { + LOG_WARN("fail to get server_ids in the cluster", KR(ret), KP(sql_proxy_)); + } else if (OB_UNLIKELY(server_id_in_cluster.count() >= MAX_SERVER_COUNT)) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("server count reaches the limit", KR(ret), K(server_id_in_cluster.count())); + LOG_USER_ERROR(OB_OP_NOT_ALLOW, "server count reaches the limit, ADD_SERVER is"); } else { - uint64_t new_max_id = OB_INVALID_ID; + uint64_t candidate_server_id = OB_INVALID_ID; ObMaxIdFetcher id_fetcher(*sql_proxy_); if (OB_FAIL(id_fetcher.fetch_new_max_id( OB_SYS_TENANT_ID, OB_MAX_USED_SERVER_ID_TYPE, - new_max_id))) { + candidate_server_id))) { LOG_WARN("fetch_new_max_id failed", KR(ret)); } else { - server_id = new_max_id; + uint64_t new_candidate_server_id = candidate_server_id; + while (!check_server_index_(new_candidate_server_id, server_id_in_cluster)) { + if (new_candidate_server_id % 10 == 0) { + LOG_INFO("[FETCH NEW SERVER ID] periodical log", K(new_candidate_server_id), K(server_id_in_cluster)); + } + ++new_candidate_server_id; + } + if (new_candidate_server_id != candidate_server_id + && OB_FAIL(id_fetcher.update_server_max_id(candidate_server_id, new_candidate_server_id))) { + LOG_WARN("fail to update server max id", KR(ret), K(candidate_server_id), K(new_candidate_server_id), + K(server_id_in_cluster)); + } + if (OB_SUCC(ret)) { + server_id = new_candidate_server_id; + LOG_INFO("[FETCH NEW SERVER ID] new candidate server id", K(server_id), K(server_id_in_cluster)); + } } } return ret; } +bool ObServerZoneOpService::check_server_index_( + const uint64_t candidate_server_id, + const common::ObIArray &server_id_in_cluster) const +{ + // server_index = server_id % 4096 + // server_index cannot be zero and must be unique in the cluster + bool is_good_candidate = true; + const uint64_t candidate_index = ObShareUtil::compute_server_index(candidate_server_id); + if (0 == candidate_index) { + is_good_candidate = false; + } else { + for (int64_t i = 0; i < server_id_in_cluster.count() && is_good_candidate; ++i) { + const uint64_t server_index = ObShareUtil::compute_server_index(server_id_in_cluster.at(i)); + if (candidate_index == server_index) { + is_good_candidate = false; + } + } + } + return is_good_candidate; +} int ObServerZoneOpService::check_server_have_enough_resource_for_delete_server_( const ObIArray &servers, const ObZone &zone) diff --git a/src/rootserver/ob_server_zone_op_service.h b/src/rootserver/ob_server_zone_op_service.h index 52de104f1..17ead6406 100644 --- a/src/rootserver/ob_server_zone_op_service.h +++ b/src/rootserver/ob_server_zone_op_service.h @@ -205,6 +205,9 @@ private: const obrpc::ObAdminServerArg::AdminServerOp &op); int check_and_update_service_epoch_(common::ObMySQLTransaction &trans); int fetch_new_server_id_(uint64_t &server_id); + bool check_server_index_( + const uint64_t candidate_server_id, + const common::ObIArray &server_id_in_cluster) const; int check_server_have_enough_resource_for_delete_server_( const ObIArray &servers, const common::ObZone &zone); diff --git a/src/rootserver/ob_system_admin_util.cpp b/src/rootserver/ob_system_admin_util.cpp index 6d04c57dd..a0e24b1ad 100644 --- a/src/rootserver/ob_system_admin_util.cpp +++ b/src/rootserver/ob_system_admin_util.cpp @@ -1753,6 +1753,7 @@ int ObAdminUpgradeCmd::execute(const Bool &upgrade) int ObAdminRollingUpgradeCmd::execute(const obrpc::ObAdminRollingUpgradeArg &arg) { int ret = OB_SUCCESS; + uint64_t max_server_id = 0; HEAP_VAR(ObAdminSetConfigItem, item) { obrpc::ObAdminSetConfigArg set_config_arg; set_config_arg.is_inner_ = true; @@ -1808,6 +1809,14 @@ int ObAdminRollingUpgradeCmd::execute(const obrpc::ObAdminRollingUpgradeArg &arg } } // end while } + if (OB_FAIL(ret) || GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_4_0) { + } else if (OB_FAIL(ObServerTableOperator::get_clusters_max_server_id(max_server_id))) { + LOG_WARN("fail to get max server id", KR(ret)); + } else if (OB_UNLIKELY(!is_valid_server_index(max_server_id))) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("max_server_id should be a valid server index", KR(ret), K(max_server_id)); + LOG_USER_ERROR(OB_OP_NOT_ALLOW, "max server id in the cluster cannot be larget than MAX_SERVER_COUNT, UPGRADE is"); + } // end rolling upgrade, should raise min_observer_version const char *min_obs_version_name = "min_observer_version"; if (FAILEDx(item.name_.assign(min_obs_version_name))) { diff --git a/src/share/detect/ob_detect_manager.cpp b/src/share/detect/ob_detect_manager.cpp index 64c48e25d..93a3d900e 100644 --- a/src/share/detect/ob_detect_manager.cpp +++ b/src/share/detect/ob_detect_manager.cpp @@ -34,18 +34,18 @@ ObDetectableIdGen &ObDetectableIdGen::instance() int ObDetectableIdGen::generate_detectable_id(ObDetectableId &detectable_id, uint64_t tenant_id) { int ret = OB_SUCCESS; - // use server id to ensure that the detectable_id is unique in cluster - uint64_t server_id = GCTX.server_id_; - if (!is_valid_server_id(server_id)) { + // use server index to ensure that the detectable_id is unique in cluster + uint64_t server_index = GCTX.get_server_index(); + if (OB_UNLIKELY(!is_valid_server_index(server_index))) { ret = OB_SERVER_IS_INIT; - LIB_LOG(WARN, "[DM] server id is invalid"); + LIB_LOG(WARN, "[DM] server index is invalid", K(server_index)); } else { detectable_id.first_ = get_detect_sequence_id(); // use timestamp to ensure that the detectable_id is unique during the process uint64_t timestamp = ObTimeUtility::current_time(); - // [ server_id (16bits) ][ timestamp (32bits) ] + // [ server_index (16bits) ][ timestamp (32bits) ] // only if qps > 2^48 or same server reboots after 2^48 the detectable_id be repeated - detectable_id.second_ = (server_id) << 48 | (timestamp & 0x0000FFFFFFFFFFFF); + detectable_id.second_ = (server_index) << 48 | (timestamp & 0x0000FFFFFFFFFFFF); detectable_id.tenant_id_ = tenant_id; } return ret; diff --git a/src/share/ob_heartbeat_handler.cpp b/src/share/ob_heartbeat_handler.cpp index af3ca65fc..672066b07 100644 --- a/src/share/ob_heartbeat_handler.cpp +++ b/src/share/ob_heartbeat_handler.cpp @@ -119,8 +119,8 @@ int ObHeartbeatHandler::handle_heartbeat( } else { // const uint64_t server_id = hb_request.get_server_id(); const share::RSServerStatus rs_server_status = hb_request.get_rs_server_status(); - // if (GCTX.server_id_ != server_id) { - // LOG_INFO("receive new server id", "old server_id_", GCTX.server_id_, "new server_id_", server_id); + // if (GCTX.get_server_id() != server_id) { + // LOG_INFO("receive new server id", "old server_id_", GCTX.get_server_id(), "new server_id_", server_id); // GCTX.server_id_ = server_id; // } if (GCTX.rs_server_status_ != rs_server_status) { @@ -168,7 +168,7 @@ int ObHeartbeatHandler::init_hb_response_(share::ObHBResponse &hb_response) share::ObServerInfoInTable::ObBuildVersion build_version; common::ObZone zone; int64_t test_id = ERRSIM_DISK_ERROR ? 2 : OB_INVALID_ID; - if (test_id == GCTX.server_id_) { + if (test_id == GCTX.get_server_id()) { server_health_status.reset(); server_health_status.init(ObServerHealthStatus::DATA_DISK_STATUS_ERROR); } diff --git a/src/share/ob_max_id_fetcher.cpp b/src/share/ob_max_id_fetcher.cpp index ffb82b219..38c7efa2b 100755 --- a/src/share/ob_max_id_fetcher.cpp +++ b/src/share/ob_max_id_fetcher.cpp @@ -422,6 +422,34 @@ int ObMaxIdFetcher::fetch_new_max_id(const uint64_t tenant_id, return ret; } +int ObMaxIdFetcher::update_server_max_id(const uint64_t max_server_id, const uint64_t next_max_server_id) +{ + int ret = OB_SUCCESS; + uint64_t fetched_max_server_id = OB_INVALID_ID; + ObMySQLTransaction trans; + if (OB_FAIL(trans.start(&proxy_, OB_SYS_TENANT_ID, false))) { + LOG_WARN("fail to to start transaction", KR(ret)); + } else if (OB_FAIL(fetch_max_id(trans, OB_SYS_TENANT_ID, OB_MAX_USED_SERVER_ID_TYPE, fetched_max_server_id))) { + LOG_WARN("failed to get max id", KR(ret)); + } else if (OB_UNLIKELY(max_server_id != fetched_max_server_id)) { + ret = OB_NEED_RETRY; + LOG_WARN("max_server_id has been increased, please retry", KR(ret), K(max_server_id), K(fetched_max_server_id)); + } else if (OB_FAIL(update_max_id(trans, OB_SYS_TENANT_ID, OB_MAX_USED_SERVER_ID_TYPE, next_max_server_id))) { + LOG_WARN("failed to update max id", KR(ret), K(next_max_server_id)); + } + + if (trans.is_started()) { + const bool is_commit = (OB_SUCC(ret)); + int temp_ret = OB_SUCCESS; + if (OB_SUCCESS != (temp_ret = trans.end(is_commit))) { + LOG_WARN("failed to end trans", K(is_commit), K(temp_ret)); + ret = (OB_SUCCESS == ret) ? temp_ret : ret; + } + } + LOG_INFO("update server max id", KR(ret), K(fetched_max_server_id), K(max_server_id), K(next_max_server_id)); + return ret; +} + int ObMaxIdFetcher::update_max_id(ObISQLClient &sql_client, const uint64_t tenant_id, ObMaxIdType max_id_type, const uint64_t max_id) { diff --git a/src/share/ob_max_id_fetcher.h b/src/share/ob_max_id_fetcher.h index 9f0046335..fb0e4e9d2 100755 --- a/src/share/ob_max_id_fetcher.h +++ b/src/share/ob_max_id_fetcher.h @@ -94,6 +94,7 @@ public: int fetch_new_max_id(const uint64_t tenant_id, ObMaxIdType id_type, uint64_t &max_id, const uint64_t initial = UINT64_MAX, const int64_t size = 1); + int update_server_max_id(const uint64_t max_server_id, const uint64_t next_max_server_id); // For generate new tablet_ids int fetch_new_max_ids(const uint64_t tenant_id, ObMaxIdType id_type, uint64_t &max_id, uint64_t size); diff --git a/src/share/ob_server_struct.cpp b/src/share/ob_server_struct.cpp index 6538f49d0..b734e10bb 100644 --- a/src/share/ob_server_struct.cpp +++ b/src/share/ob_server_struct.cpp @@ -13,9 +13,11 @@ #define USING_LOG_PREFIX SERVER #include "ob_server_struct.h" #include "lib/thread_local/ob_tsi_factory.h" +#include "lib/ob_define.h" #include "share/schema/ob_schema_service.h" #include "share/ob_web_service_root_addr.h" #include "share/ob_lease_struct.h" +#include "common/ob_version_def.h" namespace oceanbase { namespace share @@ -45,6 +47,20 @@ share::ServerServiceStatus ObGlobalContext::get_server_service_status() const return static_cast(server_status); } +uint64_t ObGlobalContext::get_server_index() const +{ + uint64_t server_index = 0; + uint64_t server_id = ATOMIC_LOAD(&server_id_); + if (OB_UNLIKELY(!is_valid_server_id(server_id))) { + // return 0; + } else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_4_0) { + server_index = ObShareUtil::compute_server_index(server_id); + } else { + server_index = server_id; + } + return server_index; +} + DEF_TO_STRING(ObGlobalContext) { int64_t pos = 0; diff --git a/src/share/ob_server_struct.h b/src/share/ob_server_struct.h index 27daa2f1a..9fd359421 100644 --- a/src/share/ob_server_struct.h +++ b/src/share/ob_server_struct.h @@ -244,7 +244,6 @@ struct ObGlobalContext share::ObLocationService *location_service_; int64_t start_time_; int64_t *warm_up_start_time_; - uint64_t server_id_; ObServiceStatus status_; ObServerMode startup_mode_; share::RSServerStatus rs_server_status_; @@ -282,6 +281,25 @@ struct ObGlobalContext bool is_observer() const; common::ObClusterRole get_cluster_role() const; share::ServerServiceStatus get_server_service_status() const; + /* + Returns a globally unique, monotonically increasing server ID. + This ID is unique across the lifetime of the cluster and will not be reused. + */ + inline uint64_t get_server_id() const { return ATOMIC_LOAD(&server_id_); } + inline void set_server_id(const uint64_t id) { ATOMIC_SET(&server_id_, id); } + /* + Returns a currently unique server index within the cluster. + This index is unique among current servers in the cluster, but may be reused if a server is removed from the cluster. + + When server ID has size limitation, like only 12 bits are allocated for server ID in session_id + to ensure its uniqueness, which implies the server ID cannot be greater than or equal to 4096, + we should consider using server index instead of server ID. + However, using server index requires a guarantee that no remnants of an old server remain after it is deleted. + For example, no sessions from the deleted server should exist anymore. If remnants persist after server deletion, + using server index is not permitted, otherwise, correctness issues may arise. + In such cases, you need to carefully consider how to resolve this problem by yourself. + */ + uint64_t get_server_index() const; void set_upgrade_stage(obrpc::ObUpgradeStage upgrade_stage) { upgrade_stage_ = upgrade_stage; } obrpc::ObUpgradeStage get_upgrade_stage() { return upgrade_stage_; } DECLARE_TO_STRING; @@ -304,6 +322,7 @@ private: bool has_start_service() const { return 0 < start_service_time_; } obrpc::ObUpgradeStage upgrade_stage_; + uint64_t server_id_; }; } // end of namespace share diff --git a/src/share/ob_server_table_operator.cpp b/src/share/ob_server_table_operator.cpp index bb021e567..7ce45d31a 100644 --- a/src/share/ob_server_table_operator.cpp +++ b/src/share/ob_server_table_operator.cpp @@ -657,7 +657,7 @@ int ObServerTableOperator::get_start_service_time( ret = OB_ERR_UNEXPECTED; LOG_WARN("fail to get sql result", K(sql), K(ret)); } else if (OB_FAIL(result->next())) { - LOG_WARN("fail to get next", KR(ret), K(sql));; + LOG_WARN("fail to get next", KR(ret), K(sql)); } else { EXTRACT_INT_FIELD_MYSQL(*result, "start_service_time", start_service_time, int64_t); } @@ -679,6 +679,80 @@ int ObServerTableOperator::get( const bool ONLY_ACTIVE_SERVERS = false; return get_servers_info_of_zone(sql_proxy, empty_zone, ONLY_ACTIVE_SERVERS, all_servers_info_in_table); } + +int ObServerTableOperator::get_clusters_server_id(common::ObISQLClient &sql_proxy, ObIArray &server_id_in_cluster) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + ObTimeoutCtx ctx; + server_id_in_cluster.reset(); + if (OB_FAIL(ObRootUtils::get_rs_default_timeout_ctx(ctx))) { + LOG_WARN("fail to get timeout ctx", K(ret), K(ctx)); + } else if (OB_FAIL(sql.assign_fmt("SELECT id FROM %s", OB_ALL_SERVER_TNAME))) { + LOG_WARN("fail to append sql", K(ret)); + } else { + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ObMySQLResult *result = NULL; + if (OB_FAIL(sql_proxy.read(res, OB_SYS_TENANT_ID, sql.ptr()))) { + LOG_WARN("fail to execute sql", K(sql), K(ret)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", K(sql), K(ret)); + } else { + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END != ret) { + LOG_WARN("result next failed", KR(ret)); + } else { + ret = OB_SUCCESS; + break; + } + } else { + uint64_t server_id = 0; + EXTRACT_INT_FIELD_MYSQL(*result, "id", server_id, uint64_t); + if (FAILEDx(server_id_in_cluster.push_back(server_id))) { + LOG_WARN("fail to push back", KR(ret), K(server_id), K(server_id_in_cluster)); + } + } + } + } + } + } + return ret; +} + +int ObServerTableOperator::get_clusters_max_server_id(uint64_t &max_server_id) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + ObTimeoutCtx ctx; + max_server_id = 0; + if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), KP(GCTX.sql_proxy_)); + } else if (OB_FAIL(ObRootUtils::get_rs_default_timeout_ctx(ctx))) { + LOG_WARN("fail to get timeout ctx", K(ret), K(ctx)); + } else if (OB_FAIL(sql.assign_fmt("SELECT max(id) AS max_id FROM %s", OB_ALL_SERVER_TNAME))) { + LOG_WARN("fail to append sql", K(ret)); + } else { + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ObMySQLResult *result = NULL; + if (OB_FAIL(GCTX.sql_proxy_->read(res, OB_SYS_TENANT_ID, sql.ptr()))) { + LOG_WARN("fail to execute sql", K(sql), K(ret)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", K(sql), K(ret)); + } else if (OB_FAIL(result->next())) { + LOG_WARN("fail to get next", KR(ret), K(sql)); + } else { + EXTRACT_INT_FIELD_MYSQL(*result, "max_id", max_server_id, uint64_t); + } + } + } + return ret; +} + + int ObServerTableOperator::get_servers_info_of_zone( common::ObISQLClient &sql_proxy, const ObZone &zone, @@ -738,7 +812,7 @@ int ObServerTableOperator::get( if (OB_ITER_END == ret) { ret = OB_SERVER_NOT_IN_WHITE_LIST; } - LOG_WARN("fail to get next", KR(ret), K(sql));; + LOG_WARN("fail to get next", KR(ret), K(sql)); } else if (OB_FAIL(build_server_status(*result, server_status))) { LOG_WARN("fail to build server_status",KR(ret)); } else if (OB_FAIL(server_info_in_table.build_server_info_in_table(server_status))) { diff --git a/src/share/ob_server_table_operator.h b/src/share/ob_server_table_operator.h index f8f677f75..2cb94e809 100644 --- a/src/share/ob_server_table_operator.h +++ b/src/share/ob_server_table_operator.h @@ -167,6 +167,10 @@ public: common::ObISQLClient &sql_proxy, const common::ObAddr &server, ObServerInfoInTable &server_info_in_table); + // get all server's id in the cluster + static int get_clusters_server_id(common::ObISQLClient &sql_proxy, ObIArray &server_id_in_cluster); + // get the max server_id in the cluster + static int get_clusters_max_server_id(uint64_t &max_server_id); // insert the new server's info into __all_server table, // it is only called when we want to add a new server into clusters // diff --git a/src/share/ob_share_util.h b/src/share/ob_share_util.h index 893d8fd6a..52982b94f 100644 --- a/src/share/ob_share_util.h +++ b/src/share/ob_share_util.h @@ -158,6 +158,9 @@ public: static const char *replica_type_to_string(const ObReplicaType type); static ObReplicaType string_to_replica_type(const char *str); static ObReplicaType string_to_replica_type(const ObString &str); + static inline uint64_t compute_server_index(uint64_t server_id) { + return server_id % (MAX_SERVER_COUNT + 1); + } private: static int check_compat_data_version_( const uint64_t required_data_version, diff --git a/src/share/ob_unit_table_operator.cpp b/src/share/ob_unit_table_operator.cpp index 5560b5243..7d415c7ce 100644 --- a/src/share/ob_unit_table_operator.cpp +++ b/src/share/ob_unit_table_operator.cpp @@ -169,7 +169,7 @@ int ObUnitTableOperator::check_server_empty(const common::ObAddr &server, bool & LOG_WARN("fail to get units", KR(ret), K(server)); } else if (units.count() > 0) { is_empty = false; - LOG_DEBUG("server exists in the server list or migrate_from_server list", K(server), K(units)); + LOG_TRACE("server exists in the server list or migrate_from_server list", K(server), K(units)); } return ret; } diff --git a/src/sql/dtl/ob_dtl_channel.h b/src/sql/dtl/ob_dtl_channel.h index bfc498e7c..db18432d8 100644 --- a/src/sql/dtl/ob_dtl_channel.h +++ b/src/sql/dtl/ob_dtl_channel.h @@ -342,16 +342,16 @@ OB_INLINE uint64_t ObDtlChannel::generate_id(uint64_t ch_cnt) // int64_t start_id = (common::ObTimeUtility::current_time() / 1000000) << 20; static volatile uint64_t sequence = start_id; - const uint64_t svr_id = GCTX.server_id_; + const uint64_t server_index = GCTX.get_server_index(); uint64_t ch_id = -1; if (1 < ch_cnt) { uint64_t org_ch_id = 0; do { - org_ch_id = (sequence & 0x0000FFFFFFFFFFFF) | (svr_id << 48); - ch_id = ((ATOMIC_AAF(&sequence, ch_cnt) & 0x0000FFFFFFFFFFFF) | (svr_id << 48)); + org_ch_id = (sequence & 0x0000FFFFFFFFFFFF) | (server_index << 48); + ch_id = ((ATOMIC_AAF(&sequence, ch_cnt) & 0x0000FFFFFFFFFFFF) | (server_index << 48)); } while (ch_id < org_ch_id); } else { - ch_id = ((ATOMIC_AAF(&sequence, 1) & 0x0000FFFFFFFFFFFF) | (svr_id << 48)); + ch_id = ((ATOMIC_AAF(&sequence, 1) & 0x0000FFFFFFFFFFFF) | (server_index << 48)); } return ch_id; } diff --git a/src/sql/engine/expr/ob_expr_sys_context.cpp b/src/sql/engine/expr/ob_expr_sys_context.cpp index 174664acf..ea5276495 100644 --- a/src/sql/engine/expr/ob_expr_sys_context.cpp +++ b/src/sql/engine/expr/ob_expr_sys_context.cpp @@ -460,7 +460,7 @@ int ObExprSysContext::eval_instance(const ObExpr &expr, common::ObDatum &res, int ret = OB_SUCCESS; UNUSED(arg1); UNUSED(arg2); - uint64_t instance_id = GCTX.server_id_; + uint64_t instance_id = GCTX.get_server_id(); //OZ(uint_string(expr, ctx, instance_id, res)); char out_id[256]; sprintf(out_id, "%lu", instance_id); diff --git a/src/sql/engine/expr/ob_expr_userenv.cpp b/src/sql/engine/expr/ob_expr_userenv.cpp index 68abaed1b..39107e6b5 100644 --- a/src/sql/engine/expr/ob_expr_userenv.cpp +++ b/src/sql/engine/expr/ob_expr_userenv.cpp @@ -259,7 +259,7 @@ int ObExprUserEnv::eval_instance(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &re { int ret = OB_SUCCESS; UNUSED(expr); - uint64_t instance_id = GCTX.server_id_; + uint64_t instance_id = GCTX.get_server_id(); number::ObNumber res_nmb; ObEvalCtx::TempAllocGuard alloc_guard(ctx); ObIAllocator &calc_alloc = alloc_guard.get_allocator(); diff --git a/src/sql/engine/expr/ob_expr_uuid_short.cpp b/src/sql/engine/expr/ob_expr_uuid_short.cpp index d7316ba30..c58b4e078 100644 --- a/src/sql/engine/expr/ob_expr_uuid_short.cpp +++ b/src/sql/engine/expr/ob_expr_uuid_short.cpp @@ -42,7 +42,7 @@ uint64_t ObExprUuidShort::generate_uuid_short() // uuid_short // | <8> | <32> | <24> // server_id server_start_time incremented_variable - static volatile uint64_t server_id_and_server_startup_time = ((GCTX.server_id_ & 255) << 56) | + static volatile uint64_t server_id_and_server_startup_time = ((GCTX.get_server_id() & 255) << 56) | ((static_cast(common::ObTimeUtility::current_time() / 1000000) << 24) & ((static_cast(1) << 56) - 1)); uint64_t uuid_short = ATOMIC_AAF(&server_id_and_server_startup_time, 1); diff --git a/src/sql/engine/px/ob_dfo_mgr.cpp b/src/sql/engine/px/ob_dfo_mgr.cpp index 662537053..6630dd427 100644 --- a/src/sql/engine/px/ob_dfo_mgr.cpp +++ b/src/sql/engine/px/ob_dfo_mgr.cpp @@ -699,7 +699,7 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx, dfo->get_interrupt_id()))) { LOG_WARN("fail gen dfo int id", K(ret)); } else { - dfo->set_qc_server_id(GCTX.server_id_); + dfo->set_qc_server_id(GCTX.get_server_index()); dfo->set_parent_dfo_id(parent_dfo->get_dfo_id()); LOG_TRACE("cur dfo dop", "dfo_id", dfo->get_dfo_id(), diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index e0f32a40b..639570a80 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -166,7 +166,7 @@ int ObPxCoordOp::init_dfc(ObDfo &dfo, dtl::ObDtlChTotalInfo *ch_info) K(dfo.get_qc_id()), K(dfo.get_dfo_id())); } else { ObDtlDfoKey dfo_key; - dfo_key.set(GCTX.server_id_, dfo.get_px_sequence_id(), dfo.get_qc_id(), dfo.get_dfo_id()); + dfo_key.set(GCTX.get_server_index(), dfo.get_px_sequence_id(), dfo.get_qc_id(), dfo.get_dfo_id()); dfc_.set_timeout_ts(phy_plan_ctx->get_timeout_timestamp()); dfc_.set_receive(); dfc_.set_qc_coord(); @@ -238,6 +238,7 @@ int ObPxCoordOp::inner_rescan() int ObPxCoordOp::rescan() { int ret = OB_SUCCESS; + const uint64_t server_index = GCTX.get_server_index(); if (NULL == coord_info_.batch_rescan_ctl_ || batch_rescan_param_version_ != coord_info_.batch_rescan_ctl_->param_version_) { ObDfo *root_dfo = NULL; @@ -274,12 +275,12 @@ int ObPxCoordOp::rescan() LOG_WARN("fail to register detectable_id", K(ret)); } else if (OB_FAIL(init_dfo_mgr( ObDfoInterruptIdGen(interrupt_id_, - (uint32_t)GCTX.server_id_, + (uint32_t)server_index, (uint32_t)MY_SPEC.qc_id_, px_sequence_id_), coord_info_.dfo_mgr_))) { LOG_WARN("fail parse dfo tree", - "server_id", GCTX.server_id_, + "server_index", server_index, "qc_id", MY_SPEC.qc_id_, "execution_id", ctx_.get_my_session()->get_current_execution_id(), K(ret)); @@ -309,6 +310,7 @@ int ObPxCoordOp::rescan() int ObPxCoordOp::inner_open() { int ret = OB_SUCCESS; + const uint64_t server_index = GCTX.get_server_index(); ObDfo *root_dfo = NULL; ObString cur_query_str = ctx_.get_my_session()->get_current_query_string(); char *buf = reinterpret_cast(ctx_.get_allocator().alloc(cur_query_str.length() + 1)); @@ -321,9 +323,9 @@ int ObPxCoordOp::inner_open() } if (OB_FAIL(ret)) { } else if (OB_FAIL(ObPxReceiveOp::inner_open())) { - } else if (!is_valid_server_id(GCTX.server_id_)) { + } else if (OB_UNLIKELY(!is_valid_server_index(server_index))) { ret = OB_SERVER_IS_INIT; - LOG_WARN("Server is initializing", K(ret), K(GCTX.server_id_)); + LOG_WARN("Server is initializing", K(ret), K(server_index)); } else if (OB_FAIL(post_init_op_ctx())) { LOG_WARN("init operator context failed", K(ret)); } else if (OB_FAIL(coord_info_.init())) { @@ -336,12 +338,12 @@ int ObPxCoordOp::inner_open() LOG_WARN("fail to register detectable_id", K(ret)); } else if (OB_FAIL(init_dfo_mgr( ObDfoInterruptIdGen(interrupt_id_, - (uint32_t)GCTX.server_id_, + (uint32_t)server_index, (uint32_t)(static_cast(&get_spec()))->qc_id_, px_sequence_id_), coord_info_.dfo_mgr_))) { LOG_WARN("fail parse dfo tree", - "server_id", GCTX.server_id_, + "server_index", server_index, "qc_id", (static_cast(&get_spec()))->qc_id_, "execution_id", ctx_.get_my_session()->get_current_execution_id(), K(ret)); @@ -890,7 +892,7 @@ int ObPxCoordOp::register_interrupt() { int ret = OB_SUCCESS; px_sequence_id_ = GCTX.sql_engine_->get_px_sequence_id(); - ObInterruptUtil::generate_query_interrupt_id((uint32_t)GCTX.server_id_, + ObInterruptUtil::generate_query_interrupt_id((uint32_t)GCTX.get_server_index(), px_sequence_id_, interrupt_id_); if (OB_FAIL(SET_INTERRUPTABLE(interrupt_id_))) { diff --git a/src/sql/engine/px/ob_px_interruption.h b/src/sql/engine/px/ob_px_interruption.h index 992320201..3f0333cfe 100644 --- a/src/sql/engine/px/ob_px_interruption.h +++ b/src/sql/engine/px/ob_px_interruption.h @@ -78,10 +78,14 @@ public: static int interrupt_qc(ObPxSqcMeta &sqc, int code, ObExecContext *exec_ctx); static int interrupt_qc(ObPxTask &task, int code, ObExecContext *exec_ctx); // 将server_id、execution_id、qc_id共同组成中断id - static int generate_query_interrupt_id(const uint32_t server_id, + // Suggest using GCTX.get_server_index() instead of GCTX.get_server_id(), + // as it guarantees uniqueness within the cluster and is constrained to a maximum value of MAX_SERVER_COUNT. + static int generate_query_interrupt_id(const uint32_t server_index, const uint64_t px_sequence_id, common::ObInterruptibleTaskID &interrupt_id); - static int generate_px_interrupt_id(const uint32_t server_id, + // Suggest using GCTX.get_server_index() instead of GCTX.get_server_id(), + // as it guarantees uniqueness within the cluster and is constrained to a maximum value of MAX_SERVER_COUNT. + static int generate_px_interrupt_id(const uint32_t server_index, const uint32_t qc_id, const uint64_t px_sequence_id, const int64_t dfo_id, diff --git a/src/sql/engine/px/ob_px_rpc_processor.cpp b/src/sql/engine/px/ob_px_rpc_processor.cpp index ad791a5ec..4e2539a58 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.cpp +++ b/src/sql/engine/px/ob_px_rpc_processor.cpp @@ -607,8 +607,8 @@ int ObPxTenantTargetMonitorP::process() const uint64_t tenant_id = arg_.get_tenant_id(); const uint64_t follower_version = arg_.get_version(); // server id of the leader that the follower sync with previously. - const uint64_t prev_leader_server_id = ObPxTenantTargetMonitor::get_server_id(follower_version); - const uint64_t leader_server_id = GCTX.server_id_; + const uint64_t prev_leader_server_index = ObPxTenantTargetMonitor::get_server_index(follower_version); + const uint64_t leader_server_index = GCTX.get_server_index(); bool is_leader; uint64_t leader_version; result_.set_tenant_id(tenant_id); @@ -616,7 +616,7 @@ int ObPxTenantTargetMonitorP::process() LOG_ERROR("get is_leader failed", K(ret), K(tenant_id)); } else if (!is_leader) { result_.set_status(MONITOR_NOT_MASTER); - } else if (arg_.need_refresh_all_ || prev_leader_server_id != leader_server_id) { + } else if (arg_.need_refresh_all_ || prev_leader_server_index != leader_server_index) { if (OB_FAIL(OB_PX_TARGET_MGR.reset_leader_statistics(tenant_id))) { LOG_ERROR("reset leader statistics failed", K(ret)); } else if (OB_FAIL(OB_PX_TARGET_MGR.get_version(tenant_id, leader_version))) { @@ -625,7 +625,7 @@ int ObPxTenantTargetMonitorP::process() result_.set_status(MONITOR_VERSION_NOT_MATCH); result_.set_version(leader_version); LOG_INFO("need refresh all", K(tenant_id), K(arg_.need_refresh_all_), - K(follower_version), K(prev_leader_server_id), K(leader_server_id)); + K(follower_version), K(prev_leader_server_index), K(leader_server_index)); } } else if (OB_FAIL(OB_PX_TARGET_MGR.get_version(tenant_id, leader_version))) { LOG_WARN("get master_version failed", K(ret), K(tenant_id)); diff --git a/src/sql/engine/px/ob_px_tenant_target_monitor.cpp b/src/sql/engine/px/ob_px_tenant_target_monitor.cpp index 6f0dc4cd8..168c1f139 100644 --- a/src/sql/engine/px/ob_px_tenant_target_monitor.cpp +++ b/src/sql/engine/px/ob_px_tenant_target_monitor.cpp @@ -373,6 +373,7 @@ int ObPxTenantTargetMonitor::reset_follower_statistics(uint64_t version) int ObPxTenantTargetMonitor::reset_leader_statistics() { int ret = OB_SUCCESS; + const uint64_t server_index = GCTX.get_server_index(); // write lock before reset map and refresh version. SpinWLockGuard wlock_guard(spin_lock_); global_target_usage_.clear(); @@ -381,7 +382,7 @@ int ObPxTenantTargetMonitor::reset_leader_statistics() } else { version_ = get_new_version(); } - LOG_INFO("reset leader statistics", K(tenant_id_), K(ret), K(version_), K(GCTX.server_id_)); + LOG_INFO("reset leader statistics", K(tenant_id_), K(ret), K(version_), K(server_index)); return ret; } @@ -545,12 +546,12 @@ int ObPxTenantTargetMonitor::get_all_target_info(common::ObIArray> SERVER_ID_SHIFT); } diff --git a/src/sql/engine/px/ob_px_tenant_target_monitor.h b/src/sql/engine/px/ob_px_tenant_target_monitor.h index a567e0548..fbdbbce53 100644 --- a/src/sql/engine/px/ob_px_tenant_target_monitor.h +++ b/src/sql/engine/px/ob_px_tenant_target_monitor.h @@ -144,7 +144,7 @@ public: // for virtual_table iter int get_all_target_info(common::ObIArray &target_info_array); - static uint64_t get_server_id(uint64_t version); + static uint64_t get_server_index(uint64_t version); TO_STRING_KV(K_(is_init), K_(tenant_id), K_(server), K_(dummy_cache_leader), K_(role)); diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp index ef30b1810..13e905e06 100644 --- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp +++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp @@ -249,14 +249,14 @@ int ObP2PDatahubManager::generate_p2p_dh_id(int64_t &p2p_dh_id) // generate p2p dh id // | <16> | <28> | 20 // server_id timestamp sequence - if (!is_valid_server_id(GCTX.server_id_)) { + const uint64_t server_index = GCTX.get_server_index(); + if (OB_UNLIKELY(!is_valid_server_index(server_index))) { ret = OB_SERVER_IS_INIT; - LOG_WARN("server id is unexpected", K(ret)); + LOG_WARN("server index is unexpected", KR(ret), K(server_index)); } else { - const uint64_t svr_id = GCTX.server_id_; int64_t ts = (common::ObTimeUtility::current_time() / 1000000) << 20; int64_t seq_id = ATOMIC_AAF(&p2p_dh_id_, 1); - p2p_dh_id = (ts & 0x0000FFFFFFFFFFFF) | (svr_id << 48) | seq_id; + p2p_dh_id = (ts & 0x0000FFFFFFFFFFFF) | (server_index << 48) | seq_id; } return ret; } diff --git a/src/sql/optimizer/ob_logical_operator.cpp b/src/sql/optimizer/ob_logical_operator.cpp index 03d73935f..94c1ded8c 100644 --- a/src/sql/optimizer/ob_logical_operator.cpp +++ b/src/sql/optimizer/ob_logical_operator.cpp @@ -5546,7 +5546,7 @@ int ObLogicalOperator::allocate_partition_join_filter(const ObIArrayset_tablet_id_expr(info.calc_part_id_expr_); OZ(join_filter_create->compute_property()); OZ(bf_info.init(get_plan()->get_optimizer_context().get_session_info()->get_effective_tenant_id(), - filter_id, GCTX.server_id_, + filter_id, GCTX.get_server_id(), join_filter_create->is_shared_join_filter(), info.skip_subpart_, join_filter_create->get_p2p_sequence_ids().at(0), diff --git a/src/sql/session/ob_sql_session_info.cpp b/src/sql/session/ob_sql_session_info.cpp index 493c05337..7cb88a619 100644 --- a/src/sql/session/ob_sql_session_info.cpp +++ b/src/sql/session/ob_sql_session_info.cpp @@ -3657,17 +3657,17 @@ int ObSysVarEncoder::display_sess_info(ObSQLSessionInfo &sess, const char* curre void ObSQLSessionInfo::gen_gtt_session_scope_unique_id() { static int64_t cur_ts = 0; - int64_t next_ts = ObSQLUtils::combine_server_id(ObSQLUtils::get_next_ts(cur_ts), GCTX.server_id_); + int64_t next_ts = ObSQLUtils::combine_server_id(ObSQLUtils::get_next_ts(cur_ts), GCTX.get_server_id()); gtt_session_scope_unique_id_ = next_ts; - LOG_DEBUG("check temporary table ssid session scope", K(next_ts), K(get_sessid_for_table()), K(GCTX.server_id_), K(lbt())); + LOG_DEBUG("check temporary table ssid session scope", K(next_ts), K(get_sessid_for_table()), K(GCTX.get_server_id()), K(lbt())); } void ObSQLSessionInfo::gen_gtt_trans_scope_unique_id() { static int64_t cur_ts = 0; - int64_t next_ts = ObSQLUtils::combine_server_id(ObSQLUtils::get_next_ts(cur_ts), GCTX.server_id_); + int64_t next_ts = ObSQLUtils::combine_server_id(ObSQLUtils::get_next_ts(cur_ts), GCTX.get_server_id()); gtt_trans_scope_unique_id_ = next_ts; - LOG_DEBUG("check temporary table ssid trans scope", K(next_ts), K(get_sessid_for_table()), K(GCTX.server_id_), K(lbt())); + LOG_DEBUG("check temporary table ssid trans scope", K(next_ts), K(get_sessid_for_table()), K(GCTX.get_server_id()), K(lbt())); } int ObAppInfoEncoder::serialize(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) diff --git a/src/sql/session/ob_sql_session_mgr.cpp b/src/sql/session/ob_sql_session_mgr.cpp index 277082f65..81296420f 100644 --- a/src/sql/session/ob_sql_session_mgr.cpp +++ b/src/sql/session/ob_sql_session_mgr.cpp @@ -364,10 +364,10 @@ void ObSQLSessionMgr::destroy() sess_hold_map_.destroy(); } -uint64_t ObSQLSessionMgr::extract_server_id(uint32_t sessid) +uint64_t ObSQLSessionMgr::extract_server_index(uint32_t sessid) { - uint64_t server_id = sessid >> LOCAL_SEQ_LEN; - return server_id & MAX_SERVER_ID; + uint64_t server_index = sessid >> LOCAL_SEQ_LEN; + return server_index & MAX_SERVER_INDEX; } int ObSQLSessionMgr::inc_session_ref(const ObSQLSessionInfo *my_session) @@ -393,7 +393,7 @@ int ObSQLSessionMgr::inc_session_ref(const ObSQLSessionInfo *my_session) // //MASK: 1 表示是server自己生成connection id, // 0 表示是proxy生成的connection id(已废弃,目前仅用于 in_mgr = false 的场景); -//Server Id: 集群中server的id由RS分配,集群内唯一; +//Server index: 集群中server的id由RS分配,集群内唯一,但在server删除后可能会被复用; //Local Seq: 一个server可用连接数,目前单台server最多有INT16_MAX个连接; // int ObSQLSessionMgr::create_sessid(uint32_t &sessid, bool in_mgr) @@ -401,29 +401,29 @@ int ObSQLSessionMgr::create_sessid(uint32_t &sessid, bool in_mgr) int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; sessid = 0; - const uint64_t server_id = GCTX.server_id_; + const uint64_t server_index = GCTX.get_server_index(); uint32_t local_seq = 0; static uint32_t abnormal_seq = 0;//用于server_id == 0是的sessid分配 - if (server_id > MAX_SERVER_ID) { + if (server_index > MAX_SERVER_INDEX) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("server id maybe invalid", K(ret), K(server_id)); + LOG_ERROR("server index maybe invalid", K(ret), K(server_index)); } else if (!in_mgr) { sessid = (GETTID() | LOCAL_SESSID_TAG); - sessid |= static_cast(server_id << LOCAL_SEQ_LEN); // set observer - } else if (0 == server_id) { + sessid |= static_cast(server_index << LOCAL_SEQ_LEN); // set observer + } else if (0 == server_index) { local_seq = (ATOMIC_FAA(&abnormal_seq, 1) & MAX_LOCAL_SEQ); uint32_t max_local_seq = MAX_LOCAL_SEQ; - uint32_t max_server_id = MAX_SERVER_ID; - LOG_WARN("server is initiating", K(server_id), K(local_seq), K(max_local_seq), K(max_server_id)); + uint32_t max_server_index = MAX_SERVER_INDEX; + LOG_WARN("server is initiating", K(server_index), K(local_seq), K(max_local_seq), K(max_server_index)); } else if (OB_UNLIKELY(OB_SUCCESS != (ret = tmp_ret = get_avaiable_local_seq(local_seq)))) { LOG_WARN("fail to get avaiable local_seq", K(local_seq)); } else {/*do nothing*/} if (OB_SUCC(ret) && in_mgr) { sessid = local_seq | SERVER_SESSID_TAG;// set observer sessid mark - sessid |= static_cast(server_id << LOCAL_SEQ_LEN); // set observer - // high bit is reserved for server id - sessid |= static_cast(1ULL << (LOCAL_SEQ_LEN + SERVER_ID_LEN)); + sessid |= static_cast(server_index << LOCAL_SEQ_LEN); // set observer + // high bit is reserved for server index + sessid |= static_cast(1ULL << (LOCAL_SEQ_LEN + SERVER_INDEX_LEN)); } return ret; } @@ -815,8 +815,8 @@ int ObSQLSessionMgr::mark_sessid_used(uint32_t sess_id) int ObSQLSessionMgr::mark_sessid_unused(uint32_t sess_id) { int ret = OB_SUCCESS; - uint64_t server_id = extract_server_id(sess_id); - if (server_id == 0) { + uint64_t server_index = extract_server_index(sess_id); + if (0 == server_index) { // 参考:create_sessid方法 // 由于server_id == 0时, 此时的local_seq,是由ATOMIC_FAA(&abnormal_seq, 1)产生, // 使用ATOMIC_FAA的原因无从考证(原作者的信息描述无任何具体信息),采取保守修改策略 @@ -955,8 +955,8 @@ int ObSQLSessionMgr::is_need_clear_sessid(const ObSMConnection *conn, bool &is_n LOG_WARN("unexpected parameter", K(conn)); } else if (is_server_sessid(conn->sessid_) && ObSMConnection::INITIAL_SESSID != conn->sessid_ - && 0 != extract_server_id(conn->sessid_) - && GCTX.server_id_ == extract_server_id(conn->sessid_) + && 0 != extract_server_index(conn->sessid_) + && GCTX.get_server_index() == extract_server_index(conn->sessid_) && conn->is_need_clear_sessid_) { is_need = true; } else {/*do nothing*/ } diff --git a/src/sql/session/ob_sql_session_mgr.h b/src/sql/session/ob_sql_session_mgr.h index 358c07c7e..34dde32eb 100644 --- a/src/sql/session/ob_sql_session_mgr.h +++ b/src/sql/session/ob_sql_session_mgr.h @@ -144,7 +144,7 @@ public: int get_min_active_snapshot_version(share::SCN &snapshot_version); //used for guarantee the unique sessid when observer generates sessid - static uint64_t extract_server_id(uint32_t sessid); + static uint64_t extract_server_index(uint32_t sessid); static bool is_server_sessid(uint32_t sessid) {return SERVER_SESSID_TAG & sessid;} static int is_need_clear_sessid(const observer::ObSMConnection *conn, bool &is_need); int fetch_first_sessid(); @@ -287,10 +287,11 @@ private: // |Mask|resvd| Server Id | Local Seq = 16 + 2 | // +----+------------------------------+--------------------------------+ static const uint16_t LOCAL_SEQ_LEN = 18; - static const uint16_t RESERVED_SERVER_ID_LEN = 1; - static const uint16_t SERVER_ID_LEN = 13 - RESERVED_SERVER_ID_LEN; + static const uint16_t RESERVED_SERVER_INDEX_LEN = 1; + static const uint16_t SERVER_INDEX_LEN = 13 - RESERVED_SERVER_INDEX_LEN; static const uint32_t MAX_LOCAL_SEQ = (1ULL << LOCAL_SEQ_LEN) - 1; - static const uint64_t MAX_SERVER_ID = (1ULL << SERVER_ID_LEN) - 1; + // MAX_SERVER_INDEX cannot be larger than MAX_SERVER_COUNT + static const uint64_t MAX_SERVER_INDEX = (1ULL << SERVER_INDEX_LEN) - 1; common::ObFixedQueue sessid_sequence_; uint32_t first_seq_; uint32_t increment_sessid_; diff --git a/src/storage/column_store/ob_co_merge_ctx.cpp b/src/storage/column_store/ob_co_merge_ctx.cpp index 102ba95e7..5215bd7a8 100644 --- a/src/storage/column_store/ob_co_merge_ctx.cpp +++ b/src/storage/column_store/ob_co_merge_ctx.cpp @@ -985,8 +985,8 @@ void ObCOTabletOutputMergeCtx::after_update_tablet_for_major() int tmp_ret = OB_SUCCESS; ObBasicObjHandle report_obj_hdl; - if (OB_TMP_FAIL(MTL_SVR_OBJ_MGR.get_obj_handle(GCTX.server_id_, report_obj_hdl))) { - LOG_WARN_RET(tmp_ret, "failed to get report obj handle", "cur_svr_id", GCTX.server_id_); + if (OB_TMP_FAIL(MTL_SVR_OBJ_MGR.get_obj_handle(GCTX.get_server_id(), report_obj_hdl))) { + LOG_WARN_RET(tmp_ret, "failed to get report obj handle", "cur_svr_id", GCTX.get_server_id()); } else if (OB_TMP_FAIL(report_obj_hdl.get_obj()->update_exec_tablet(1, true/*is_finish_task*/))) { LOG_WARN_RET(tmp_ret, "failed to inc exec tablet", K(get_ls_id()), K(get_tablet_id())); } diff --git a/src/storage/compaction/ob_partition_merger.cpp b/src/storage/compaction/ob_partition_merger.cpp index 967b1662e..bcc1863bd 100644 --- a/src/storage/compaction/ob_partition_merger.cpp +++ b/src/storage/compaction/ob_partition_merger.cpp @@ -523,7 +523,7 @@ void write_wrong_row(const ObTabletID &tablet_id, const ObDatumRow &row) ret = OB_E(EventTable::EN_MAKE_DATA_CKM_ERROR_BY_WRITE_WRONG_ROW) ret; if (OB_FAIL(ret) && tablet_id.id() > ObTabletID::MIN_USER_TABLET_ID - && (OB_CHECKSUM_ERROR == ret || GCTX.server_id_ == -ret)) { + && (OB_CHECKSUM_ERROR == ret || GCTX.get_server_id() == -ret)) { ObDatumRow &tmp_row = const_cast(row); tmp_row.storage_datums_[tmp_row.get_column_count() - 1].set_int(999); LOG_ERROR("ERRSIM EN_MAKE_DATA_CKM_ERROR_BY_WRITE_WRONG_ROW", K(ret), K(tablet_id), K(row)); diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index d5ba08c48..ad0d43199 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -660,8 +660,8 @@ void ObTabletMajorOutputMergeCtx::after_update_tablet_for_major() int tmp_ret = OB_SUCCESS; ObBasicObjHandle report_obj_hdl; - if (OB_TMP_FAIL(MTL_SVR_OBJ_MGR.get_obj_handle(GCTX.server_id_, report_obj_hdl))) { - LOG_WARN_RET(tmp_ret, "failed to get report obj handle", "cur_svr_id", GCTX.server_id_); + if (OB_TMP_FAIL(MTL_SVR_OBJ_MGR.get_obj_handle(GCTX.get_server_id(), report_obj_hdl))) { + LOG_WARN_RET(tmp_ret, "failed to get report obj handle", "cur_svr_id", GCTX.get_server_id()); } else if (OB_TMP_FAIL(report_obj_hdl.get_obj()->update_exec_tablet(1, true/*is_finish_task*/))) { LOG_WARN_RET(tmp_ret, "failed to inc exec tablet", K(get_ls_id()), K(get_tablet_id())); } diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index 9a0041a60..17bb1d647 100644 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -1140,7 +1140,7 @@ int ObTabletMergeTask::process() } ret = SPECIFIED_SERVER_STOP_COMPACTION; if (OB_FAIL(ret)) { - if (-ret == GCTX.server_id_) { + if (-ret == GCTX.get_server_id()) { STORAGE_LOG(INFO, "ERRSIM SPECIFIED_SERVER_STOP_COMPACTION", K(ret)); return OB_EAGAIN; } else { diff --git a/src/storage/ob_disk_usage_reporter.cpp b/src/storage/ob_disk_usage_reporter.cpp index 3accc0718..8302e1d14 100644 --- a/src/storage/ob_disk_usage_reporter.cpp +++ b/src/storage/ob_disk_usage_reporter.cpp @@ -78,7 +78,7 @@ void ObDiskUsageReportTask::runTimerTask() int ret = OB_SUCCESS; const ObAddr addr = GCTX.self_addr(); - const int64_t self_svr_seq = GCTX.server_id_; + const int64_t self_svr_seq = GCTX.get_server_id(); char addr_buffer[MAX_IP_ADDR_LENGTH] = {}; if (OB_UNLIKELY(!is_inited_)) { diff --git a/unittest/share/detect/test_ob_detect_manager.cpp b/unittest/share/detect/test_ob_detect_manager.cpp index e9dd241cc..854543ee3 100644 --- a/unittest/share/detect/test_ob_detect_manager.cpp +++ b/unittest/share/detect/test_ob_detect_manager.cpp @@ -38,7 +38,7 @@ rpc::frame::ObReqTransport mock_transport(nullptr, nullptr); static int init_dm() { int ret = OB_SUCCESS; - GCTX.server_id_ = 10086; + (void) GCTX.set_server_id(4095); ObAddr self; self.set_ip_addr("127.0.0.1", 8086); if (OB_FAIL(dm->init(self, 1))) {