修复建租户时发生unit迁移导致的建租户失败的bug
This commit is contained in:
parent
c13243ff23
commit
7373d1b50b
@ -7664,6 +7664,7 @@ int ObServerBalancer::ResourceSum::append_resource(
|
||||
resource_sum_.cpu_ += resource.cpu_;
|
||||
resource_sum_.mem_total_ += resource.mem_total_;
|
||||
resource_sum_.disk_total_ += resource.disk_total_;
|
||||
resource_sum_.log_disk_total_ += resource.log_disk_total_;
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -7674,6 +7675,7 @@ int ObServerBalancer::ResourceSum::append_resource(
|
||||
resource_sum_.cpu_ += resource.resource_sum_.cpu_;
|
||||
resource_sum_.mem_total_ += resource.resource_sum_.mem_total_;
|
||||
resource_sum_.disk_total_ += resource.resource_sum_.disk_total_;
|
||||
resource_sum_.log_disk_total_ += resource.resource_sum_.log_disk_total_;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -246,6 +246,7 @@ int ObUnitManager::init(ObMySQLProxy &proxy,
|
||||
|
||||
int ObUnitManager::load()
|
||||
{
|
||||
DEBUG_SYNC(BEFORE_RELOAD_UNIT);
|
||||
int ret = OB_SUCCESS;
|
||||
SpinWLockGuard guard(lock_);
|
||||
if (!inited_) {
|
||||
@ -4748,7 +4749,6 @@ int ObUnitManager::get_tenant_unit_servers(
|
||||
common::ObIArray<common::ObAddr> &server_array) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// TODO(cangming.zl): may need lock_guard here
|
||||
ObArray<share::ObResourcePool *> *pools = nullptr;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
@ -8247,7 +8247,7 @@ int ObUnitManager::admin_migrate_unit(
|
||||
ret = OB_MACHINE_RESOURCE_NOT_ENOUGH;
|
||||
LOG_WARN("left resource can't hold unit", "server", dst,
|
||||
K(hard_limit), K(left_resource), "config", unit_info.config_, KR(ret));
|
||||
} else if (OB_FAIL(migrate_unit(unit_id, dst, is_manual))) {
|
||||
} else if (OB_FAIL(migrate_unit_(unit_id, dst, is_manual))) {
|
||||
LOG_WARN("migrate unit failed", K(unit_id), "destination", dst, KR(ret));
|
||||
}
|
||||
|
||||
@ -8356,196 +8356,273 @@ int ObUnitManager::try_migrate_unit(const uint64_t unit_id,
|
||||
K(required_size), K(total_size), K(limit_percent), K(ret));
|
||||
}
|
||||
|
||||
if (FAILEDx(migrate_unit(unit_id, dst, is_manual))) {
|
||||
if (FAILEDx(migrate_unit_(unit_id, dst, is_manual))) {
|
||||
LOG_WARN("fail migrate unit", K(unit_id), K(dst), K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObUnitManager::migrate_unit(const uint64_t unit_id, const ObAddr &dst, const bool is_manual)
|
||||
int ObUnitManager::migrate_unit_(const uint64_t unit_id, const ObAddr &dst, const bool is_manual)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
|
||||
LOG_INFO("start to migrate unit", KR(ret), K(unit_id), K(dst), K(is_manual));
|
||||
ObUnit *unit = NULL;
|
||||
share::ObResourcePool *pool = NULL;
|
||||
ObZone zone;
|
||||
if (!check_inner_stat()) {
|
||||
ret = OB_INNER_STAT_ERROR;
|
||||
LOG_WARN("check_inner_stat failed", K(inited_), K(loaded_), K(ret));
|
||||
} else if (OB_INVALID_ID == unit_id || !dst.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(unit_id), K(dst), K(ret));
|
||||
} else if (OB_UNLIKELY(nullptr == srv_rpc_proxy_)) {
|
||||
} else if (OB_FAIL(get_unit_by_id(unit_id, unit))) {
|
||||
LOG_WARN("get_unit_by_id failed", K(unit_id), K(ret));
|
||||
} else if (OB_ISNULL(unit)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("srv_rpc_proxy_ ptr is null", K(ret));
|
||||
LOG_WARN("unit is null", KP(unit), K(ret));
|
||||
} else if (ObUnit::UNIT_STATUS_ACTIVE != unit->status_) {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("cannot migrate unit which is in deleting", K(ret), K(unit_id));
|
||||
} else if (unit->server_ == dst) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("unit->server same as migrate destination server",
|
||||
"unit", *unit, K(dst), K(ret));
|
||||
} else if (OB_FAIL(SVR_TRACER.get_server_zone(dst, zone))) {
|
||||
LOG_WARN("get_server_zone failed", KR(ret), K(dst));
|
||||
} else if (OB_UNLIKELY(zone != unit->zone_)) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("migrate unit between zones not supported", KR(ret), KP(unit), K(dst), K(zone));
|
||||
} else if (unit->migrate_from_server_.is_valid()) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("unit is already migrating, cannot migrate any more", "unit", *unit, K(ret));
|
||||
LOG_USER_ERROR(OB_NOT_SUPPORTED, "migrate unit already in migrating status");
|
||||
} else if (OB_FAIL(get_resource_pool_by_id(unit->resource_pool_id_, pool))) {
|
||||
LOG_WARN("get_resource_pool_by_id failed",
|
||||
"resource pool id", unit->resource_pool_id_, K(ret));
|
||||
} else if (OB_ISNULL(pool)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("pool is null", KP(pool), K(ret));
|
||||
} else {
|
||||
ObUnit *unit = NULL;
|
||||
share::ObResourcePool *pool = NULL;
|
||||
if (OB_FAIL(get_unit_by_id(unit_id, unit))) {
|
||||
LOG_WARN("get_unit_by_id failed", K(unit_id), K(ret));
|
||||
} else if (NULL == unit) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unit is null", KP(unit), K(ret));
|
||||
} else if (ObUnit::UNIT_STATUS_ACTIVE != unit->status_) {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("cannot migrate unit which is in deleting", K(ret), K(unit_id));
|
||||
} else if (OB_FAIL(get_resource_pool_by_id(unit->resource_pool_id_, pool))) {
|
||||
LOG_WARN("get_resource_pool_by_id failed",
|
||||
"resource pool id", unit->resource_pool_id_, K(ret));
|
||||
} else if (NULL == pool) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("pool is null", KP(pool), K(ret));
|
||||
} else if (nullptr == schema_service_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema service ptr is null", K(ret));
|
||||
} else if (!pool->is_granted_to_tenant()) {
|
||||
// by pass
|
||||
} else if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(pool->tenant_id_, compat_mode))) {
|
||||
LOG_WARN("fail to get tenant compat mode", K(ret));
|
||||
const bool granted = pool->is_granted_to_tenant();
|
||||
const ObAddr src = unit->server_;
|
||||
ObUnit new_unit = *unit;
|
||||
new_unit.server_ = dst;
|
||||
new_unit.migrate_from_server_ = granted ? src : ObAddr();
|
||||
new_unit.is_manual_migrate_ = is_manual;
|
||||
|
||||
LOG_INFO("do migrate unit", KPC(unit), K(new_unit), KPC(pool), K(granted));
|
||||
|
||||
// STEP 1: try notify unit persistence on destination ObServer
|
||||
if (OB_FAIL(do_migrate_unit_notify_resource_(*pool, new_unit, is_manual, granted))) {
|
||||
LOG_WARN("do_migrate_unit_notify_resource failed", KR(ret), KPC(pool), K(new_unit), K(is_manual), K(granted));
|
||||
}
|
||||
|
||||
// STEP 2: Update info in inner_table in trans
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(do_migrate_unit_in_trans_(*pool, new_unit, is_manual, granted))) {
|
||||
LOG_WARN("do_migrate_unit_in_trans failed", KR(ret), KPC(pool), K(new_unit), K(is_manual), K(granted));
|
||||
}
|
||||
|
||||
// STEP 3: Update in-memory info (unit & unit_load & migrate_unit)
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(do_migrate_unit_inmemory_(new_unit, unit, is_manual, granted))) {
|
||||
LOG_WARN("do_migrate_unit_inmemory failed", KR(ret), K(dst), KPC(unit), K(is_manual), K(granted));
|
||||
}
|
||||
|
||||
// STEP 4: migration succeed, do some postprocess
|
||||
if (OB_SUCC(ret)) {
|
||||
ObAddr src;
|
||||
ObZone zone;
|
||||
// granted: If the unit has not been assigned to the tenant, the migration can be performed immediately
|
||||
const bool granted = pool->is_granted_to_tenant();
|
||||
if (!granted
|
||||
&& common::STANDBY_CLUSTER == ObClusterInfoGetter::get_cluster_role_v2()) {
|
||||
//Units without grant on the standby database are not allowed to be migrated
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("migrate not grant unit not valid", K(ret));
|
||||
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "migrate unit which has not been granted");
|
||||
} else if (OB_FAIL(SVR_TRACER.get_server_zone(dst, zone))) {
|
||||
LOG_WARN("get_server_zone failed", KR(ret), K(dst));
|
||||
} else if (unit->server_ == dst) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("unit->server same as migrate destination server",
|
||||
"unit", *unit, K(dst), K(ret));
|
||||
} else {
|
||||
ObNotifyTenantServerResourceProxy notify_proxy(
|
||||
*srv_rpc_proxy_,
|
||||
&obrpc::ObSrvRpcProxy::notify_tenant_server_unit_resource);
|
||||
ObUnit new_unit = *unit;
|
||||
src = unit->server_;
|
||||
if (unit->migrate_from_server_.is_valid()) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("unit is already migrating, cannot migrate any more", "unit", *unit, K(ret));
|
||||
LOG_USER_ERROR(OB_NOT_SUPPORTED, "migrate unit already in migrating status");
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
common::ObMySQLTransaction trans;
|
||||
new_unit.zone_ = zone;
|
||||
if (granted) {
|
||||
new_unit.migrate_from_server_ = unit->server_;
|
||||
}
|
||||
new_unit.server_ = dst;
|
||||
new_unit.is_manual_migrate_ = is_manual;
|
||||
const bool is_delete = false; // is_delete is false when migrate unit
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_FAIL(try_notify_tenant_server_unit_resource(
|
||||
pool->tenant_id_, is_delete, notify_proxy,
|
||||
*pool, compat_mode, new_unit, false/*if not grant*/,
|
||||
false/*skip offline server*/))) {
|
||||
LOG_WARN("fail to try notify server unit resource", K(ret));
|
||||
}
|
||||
|
||||
if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) {
|
||||
LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret));
|
||||
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
|
||||
}
|
||||
ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret;
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("start to rollback unit persistence", KR(ret), K(new_unit), K(pool->tenant_id_));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObArray<ObUnit> units;
|
||||
if (OB_TMP_FAIL(units.push_back(new_unit))) {
|
||||
LOG_WARN("fail to push an element into units", KR(ret), KR(tmp_ret), KPC(unit));
|
||||
} else if (OB_TMP_FAIL(rollback_persistent_units(
|
||||
units,
|
||||
*pool,
|
||||
compat_mode,
|
||||
false/*if not grant*/,
|
||||
false/*skip offline server*/,
|
||||
notify_proxy))) {
|
||||
LOG_WARN("fail to rollback unit persistence", KR(ret), KR(tmp_ret),
|
||||
K(units), KPC(pool), K(compat_mode));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) {
|
||||
LOG_WARN("failed to start trans", K(ret));
|
||||
} else if (is_manual) {
|
||||
char ip_buf[common::MAX_IP_ADDR_LENGTH];
|
||||
(void)dst.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH);
|
||||
int64_t job_id = 0;
|
||||
if (OB_FAIL(RS_JOB_CREATE_WITH_RET(
|
||||
job_id,
|
||||
ObRsJobType::JOB_TYPE_MIGRATE_UNIT,
|
||||
trans,
|
||||
"unit_id", unit_id,
|
||||
"svr_ip", ip_buf,
|
||||
"svr_port", dst.get_port(),
|
||||
"tenant_id", pool->tenant_id_))) {
|
||||
LOG_WARN("fail to create rs job MIGRATE_UNIT", KR(ret), "tenant_id", pool->tenant_id_,
|
||||
K(unit_id));
|
||||
} else if (!granted) {
|
||||
if (OB_FAIL(RS_JOB_COMPLETE(job_id, OB_SUCCESS, trans))) {
|
||||
LOG_WARN("all_rootservice_job update failed", K(ret), K(job_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ut_operator_.update_unit(trans, new_unit))) {
|
||||
LOG_WARN("update_unit failed", K(new_unit), K(ret));
|
||||
} else {
|
||||
*unit = new_unit;
|
||||
}
|
||||
if (trans.is_started()) {
|
||||
if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("trans commit failed", K(tmp_ret), K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// delete unit load if needed, insert unit load on dst
|
||||
ObUnitLoad load;
|
||||
if (OB_SUCC(ret)) {
|
||||
root_balance_->wakeup();
|
||||
if (!granted) {
|
||||
if (OB_FAIL(delete_unit_load(src, unit_id))) {
|
||||
LOG_WARN("delete_unit_load failed", K(src), K(unit_id), K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(gen_unit_load(unit, load))) {
|
||||
LOG_WARN("gen_unit_load failed", "unit", *unit, K(ret));
|
||||
} else if (OB_FAIL(insert_unit_load(dst, load))) {
|
||||
LOG_WARN("insert_unit_load failed", K(dst), K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (granted) {
|
||||
//ObArray<ObAddr> servers;
|
||||
if (OB_FAIL(insert_migrate_unit(unit->migrate_from_server_, unit->unit_id_))) {
|
||||
LOG_WARN("insert_migrate_unit failed", "unit", *unit, K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ROOTSERVICE_EVENT_ADD("unit", "migrate_unit",
|
||||
"unit_id", unit->unit_id_,
|
||||
"migrate_from_server", unit->migrate_from_server_,
|
||||
"server", unit->server_,
|
||||
"tenant_id", pool->tenant_id_);
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG_INFO("migrate unit succeed", K(unit_id), K(src), K(dst), K(granted));
|
||||
// wakeup rootbalance thread to make disaster_recovery process more quickly
|
||||
root_balance_->wakeup();
|
||||
// add migrate_unit rootservice event
|
||||
ROOTSERVICE_EVENT_ADD("unit", "migrate_unit",
|
||||
"unit_id", unit->unit_id_,
|
||||
"migrate_from_server", unit->migrate_from_server_,
|
||||
"server", unit->server_,
|
||||
"tenant_id", pool->tenant_id_);
|
||||
}
|
||||
}
|
||||
LOG_INFO("finish migrate unit", KR(ret), K(unit_id), K(dst), K(is_manual));
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Notify the destination server of `new_unit` about the unit resource, as the
// first step of a unit migration.
//
// Nothing is sent when `granted` is false. Otherwise the notification is sent
// through srv_rpc_proxy_ and waited for; if sending or waiting fails (or the
// ERRSIM_UNIT_PERSISTENCE_ERROR injection point yields an error) a rollback of
// the persisted unit is attempted via rollback_persistent_units. A rollback
// failure is only logged; the original error is what gets returned.
//
// @param pool       resource pool that `new_unit` must belong to
// @param new_unit   unit description with the destination server filled in
// @param is_manual  not used by this step
// @param granted    whether the pool is granted to a tenant
// @return OB_SUCCESS, or the first error hit while validating / notifying
int ObUnitManager::do_migrate_unit_notify_resource_(const share::ObResourcePool &pool,
                                                    const share::ObUnit &new_unit,
                                                    const bool is_manual,
                                                    const bool granted)
{
  int ret = OB_SUCCESS;
  lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
  if (!check_inner_stat()) {
    ret = OB_INNER_STAT_ERROR;
    LOG_WARN("check_inner_stat failed", K(inited_), K(loaded_), K(ret));
  } else if (granted) {
    // Only a granted unit needs the observer to be notified.
    if (OB_UNLIKELY(nullptr == srv_rpc_proxy_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("srv_rpc_proxy_ ptr is null", KR(ret));
    } else if (OB_UNLIKELY(new_unit.resource_pool_id_ != pool.resource_pool_id_)) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("new_unit not belong to pool", KR(ret), K(pool), K(new_unit));
    } else if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(pool.tenant_id_, compat_mode))) {
      LOG_WARN("fail to get tenant compat mode", KR(ret), K(pool));
    } else {
      const bool is_delete = false;           // is_delete is false when migrate unit
      const bool if_not_grant = false;
      const bool skip_offline_server = false;
      int tmp_ret = OB_SUCCESS;
      ObNotifyTenantServerResourceProxy notify_proxy(
          *srv_rpc_proxy_,
          &obrpc::ObSrvRpcProxy::notify_tenant_server_unit_resource);

      // Only the new unit on the dst server is notified here; the old unit on
      // the src server will be deleted later when doing end_migrate.
      if (OB_FAIL(try_notify_tenant_server_unit_resource(
              pool.tenant_id_, is_delete, notify_proxy,
              pool, compat_mode, new_unit, if_not_grant,
              skip_offline_server))) {
        LOG_WARN("fail to try notify server unit resource", K(ret));
      }

      // Always wait on the proxy; keep the earlier error if there is one.
      tmp_ret = notify_proxy.wait();
      if (OB_SUCCESS != tmp_ret) {
        LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret));
        if (OB_SUCCESS == ret) {
          ret = tmp_ret;
        }
      }

      // Error injection point: a non-zero value overrides ret.
      ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret;

      // Rollback persistent unit if persistence failed
      if (OB_FAIL(ret)) {
        LOG_WARN("start to rollback unit persistence", KR(ret), K(new_unit), K(pool.tenant_id_));
        ObArray<ObUnit> units;
        if (OB_TMP_FAIL(units.push_back(new_unit))) {
          LOG_WARN("fail to push an element into units", KR(ret), KR(tmp_ret), K(new_unit));
        } else if (OB_TMP_FAIL(rollback_persistent_units(
                       units,
                       pool,
                       compat_mode,
                       if_not_grant,
                       skip_offline_server,
                       notify_proxy))) {
          LOG_WARN("fail to rollback unit persistence", KR(ret), KR(tmp_ret),
                   K(units), K(pool), K(compat_mode));
        }
      }
    }
  }
  return ret;
}
|
||||
|
||||
// Persist a unit migration into the inner tables inside one transaction.
//
// Within the transaction this function:
//   1. re-reads the resource pool with SELECT ... FOR UPDATE and verifies that
//      its granted state matches the in-memory `pool`;
//   2. for a manual migration, creates a MIGRATE_UNIT rootservice job (and
//      completes it right away when the unit is not granted);
//   3. writes `new_unit` through ut_operator_.update_unit.
// The transaction is committed when every step succeeded and rolled back
// otherwise.
//
// @param pool       in-memory resource pool the unit belongs to
// @param new_unit   target state of the unit (server_ is the destination)
// @param is_manual  whether a rootservice job must be recorded
// @param granted    whether the pool is granted to a tenant
// @return OB_SUCCESS, or the first error encountered (including commit errors)
int ObUnitManager::do_migrate_unit_in_trans_(const share::ObResourcePool &pool,
                                             const share::ObUnit &new_unit,
                                             const bool is_manual,
                                             const bool granted)
{
  int ret = OB_SUCCESS;
  common::ObMySQLTransaction trans;
  share::ObResourcePool real_pool;
  if (!check_inner_stat()) {
    ret = OB_INNER_STAT_ERROR;
    LOG_WARN("check_inner_stat failed", K(inited_), K(loaded_), K(ret));
  } else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) {
    LOG_WARN("failed to start trans", K(ret));
  }
  // Double check whether exactly unit is granted by querying pool from inner_table.
  // Because during creating or dropping tenant, there's a period when inner_table
  // and persistence units are updated while in-memory info not updated yet.
  // We need lock by SELECT FOR UPDATE to assure no other trans is still updating pool when checking.
  if (OB_FAIL(ret)) {
  } else if (OB_FAIL(ut_operator_.get_resource_pool(trans,
                                                    pool.resource_pool_id_,
                                                    true/*select_for_update*/,
                                                    real_pool))) {
    LOG_WARN("fail to get resource_pools from table", KR(ret));
  } else if (pool.is_granted_to_tenant() != real_pool.is_granted_to_tenant()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("in-memory pool info not accurate, cannot migrate unit now.",
             KR(ret), K(new_unit), K(pool), K(real_pool));
  }
  // Create migrate RS_JOB if it's a manual migration:
  // * If not granted, the job will be set as complete on creating.
  // * If granted, the job will be completed when end_migrate_unit is called.
  if (OB_FAIL(ret)) {
  } else if (is_manual) {
    // Zero-initialized so the buffer never carries indeterminate bytes.
    char ip_buf[common::MAX_IP_ADDR_LENGTH] = {0};
    const ObAddr &dst = new_unit.server_;
    int64_t job_id = 0;
    if (OB_UNLIKELY(!dst.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH))) {
      // Do not record a job with an unusable server ip.
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("fail to convert server ip to string", KR(ret), K(dst));
    } else if (OB_FAIL(RS_JOB_CREATE_WITH_RET(
                   job_id,
                   ObRsJobType::JOB_TYPE_MIGRATE_UNIT,
                   trans,
                   "unit_id", new_unit.unit_id_,
                   "svr_ip", ip_buf,
                   "svr_port", dst.get_port(),
                   "tenant_id", pool.tenant_id_))) {
      LOG_WARN("fail to create rs job MIGRATE_UNIT", KR(ret),
               "tenant_id", pool.tenant_id_,
               "unit_id", new_unit.unit_id_);
    } else if (!granted) {
      // not granted, migration can be done at once, so mark RS_JOB completed
      if (OB_FAIL(RS_JOB_COMPLETE(job_id, OB_SUCCESS, trans))) {
        LOG_WARN("all_rootservice_job update failed", K(ret), K(job_id));
      }
    }
  }
  // Update unit info
  if (OB_FAIL(ret)) {
  } else if (OB_FAIL(ut_operator_.update_unit(trans, new_unit))) {
    LOG_WARN("update_unit failed", K(new_unit), K(ret));
  }
  // End this transaction: commit on success, rollback otherwise. A failure to
  // end the transaction only replaces ret when ret was still OB_SUCCESS.
  if (trans.is_started()) {
    int tmp_ret = OB_SUCCESS;
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
      ret = OB_SUCC(ret) ? tmp_ret : ret;
      LOG_WARN("trans commit failed", K(tmp_ret), K(ret));
    }
  }
  return ret;
}
|
||||
|
||||
// Apply a unit migration to the in-memory structures.
//
// Overwrites `*unit` with `new_unit`, then updates the unit load bookkeeping:
// when the unit is not granted the load on the source server is removed, a
// load for the unit is generated and inserted on the destination server, and
// when the unit is granted it is recorded via insert_migrate_unit.
// Note that `*unit` is overwritten before the load updates, so a failure in a
// later step leaves `*unit` already pointing at the destination.
//
// @param new_unit   target state of the unit; must have the same unit_id_ as `unit`
// @param unit       in-memory unit to update; must not be null
// @param is_manual  not used by this step
// @param granted    whether the pool of the unit is granted to a tenant
// @return OB_SUCCESS, or the first error encountered
int ObUnitManager::do_migrate_unit_inmemory_(const share::ObUnit &new_unit,
                                             share::ObUnit *unit,
                                             const bool is_manual,
                                             const bool granted)
{
  int ret = OB_SUCCESS;
  if (!check_inner_stat()) {
    ret = OB_INNER_STAT_ERROR;
    LOG_WARN("check_inner_stat failed", K(inited_), K(loaded_), K(ret));
  } else if (OB_ISNULL(unit)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unit ptr is null", KR(ret), KP(unit));
  } else if (OB_UNLIKELY(unit->unit_id_ != new_unit.unit_id_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("old and new unit id not the same", KR(ret), KPC(unit), K(new_unit));
  } else {
    // Remember the source server before *unit is overwritten.
    const ObAddr src = unit->server_;
    const ObAddr dst = new_unit.server_;
    // do update unit
    *unit = new_unit;
    // update unit_load
    // delete old unit load immediately if pool not granted
    if (!granted) {
      if (OB_FAIL(delete_unit_load(src, unit->unit_id_))) {
        LOG_WARN("delete_unit_load failed", K(src), "unit_id", unit->unit_id_, KR(ret));
      }
    }
    // insert new unit load
    ObUnitLoad load;
    if (OB_FAIL(ret)) {
    } else if (OB_FAIL(gen_unit_load(unit, load))) {
      LOG_WARN("gen_unit_load failed", "unit", *unit, K(ret));
    } else if (OB_FAIL(insert_unit_load(dst, load))) {
      LOG_WARN("insert_unit_load failed", K(dst), K(ret));
    }
    // update migrate_unit list
    if (OB_FAIL(ret)) {
    } else if (granted) {
      if (OB_FAIL(insert_migrate_unit(unit->migrate_from_server_, unit->unit_id_))) {
        LOG_WARN("insert_migrate_unit failed", "unit", *unit, K(ret));
      }
    }
  }
  // `unit_id` and `dst` are not in scope here, so log them from new_unit,
  // which is valid on every path (including the null-unit error path).
  LOG_INFO("migrate unit", "unit_id", new_unit.unit_id_, "dst", new_unit.server_, K(ret));
  return ret;
}
|
||||
|
||||
|
@ -429,7 +429,19 @@ private:
|
||||
const double limit,
|
||||
bool &is_enough,
|
||||
AlterResourceErr &err_index) const;
|
||||
virtual int migrate_unit(const uint64_t unit_id, const common::ObAddr &dst, const bool is_manual = false);
|
||||
virtual int migrate_unit_(const uint64_t unit_id, const common::ObAddr &dst, const bool is_manual = false);
|
||||
int do_migrate_unit_notify_resource_(const share::ObResourcePool &pool,
|
||||
const share::ObUnit &new_unit,
|
||||
const bool is_manual,
|
||||
const bool granted);
|
||||
int do_migrate_unit_in_trans_(const share::ObResourcePool &pool,
|
||||
const share::ObUnit &new_unit,
|
||||
const bool is_manual,
|
||||
const bool granted);
|
||||
int do_migrate_unit_inmemory_(const share::ObUnit &new_unit,
|
||||
share::ObUnit *unit,
|
||||
const bool is_manual,
|
||||
const bool granted);
|
||||
int inner_get_unit_info_by_id(const uint64_t unit_id, share::ObUnitInfo &unit) const;
|
||||
int check_server_enough(const uint64_t tenant_id,
|
||||
const common::ObIArray<share::ObResourcePoolName> &pool_names,
|
||||
|
@ -556,6 +556,7 @@ class ObString;
|
||||
ACT(BEFORE_FINISH_PRIMARY_ZONE,)\
|
||||
ACT(BEFORE_FINISH_UNIT_NUM,)\
|
||||
ACT(BEFORE_CHECK_PRIMARY_ZONE,)\
|
||||
ACT(BEFORE_RELOAD_UNIT,)\
|
||||
ACT(MAX_DEBUG_SYNC_POINT,)
|
||||
|
||||
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
||||
|
@ -403,6 +403,36 @@ int ObUnitTableOperator::get_resource_pools(const common::ObIArray<uint64_t> &po
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Read one resource pool row, selected by `pool_id`, through `sql_client`.
//
// @param sql_client         client (or transaction) used to run the query
// @param pool_id            resource_pool_id to look up
// @param select_for_update  when true, " FOR UPDATE" is appended to the query
// @param resource_pool      output; filled by read_resource_pool from the result
// @return OB_NOT_INIT if the operator is not initialized, OB_ERR_UNEXPECTED if
//         the query yields no result object, otherwise the error code of the
//         failing step or OB_SUCCESS
int ObUnitTableOperator::get_resource_pool(common::ObISQLClient &sql_client,
                                           const uint64_t pool_id,
                                           const bool select_for_update,
                                           ObResourcePool &resource_pool) const
{
  int ret = OB_SUCCESS;
  ObSqlString sql_string;
  SMART_VAR(ObMySQLProxy::MySQLResult, res) {
    if (!inited_) {
      ret = OB_NOT_INIT;
      LOG_WARN("not init", K(ret));
    } else {
      sqlclient::ObMySQLResult *result = NULL;
      if (OB_FAIL(sql_string.assign_fmt("SELECT * FROM %s WHERE resource_pool_id = %lu%s",
                                        OB_ALL_RESOURCE_POOL_TNAME, pool_id,
                                        select_for_update ? " FOR UPDATE" : ""))) {
        LOG_WARN("assign sql string failed", K(ret), K(pool_id), K(select_for_update));
      } else if (OB_FAIL(sql_client.read(res, sql_string.ptr()))) {
        // This is a read of the resource pool table, so say so in the log.
        LOG_WARN("read resource pool failed", K(ret), K(sql_string));
      } else if (OB_UNLIKELY(NULL == (result = res.get_result()))) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("fail to get sql result", K(ret), KP(result));
      } else if (OB_FAIL(read_resource_pool(*result, resource_pool))) {
        LOG_WARN("fail to read resource pool from result", KR(ret), K(pool_id), K(select_for_update));
      }
    }
  }
  return ret;
}
|
||||
|
||||
int ObUnitTableOperator::update_resource_pool(common::ObISQLClient &client,
|
||||
const ObResourcePool &resource_pool)
|
||||
{
|
||||
|
@ -55,6 +55,10 @@ public:
|
||||
common::ObIArray<ObResourcePool> &pools) const;
|
||||
virtual int get_resource_pools(const common::ObIArray<uint64_t> &pool_ids,
|
||||
common::ObIArray<ObResourcePool> &pools) const;
|
||||
int get_resource_pool(ObISQLClient &sql_client,
|
||||
const uint64_t pool_id,
|
||||
const bool select_for_update,
|
||||
ObResourcePool &resource_pool) const;
|
||||
virtual int update_resource_pool(common::ObISQLClient &client,
|
||||
const ObResourcePool &resource_pool);
|
||||
virtual int remove_resource_pool(common::ObISQLClient &client,
|
||||
|
Loading…
x
Reference in New Issue
Block a user