remove 'lock_all_dll_operation' from RS major_freeze
This commit is contained in:
@ -275,36 +275,39 @@ int ObFreezeInfoManager::inner_reload(ObFreezeInfo &freeze_info)
|
|||||||
int ObFreezeInfoManager::set_freeze_info()
|
int ObFreezeInfoManager::set_freeze_info()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObMySQLTransaction trans;
|
SCN new_frozen_scn;
|
||||||
ObFreezeInfoProxy freeze_info_proxy(tenant_id_);
|
SCN remote_snapshot_gc_scn;
|
||||||
|
ObSimpleFrozenStatus frozen_status;
|
||||||
|
|
||||||
ObRecursiveMutexGuard guard(lock_);
|
ObRecursiveMutexGuard guard(lock_);
|
||||||
|
|
||||||
SCN new_frozen_scn;
|
int64_t tenant_schema_version = -1;
|
||||||
SCN remote_snapshot_gc_scn;
|
|
||||||
|
|
||||||
if (OB_FAIL(check_inner_stat())) {
|
if (OB_FAIL(check_inner_stat())) {
|
||||||
LOG_WARN("inner stat error", KR(ret));
|
LOG_WARN("inner stat error", KR(ret));
|
||||||
} else if (OB_FAIL(trans.start(sql_proxy_, tenant_id_))) {
|
} else if (OB_ISNULL(GCTX.schema_service_)) {
|
||||||
LOG_WARN("fail to start transaction", KR(ret), K_(tenant_id));
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("schema_service is null", KR(ret));
|
||||||
|
} else if (GCTX.schema_service_->get_tenant_refreshed_schema_version(tenant_id_, tenant_schema_version)) {
|
||||||
|
LOG_WARN("fail to get tenant refreshed schema version", KR(ret), K_(tenant_id));
|
||||||
|
} else {
|
||||||
|
ObFreezeInfoProxy freeze_info_proxy(tenant_id_);
|
||||||
|
ObDDLSQLTransaction trans(GCTX.schema_service_, false/*need_end_signal*/);
|
||||||
|
|
||||||
|
// In 'ddl_sql_transaction.start()', it implements the semantics of 'lock_all_ddl_operation'.
|
||||||
|
if (OB_FAIL(trans.start(sql_proxy_, tenant_id_, tenant_schema_version))) {
|
||||||
|
LOG_WARN("fail to start transaction", KR(ret), K_(tenant_id), K(tenant_schema_version));
|
||||||
// 1. lock snapshot_gc_ts in __all_global_stat
|
// 1. lock snapshot_gc_ts in __all_global_stat
|
||||||
} else if (OB_FAIL(ObGlobalStatProxy::select_snapshot_gc_scn_for_update(
|
} else if (OB_FAIL(ObGlobalStatProxy::select_snapshot_gc_scn_for_update(
|
||||||
trans, tenant_id_, remote_snapshot_gc_scn))) {
|
trans, tenant_id_, remote_snapshot_gc_scn))) {
|
||||||
LOG_WARN("fail to select for update", KR(ret));
|
LOG_WARN("fail to select for update", KR(ret));
|
||||||
// 2. acquire ddl lock, serialize with ddl operation
|
} else {
|
||||||
} else if (OB_FAIL(ObDDLSQLTransaction::lock_all_ddl_operation(trans, tenant_id_))) {
|
|
||||||
LOG_WARN("fail to acquire ddl lock", KR(ret), K_(tenant_id));
|
|
||||||
}
|
|
||||||
|
|
||||||
ObSimpleFrozenStatus frozen_status;
|
|
||||||
if (OB_SUCC(ret)) {
|
|
||||||
int64_t schema_version_in_frozen_ts = 0;
|
int64_t schema_version_in_frozen_ts = 0;
|
||||||
uint64_t data_version = 0;
|
uint64_t data_version = 0;
|
||||||
|
|
||||||
// 3. generate new frozen_scn
|
// 2. generate new frozen_scn
|
||||||
if (OB_FAIL(generate_frozen_scn(freeze_info_, remote_snapshot_gc_scn, new_frozen_scn))) {
|
if (OB_FAIL(generate_frozen_scn(freeze_info_, remote_snapshot_gc_scn, new_frozen_scn))) {
|
||||||
LOG_WARN("fail to generate frozen timestamp", KR(ret));
|
LOG_WARN("fail to generate frozen timestamp", KR(ret));
|
||||||
// 4. get schema_version at frozen_scn
|
// 3. get schema_version at frozen_scn
|
||||||
} else if (OB_FAIL(get_schema_version(new_frozen_scn, schema_version_in_frozen_ts))) {
|
} else if (OB_FAIL(get_schema_version(new_frozen_scn, schema_version_in_frozen_ts))) {
|
||||||
LOG_WARN("fail to get schema version", KR(ret), K(new_frozen_scn));
|
LOG_WARN("fail to get schema version", KR(ret), K(new_frozen_scn));
|
||||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, data_version))) {
|
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, data_version))) {
|
||||||
@ -314,7 +317,7 @@ int ObFreezeInfoManager::set_freeze_info()
|
|||||||
frozen_status.schema_version_ = schema_version_in_frozen_ts;
|
frozen_status.schema_version_ = schema_version_in_frozen_ts;
|
||||||
frozen_status.data_version_ = data_version;
|
frozen_status.data_version_ = data_version;
|
||||||
|
|
||||||
// 5. insert freeze info
|
// 4. insert freeze info
|
||||||
if (OB_FAIL(freeze_info_proxy.set_freeze_info(trans, frozen_status))) {
|
if (OB_FAIL(freeze_info_proxy.set_freeze_info(trans, frozen_status))) {
|
||||||
LOG_WARN("fail to set freeze info", KR(ret), K(frozen_status), K_(tenant_id));
|
LOG_WARN("fail to set freeze info", KR(ret), K(frozen_status), K_(tenant_id));
|
||||||
}
|
}
|
||||||
@ -328,6 +331,7 @@ int ObFreezeInfoManager::set_freeze_info()
|
|||||||
LOG_WARN("fail to end trans", "is_commit", OB_SUCC(ret), KR(tmp_ret));
|
LOG_WARN("fail to end trans", "is_commit", OB_SUCC(ret), KR(tmp_ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_FAIL(freeze_info_.frozen_statuses_.push_back(frozen_status))) {
|
if (OB_FAIL(freeze_info_.frozen_statuses_.push_back(frozen_status))) {
|
||||||
|
|||||||
@ -211,9 +211,10 @@ int ObMajorFreezeHelper::do_one_tenant_major_freeze(
|
|||||||
.by(tenant_id)
|
.by(tenant_id)
|
||||||
.dst_cluster_id(GCONF.cluster_id)
|
.dst_cluster_id(GCONF.cluster_id)
|
||||||
.major_freeze(req, resp))) {
|
.major_freeze(req, resp))) {
|
||||||
if (OB_LEADER_NOT_EXIST == ret) {
|
if (OB_LEADER_NOT_EXIST == ret || OB_EAGAIN == ret) {
|
||||||
const int64_t idle_time = 200 * 1000 * (i + 1);
|
const int64_t idle_time = 200 * 1000 * (i + 1);
|
||||||
LOG_WARN("leader may switch, will retry", K(tenant_id), K(freeze_info), "ori_leader", leader, K(idle_time));
|
LOG_WARN("leader may switch or ddl confilict, will retry", KR(ret), K(tenant_id), K(freeze_info),
|
||||||
|
"ori_leader", leader, K(idle_time));
|
||||||
USLEEP(idle_time);
|
USLEEP(idle_time);
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
} else if ((OB_MAJOR_FREEZE_NOT_FINISHED != ret) && (OB_FROZEN_INFO_ALREADY_EXIST != ret)) {
|
} else if ((OB_MAJOR_FREEZE_NOT_FINISHED != ret) && (OB_FROZEN_INFO_ALREADY_EXIST != ret)) {
|
||||||
|
|||||||
Reference in New Issue
Block a user