From 7373d1b50b15614f811815d878f06a8225f11b72 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 22 Sep 2023 06:40:17 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=BB=BA=E7=A7=9F=E6=88=B7?= =?UTF-8?q?=E6=97=B6=E5=8F=91=E7=94=9Funit=E8=BF=81=E7=A7=BB=E5=AF=BC?= =?UTF-8?q?=E8=87=B4=E7=9A=84=E5=BB=BA=E7=A7=9F=E6=88=B7=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/rootserver/ob_server_balancer.cpp | 2 + src/rootserver/ob_unit_manager.cpp | 421 +++++++++++++++----------- src/rootserver/ob_unit_manager.h | 14 +- src/share/ob_debug_sync_point.h | 1 + src/share/ob_unit_table_operator.cpp | 30 ++ src/share/ob_unit_table_operator.h | 4 + 6 files changed, 299 insertions(+), 173 deletions(-) diff --git a/src/rootserver/ob_server_balancer.cpp b/src/rootserver/ob_server_balancer.cpp index 706f3c415..fe486b5d7 100644 --- a/src/rootserver/ob_server_balancer.cpp +++ b/src/rootserver/ob_server_balancer.cpp @@ -7664,6 +7664,7 @@ int ObServerBalancer::ResourceSum::append_resource( resource_sum_.cpu_ += resource.cpu_; resource_sum_.mem_total_ += resource.mem_total_; resource_sum_.disk_total_ += resource.disk_total_; + resource_sum_.log_disk_total_ += resource.log_disk_total_; return ret; } @@ -7674,6 +7675,7 @@ int ObServerBalancer::ResourceSum::append_resource( resource_sum_.cpu_ += resource.resource_sum_.cpu_; resource_sum_.mem_total_ += resource.resource_sum_.mem_total_; resource_sum_.disk_total_ += resource.resource_sum_.disk_total_; + resource_sum_.log_disk_total_ += resource.resource_sum_.log_disk_total_; return ret; } diff --git a/src/rootserver/ob_unit_manager.cpp b/src/rootserver/ob_unit_manager.cpp index db9e3a098..7b00c51ad 100644 --- a/src/rootserver/ob_unit_manager.cpp +++ b/src/rootserver/ob_unit_manager.cpp @@ -246,6 +246,7 @@ int ObUnitManager::init(ObMySQLProxy &proxy, int ObUnitManager::load() { + DEBUG_SYNC(BEFORE_RELOAD_UNIT); int ret = OB_SUCCESS; SpinWLockGuard guard(lock_); if (!inited_) { @@ -4748,7 +4749,6 @@ int ObUnitManager::get_tenant_unit_servers( common::ObIArray &server_array) const { int ret = OB_SUCCESS; - // TODO(cangming.zl): may need lock_guard here ObArray *pools = nullptr; if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { ret = OB_INVALID_ARGUMENT; @@ -8247,7 +8247,7 @@ int ObUnitManager::admin_migrate_unit( ret = OB_MACHINE_RESOURCE_NOT_ENOUGH; LOG_WARN("left resource can't hold unit", "server", dst, K(hard_limit), K(left_resource), "config", unit_info.config_, KR(ret)); - } else if (OB_FAIL(migrate_unit(unit_id, dst, is_manual))) { + } else if (OB_FAIL(migrate_unit_(unit_id, dst, is_manual))) { LOG_WARN("migrate unit failed", K(unit_id), "destination", dst, KR(ret)); } @@ -8356,196 +8356,273 @@ int ObUnitManager::try_migrate_unit(const uint64_t unit_id, K(required_size), K(total_size), K(limit_percent), K(ret)); } - if (FAILEDx(migrate_unit(unit_id, dst, is_manual))) { + if (FAILEDx(migrate_unit_(unit_id, dst, is_manual))) { LOG_WARN("fail migrate unit", K(unit_id), K(dst), K(ret)); } } return ret; } -int ObUnitManager::migrate_unit(const uint64_t unit_id, const ObAddr &dst, const bool is_manual) +int ObUnitManager::migrate_unit_(const uint64_t unit_id, const ObAddr &dst, const bool is_manual) { int ret = OB_SUCCESS; - lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID; + LOG_INFO("start to migrate unit", KR(ret), K(unit_id), K(dst), K(is_manual)); + ObUnit *unit = NULL; + share::ObResourcePool *pool = NULL; + ObZone zone; if (!check_inner_stat()) { ret = OB_INNER_STAT_ERROR; LOG_WARN("check_inner_stat failed", K(inited_), K(loaded_), K(ret)); } else if (OB_INVALID_ID == unit_id || !dst.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(unit_id), K(dst), K(ret)); - } else if (OB_UNLIKELY(nullptr == srv_rpc_proxy_)) { + } else if (OB_FAIL(get_unit_by_id(unit_id, unit))) { + LOG_WARN("get_unit_by_id failed", K(unit_id), K(ret)); + } else if (OB_ISNULL(unit)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("srv_rpc_proxy_ ptr is null", K(ret)); + LOG_WARN("unit is null", KP(unit), K(ret)); + } else if (ObUnit::UNIT_STATUS_ACTIVE != unit->status_) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("cannot migrate unit which is in deleting", K(ret), K(unit_id)); + } else if (unit->server_ == dst) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("unit->server same as migrate destination server", + "unit", *unit, K(dst), K(ret)); + } else if (OB_FAIL(SVR_TRACER.get_server_zone(dst, zone))) { + LOG_WARN("get_server_zone failed", KR(ret), K(dst)); + } else if (OB_UNLIKELY(zone != unit->zone_)) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("migrate unit between zones not supported", KR(ret), KP(unit), K(dst), K(zone)); + } else if (unit->migrate_from_server_.is_valid()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("unit is already migrating, cannot migrate any more", "unit", *unit, K(ret)); + LOG_USER_ERROR(OB_NOT_SUPPORTED, "migrate unit already in migrating status"); + } else if (OB_FAIL(get_resource_pool_by_id(unit->resource_pool_id_, pool))) { + LOG_WARN("get_resource_pool_by_id failed", + "resource pool id", unit->resource_pool_id_, K(ret)); + } else if (OB_ISNULL(pool)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("pool is null", KP(pool), K(ret)); } else { - ObUnit *unit = NULL; - share::ObResourcePool *pool = NULL; - if (OB_FAIL(get_unit_by_id(unit_id, unit))) { - LOG_WARN("get_unit_by_id failed", K(unit_id), K(ret)); - } else if (NULL == unit) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unit is null", KP(unit), K(ret)); - } else if (ObUnit::UNIT_STATUS_ACTIVE != unit->status_) { - ret = OB_OP_NOT_ALLOW; - LOG_WARN("cannot migrate unit which is in deleting", K(ret), K(unit_id)); - } else if (OB_FAIL(get_resource_pool_by_id(unit->resource_pool_id_, pool))) { - LOG_WARN("get_resource_pool_by_id failed", - "resource pool id", unit->resource_pool_id_, K(ret)); - } else if (NULL == pool) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("pool is null", KP(pool), K(ret)); - } else if (nullptr == schema_service_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("schema service ptr is null", K(ret)); - } else if (!pool->is_granted_to_tenant()) { - // by pass - } else if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(pool->tenant_id_, compat_mode))) { - LOG_WARN("fail to get tenant compat mode", K(ret)); + const bool granted = pool->is_granted_to_tenant(); + const ObAddr src = unit->server_; + ObUnit new_unit = *unit; + new_unit.server_ = dst; + new_unit.migrate_from_server_ = granted ? src : ObAddr(); + new_unit.is_manual_migrate_ = is_manual; + + LOG_INFO("do migrate unit", KPC(unit), K(new_unit), KPC(pool), K(granted)); + + // STEP 1: try notify unit persistence on destination ObServer + if (OB_FAIL(do_migrate_unit_notify_resource_(*pool, new_unit, is_manual, granted))) { + LOG_WARN("do_migrate_unit_notify_resource failed", KR(ret), KPC(pool), K(new_unit), K(is_manual), K(granted)); } + + // STEP 2: Update info in inner_table in trans + if (OB_FAIL(ret)) { + } else if (OB_FAIL(do_migrate_unit_in_trans_(*pool, new_unit, is_manual, granted))) { + LOG_WARN("do_migrate_unit_in_trans failed", KR(ret), KPC(pool), K(new_unit), K(is_manual), K(granted)); + } + + // STEP 3: Update in-memory info (unit & unit_load & migrate_unit) + if (OB_FAIL(ret)) { + } else if (OB_FAIL(do_migrate_unit_inmemory_(new_unit, unit, is_manual, granted))) { + LOG_WARN("do_migrate_unit_inmemory failed", KR(ret), K(dst), KPC(unit), K(is_manual), K(granted)); + } + + // STEP 4: migration succeed, do some postprocess if (OB_SUCC(ret)) { - ObAddr src; - ObZone zone; - // granted: If the unit has not been assigned to the tenant, the migration can be performed immediately - const bool granted = pool->is_granted_to_tenant(); - if (!granted - && common::STANDBY_CLUSTER == ObClusterInfoGetter::get_cluster_role_v2()) { - //Units without grant on the standby database are not allowed to be migrated - ret = OB_OP_NOT_ALLOW; - LOG_WARN("migrate not grant unit not valid", K(ret)); - LOG_USER_ERROR(OB_OP_NOT_ALLOW, "migrate unit which has not been granted"); - } else if (OB_FAIL(SVR_TRACER.get_server_zone(dst, zone))) { - LOG_WARN("get_server_zone failed", KR(ret), K(dst)); - } else if (unit->server_ == dst) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("unit->server same as migrate destination server", - "unit", *unit, K(dst), K(ret)); - } else { - ObNotifyTenantServerResourceProxy notify_proxy( - *srv_rpc_proxy_, - &obrpc::ObSrvRpcProxy::notify_tenant_server_unit_resource); - ObUnit new_unit = *unit; - src = unit->server_; - if (unit->migrate_from_server_.is_valid()) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("unit is already migrating, cannot migrate any more", "unit", *unit, K(ret)); - LOG_USER_ERROR(OB_NOT_SUPPORTED, "migrate unit already in migrating status"); - } - - if (OB_SUCC(ret)) { - common::ObMySQLTransaction trans; - new_unit.zone_ = zone; - if (granted) { - new_unit.migrate_from_server_ = unit->server_; - } - new_unit.server_ = dst; - new_unit.is_manual_migrate_ = is_manual; - const bool is_delete = false; // is_delete is false when migrate unit - int tmp_ret = OB_SUCCESS; - if (OB_FAIL(try_notify_tenant_server_unit_resource( - pool->tenant_id_, is_delete, notify_proxy, - *pool, compat_mode, new_unit, false/*if not grant*/, - false/*skip offline server*/))) { - LOG_WARN("fail to try notify server unit resource", K(ret)); - } - - if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) { - LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret)); - ret = (OB_SUCCESS == ret) ? tmp_ret : ret; - } - ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret; - if (OB_FAIL(ret)) { - LOG_WARN("start to rollback unit persistence", KR(ret), K(new_unit), K(pool->tenant_id_)); - int tmp_ret = OB_SUCCESS; - ObArray units; - if (OB_TMP_FAIL(units.push_back(new_unit))) { - LOG_WARN("fail to push an element into units", KR(ret), KR(tmp_ret), KPC(unit)); - } else if (OB_TMP_FAIL(rollback_persistent_units( - units, - *pool, - compat_mode, - false/*if not grant*/, - false/*skip offline server*/, - notify_proxy))) { - LOG_WARN("fail to rollback unit persistence", KR(ret), KR(tmp_ret), - K(units), KPC(pool), K(compat_mode)); - } - } - - if (OB_FAIL(ret)) { - } else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) { - LOG_WARN("failed to start trans", K(ret)); - } else if (is_manual) { - char ip_buf[common::MAX_IP_ADDR_LENGTH]; - (void)dst.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH); - int64_t job_id = 0; - if (OB_FAIL(RS_JOB_CREATE_WITH_RET( - job_id, - ObRsJobType::JOB_TYPE_MIGRATE_UNIT, - trans, - "unit_id", unit_id, - "svr_ip", ip_buf, - "svr_port", dst.get_port(), - "tenant_id", pool->tenant_id_))) { - LOG_WARN("fail to create rs job MIGRATE_UNIT", KR(ret), "tenant_id", pool->tenant_id_, - K(unit_id)); - } else if (!granted) { - if (OB_FAIL(RS_JOB_COMPLETE(job_id, OB_SUCCESS, trans))) { - LOG_WARN("all_rootservice_job update failed", K(ret), K(job_id)); - } - } - } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(ut_operator_.update_unit(trans, new_unit))) { - LOG_WARN("update_unit failed", K(new_unit), K(ret)); - } else { - *unit = new_unit; - } - if (trans.is_started()) { - if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) { - ret = OB_SUCC(ret) ? tmp_ret : ret; - LOG_WARN("trans commit failed", K(tmp_ret), K(ret)); - } - } - } - - // delete unit load if needed, insert unit load on dst - ObUnitLoad load; - if (OB_SUCC(ret)) { - root_balance_->wakeup(); - if (!granted) { - if (OB_FAIL(delete_unit_load(src, unit_id))) { - LOG_WARN("delete_unit_load failed", K(src), K(unit_id), K(ret)); - } - } - - if (OB_FAIL(ret)) { - } else if (OB_FAIL(gen_unit_load(unit, load))) { - LOG_WARN("gen_unit_load failed", "unit", *unit, K(ret)); - } else if (OB_FAIL(insert_unit_load(dst, load))) { - LOG_WARN("insert_unit_load failed", K(dst), K(ret)); - } - } - - if (OB_SUCC(ret)) { - if (granted) { - //ObArray servers; - if (OB_FAIL(insert_migrate_unit(unit->migrate_from_server_, unit->unit_id_))) { - LOG_WARN("insert_migrate_unit failed", "unit", *unit, K(ret)); - } - } - - if (OB_SUCC(ret)) { - ROOTSERVICE_EVENT_ADD("unit", "migrate_unit", - "unit_id", unit->unit_id_, - "migrate_from_server", unit->migrate_from_server_, - "server", unit->server_, - "tenant_id", pool->tenant_id_); - } - } - } - LOG_INFO("migrate unit succeed", K(unit_id), K(src), K(dst), K(granted)); + // wakeup rootbalance thread to make disaster_recovery process more quickly + root_balance_->wakeup(); + // add migrate_unit rootservice event + ROOTSERVICE_EVENT_ADD("unit", "migrate_unit", + "unit_id", unit->unit_id_, + "migrate_from_server", unit->migrate_from_server_, + "server", unit->server_, + "tenant_id", pool->tenant_id_); + } + } + LOG_INFO("finish migrate unit", KR(ret), K(unit_id), K(dst), K(is_manual)); + return ret; +} + +int ObUnitManager::do_migrate_unit_notify_resource_(const share::ObResourcePool &pool, + const share::ObUnit &new_unit, + const bool is_manual, + const bool granted) +{ + int ret = OB_SUCCESS; + lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID; + if (!check_inner_stat()) { + ret = OB_INNER_STAT_ERROR; + LOG_WARN("check_inner_stat failed", K(inited_), K(loaded_), K(ret)); + } else if (!granted) { + // do nothing. If unit is not granted, there's no need to notify observer. + } else if (OB_UNLIKELY(nullptr == srv_rpc_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("srv_rpc_proxy_ ptr is null", KR(ret)); + } else if (OB_UNLIKELY(new_unit.resource_pool_id_ != pool.resource_pool_id_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("new_unit not belong to pool", KR(ret), K(pool), K(new_unit)); + } else if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(pool.tenant_id_, compat_mode))) { + LOG_WARN("fail to get tenant compat mode", KR(ret), K(pool)); + } else { + ObNotifyTenantServerResourceProxy notify_proxy(*srv_rpc_proxy_, + &obrpc::ObSrvRpcProxy::notify_tenant_server_unit_resource); + // only notify new unit resource on dst server here. + // Old unit on src server will be delete later when doing end_migrate + if (OB_FAIL(try_notify_tenant_server_unit_resource( + pool.tenant_id_, false/*is_delete*/, notify_proxy, // is_delete is false when migrate unit + pool, compat_mode, new_unit, false/*if not grant*/, + false/*skip offline server*/))) { + LOG_WARN("fail to try notify server unit resource", K(ret)); + } + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) { + LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret)); + ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } + + // Rollback persistent unit if persistence failed + ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret; + if (OB_FAIL(ret)) { + LOG_WARN("start to rollback unit persistence", KR(ret), K(new_unit), K(pool.tenant_id_)); + int tmp_ret = OB_SUCCESS; + ObArray units; + if (OB_TMP_FAIL(units.push_back(new_unit))) { + LOG_WARN("fail to push an element into units", KR(ret), KR(tmp_ret), K(new_unit)); + } else if (OB_TMP_FAIL(rollback_persistent_units( + units, + pool, + compat_mode, + false/*if not grant*/, + false/*skip offline server*/, + notify_proxy))) { + LOG_WARN("fail to rollback unit persistence", KR(ret), KR(tmp_ret), + K(units), K(pool), K(compat_mode)); + } + } + } + return ret; +} + +int ObUnitManager::do_migrate_unit_in_trans_(const share::ObResourcePool &pool, + const share::ObUnit &new_unit, + const bool is_manual, + const bool granted) +{ + int ret = OB_SUCCESS; + common::ObMySQLTransaction trans; + share::ObResourcePool real_pool; + if (!check_inner_stat()) { + ret = OB_INNER_STAT_ERROR; + LOG_WARN("check_inner_stat failed", K(inited_), K(loaded_), K(ret)); + } else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) { + LOG_WARN("failed to start trans", K(ret)); + } + // Double check whether exactly unit is granted by querying pool from inner_table. + // Because during creating or dropping tenant, there's a period when inner_table + // and persistence units are updated while in-memory info not updated yet. + // We need lock by SELECT FOR UPDATE to assure no other trans is still updating pool when checking. + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ut_operator_.get_resource_pool(trans, + pool.resource_pool_id_, + true/*select_for_update*/, + real_pool))) { + LOG_WARN("fail to get resource_pools from table", KR(ret)); + } else { + if (pool.is_granted_to_tenant() != real_pool.is_granted_to_tenant()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("in-memory pool info not accurate, cannot migrate unit now.", + KR(ret), K(new_unit), K(pool), K(real_pool)); + } + } + // Create migrate RS_JOB if it's a manual migration: + // * If not granted, the job will be set as complete on creating. + // * If granted, the job will be completed when end_migrate_unit is called. + if (OB_FAIL(ret)) { + } else if (is_manual) { + char ip_buf[common::MAX_IP_ADDR_LENGTH]; + const ObAddr &dst = new_unit.server_; + (void)dst.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH); + int64_t job_id = 0; + if (OB_FAIL(RS_JOB_CREATE_WITH_RET( + job_id, + ObRsJobType::JOB_TYPE_MIGRATE_UNIT, + trans, + "unit_id", new_unit.unit_id_, + "svr_ip", ip_buf, + "svr_port", dst.get_port(), + "tenant_id", pool.tenant_id_))) { + LOG_WARN("fail to create rs job MIGRATE_UNIT", KR(ret), + "tenant_id", pool.tenant_id_, + "unit_id", new_unit.unit_id_); + } else if (!granted) { + // not granted, migration can be done at once, so mark RS_JOB completed + if (OB_FAIL(RS_JOB_COMPLETE(job_id, OB_SUCCESS, trans))) { + LOG_WARN("all_rootservice_job update failed", K(ret), K(job_id)); + } + } + } + // Update unit info + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ut_operator_.update_unit(trans, new_unit))) { + LOG_WARN("update_unit failed", K(new_unit), K(ret)); + } + // End this transaction + if (trans.is_started()) { + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) { + ret = OB_SUCC(ret) ? tmp_ret : ret; + LOG_WARN("trans commit failed", K(tmp_ret), K(ret)); + } + } + return ret; +} + +int ObUnitManager::do_migrate_unit_inmemory_(const share::ObUnit &new_unit, + share::ObUnit *unit, + const bool is_manual, + const bool granted) +{ + int ret = OB_SUCCESS; + if (!check_inner_stat()) { + ret = OB_INNER_STAT_ERROR; + LOG_WARN("check_inner_stat failed", K(inited_), K(loaded_), K(ret)); + } else if (OB_ISNULL(unit)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unit ptr is null", KR(ret), KP(unit)); + } else if (OB_UNLIKELY(unit->unit_id_ != new_unit.unit_id_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("old and new unit id not the same", KR(ret), KPC(unit), K(new_unit)); + } else { + const ObAddr src = unit->server_; + const ObAddr dst = new_unit.server_; + // do update unit + *unit = new_unit; + // update unit_load + // delete old unit load imediately if pool not granted + if (!granted) { + if (OB_FAIL(delete_unit_load(src, unit->unit_id_))) { + LOG_WARN("delete_unit_load failed", K(src), "unit_id", unit->unit_id_, KR(ret)); + } + } + // insert new unit load + ObUnitLoad load; + if (OB_FAIL(ret)) { + } else if (OB_FAIL(gen_unit_load(unit, load))) { + LOG_WARN("gen_unit_load failed", "unit", *unit, K(ret)); + } else if (OB_FAIL(insert_unit_load(dst, load))) { + LOG_WARN("insert_unit_load failed", K(dst), K(ret)); + } + // update migrate_unit list + if (OB_FAIL(ret)) { + } else if (granted) { + if (OB_FAIL(insert_migrate_unit(unit->migrate_from_server_, unit->unit_id_))) { + LOG_WARN("insert_migrate_unit failed", "unit", *unit, K(ret)); + } } } - LOG_INFO("migrate unit", K(unit_id), K(dst), K(ret)); return ret; } diff --git a/src/rootserver/ob_unit_manager.h b/src/rootserver/ob_unit_manager.h index 8eb97119c..8e538caa8 100644 --- a/src/rootserver/ob_unit_manager.h +++ b/src/rootserver/ob_unit_manager.h @@ -429,7 +429,19 @@ private: const double limit, bool &is_enough, AlterResourceErr &err_index) const; - virtual int migrate_unit(const uint64_t unit_id, const common::ObAddr &dst, const bool is_manual = false); + virtual int migrate_unit_(const uint64_t unit_id, const common::ObAddr &dst, const bool is_manual = false); + int do_migrate_unit_notify_resource_(const share::ObResourcePool &pool, + const share::ObUnit &new_unit, + const bool is_manual, + const bool granted); + int do_migrate_unit_in_trans_(const share::ObResourcePool &pool, + const share::ObUnit &new_unit, + const bool is_manual, + const bool granted); + int do_migrate_unit_inmemory_(const share::ObUnit &new_unit, + share::ObUnit *unit, + const bool is_manual, + const bool granted); int inner_get_unit_info_by_id(const uint64_t unit_id, share::ObUnitInfo &unit) const; int check_server_enough(const uint64_t tenant_id, const common::ObIArray &pool_names, diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index e4fe8dbff..2d184fba8 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -556,6 +556,7 @@ class ObString; ACT(BEFORE_FINISH_PRIMARY_ZONE,)\ ACT(BEFORE_FINISH_UNIT_NUM,)\ ACT(BEFORE_CHECK_PRIMARY_ZONE,)\ + ACT(BEFORE_RELOAD_UNIT,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/ob_unit_table_operator.cpp b/src/share/ob_unit_table_operator.cpp index e5cf6a386..6fc29bc6d 100644 --- a/src/share/ob_unit_table_operator.cpp +++ b/src/share/ob_unit_table_operator.cpp @@ -403,6 +403,36 @@ int ObUnitTableOperator::get_resource_pools(const common::ObIArray &po return ret; } +int ObUnitTableOperator::get_resource_pool(common::ObISQLClient &sql_client, + const uint64_t pool_id, + const bool select_for_update, + ObResourcePool &resource_pool) const +{ + int ret = OB_SUCCESS; + ObSqlString sql_string; + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else { + sqlclient::ObMySQLResult *result = NULL; + if (OB_FAIL(sql_string.assign_fmt("SELECT * FROM %s WHERE resource_pool_id = %lu%s", + OB_ALL_RESOURCE_POOL_TNAME, pool_id, + select_for_update ? " FOR UPDATE" : ""))) { + LOG_WARN("assign sql string failed", K(ret), K(pool_id), K(select_for_update)); + } else if (OB_FAIL(sql_client.read(res, sql_string.ptr()))) { + LOG_WARN("update status of ddl task record failed", K(ret), K(sql_string)); + } else if (OB_UNLIKELY(NULL == (result = res.get_result()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", K(ret), KP(result)); + } else if (OB_FAIL(read_resource_pool(*result, resource_pool))) { + LOG_WARN("fail to read resource pool from result", KR(ret), K(pool_id), K(select_for_update)); + } + } + } + return ret; +} + int ObUnitTableOperator::update_resource_pool(common::ObISQLClient &client, const ObResourcePool &resource_pool) { diff --git a/src/share/ob_unit_table_operator.h b/src/share/ob_unit_table_operator.h index 93bd3bc47..7ffe478ad 100644 --- a/src/share/ob_unit_table_operator.h +++ b/src/share/ob_unit_table_operator.h @@ -55,6 +55,10 @@ public: common::ObIArray &pools) const; virtual int get_resource_pools(const common::ObIArray &pool_ids, common::ObIArray &pools) const; + int get_resource_pool(ObISQLClient &sql_client, + const uint64_t pool_id, + const bool select_for_update, + ObResourcePool &resource_pool) const; virtual int update_resource_pool(common::ObISQLClient &client, const ObResourcePool &resource_pool); virtual int remove_resource_pool(common::ObISQLClient &client,