wait until tenant_sync_scn is larger than sys_ls_end_scn in some cases

This commit is contained in:
linqiucen
2023-08-07 13:48:13 +00:00
committed by ob-robot
parent 7c789f265f
commit e24b2cefb7
12 changed files with 254 additions and 17 deletions

View File

@ -31,6 +31,7 @@
#include "observer/ob_server_struct.h"//GCTX
#include "rootserver/ob_recovery_ls_service.h"//ObRecoveryLSHelper
#include "rootserver/ob_tenant_thread_helper.h"//get_zone_priority
#include "rootserver/ob_tenant_role_transition_service.h"//get_checkpoint_by_rpc
#include "storage/tx_storage/ob_ls_map.h"
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/tx_storage/ob_ls_handle.h" //ObLSHandle
@ -512,6 +513,7 @@ int ObLSServiceHelper::process_status_to_steady(
//no need do next, or ls is normal, no need process next
//or ls status not equal, no need to next
} else if (machine.ls_info_.ls_is_normal() && machine.ls_info_.get_ls_group_id() != machine.status_info_.ls_group_id_) {
// ***TODO(linqiucen.lqc) wait tenant_sync_scn > sys_ls_end_scn
if (OB_TMP_FAIL(process_alter_ls(machine.ls_id_, machine.status_info_.ls_group_id_,
machine.ls_info_.get_ls_group_id(), machine.status_info_.unit_group_id_,
tenant_ls_info, *GCTX.sql_proxy_))) {
@ -563,6 +565,80 @@ int ObLSServiceHelper::offline_ls(const uint64_t tenant_id,
return ret;
}
int ObLSServiceHelper::wait_all_tenants_user_ls_sync_scn(
common::hash::ObHashMap<uint64_t, share::SCN> &tenants_sys_ls_target_scn)
{
int ret = OB_SUCCESS;
share::SCN sys_ls_target_scn;
ObTimeoutCtx timeout_ctx;
const int64_t DEFAULT_TIMEOUT = GCONF.internal_sql_execute_timeout;
sys_ls_target_scn.set_invalid();
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(timeout_ctx, DEFAULT_TIMEOUT))) {
LOG_WARN("fail to get default timeout", KR(ret));
}
while(OB_SUCC(ret) && !tenants_sys_ls_target_scn.empty()) {
hash::ObHashMap<uint64_t, share::SCN>::iterator iter = tenants_sys_ls_target_scn.begin();
while (OB_SUCC(ret) && iter != tenants_sys_ls_target_scn.end()) {
sys_ls_target_scn.set_invalid();
const uint64_t tenant_id = iter->first;
sys_ls_target_scn = iter->second;
iter++;
if (OB_UNLIKELY(timeout_ctx.is_timeouted())) {
ret = OB_TIMEOUT;
LOG_WARN("wait tenant_sync_scn timeout", KR(ret), K(timeout_ctx));
} else if (OB_FAIL(check_if_need_wait_user_ls_sync_scn(tenant_id, sys_ls_target_scn))) {
if (OB_NEED_WAIT != ret) {
LOG_WARN("fail to check tenant_sync_scn", KR(ret), K(tenant_id), K(sys_ls_target_scn));
} else {
ret = OB_SUCCESS;
}
} else if (OB_FAIL(tenants_sys_ls_target_scn.erase_refactored(tenant_id))) {
LOG_WARN("fail to remove the tenant from tenants_sys_ls_target_scn", KR(ret), K(tenant_id));
}
}
if (OB_SUCC(ret) && OB_LIKELY(!tenants_sys_ls_target_scn.empty())) {
ob_usleep(200*1000);
}
}
return ret;
}
ERRSIM_POINT_DEF(ERRSIM_USER_LS_SYNC_SCN);
int ObLSServiceHelper::check_if_need_wait_user_ls_sync_scn(
const uint64_t tenant_id,
const share::SCN &sys_ls_target_scn)
{
int ret = OB_SUCCESS;
share::SCN user_ls_sync_scn;
user_ls_sync_scn.set_invalid();
ObAllTenantInfo tenant_info;
ObLSRecoveryStatOperator ls_recovery;
if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
} else if (!is_user_tenant(tenant_id)) {
// skip
} else if (OB_UNLIKELY(!sys_ls_target_scn.is_valid_and_not_min())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid sys_ls_target_scn", KR(ret), K(sys_ls_target_scn));
} else if (OB_FAIL(ls_recovery.get_user_ls_sync_scn(tenant_id, *GCTX.sql_proxy_, user_ls_sync_scn))) {
LOG_WARN("failed to get user scn", KR(ret), K(tenant_id));
} else {
bool is_errsim_opened = ERRSIM_USER_LS_SYNC_SCN ? true : false;
user_ls_sync_scn = is_errsim_opened ? SCN::scn_dec(sys_ls_target_scn) : user_ls_sync_scn;
// if ERRSIM_USER_LS_SYNC_SCN is true, user_ls_sync_scn will be always smaller than sys_ls_target_scn
// thus, the related operation will fail due to timeout.
if (user_ls_sync_scn < sys_ls_target_scn) {
ret = OB_NEED_WAIT;
LOG_WARN("wait some time, user_ls_sync_scn cannot be smaller than sys_ls_target_scn",
KR(ret), K(tenant_id), K(user_ls_sync_scn), K(sys_ls_target_scn), K(is_errsim_opened));
} else {
LOG_INFO("user_ls_sync_scn >= sys_ls_target_scn now", K(tenant_id), K(user_ls_sync_scn), K(sys_ls_target_scn));
}
}
return ret;
}
int ObLSServiceHelper::revision_to_equal_status_(const ObLSStatusMachineParameter &machine,
const share::ObTenantSwitchoverStatus &working_sw_status,
ObTenantLSInfo& tenant_ls_info)

View File

@ -247,6 +247,8 @@ public:
const uint64_t &old_unit_group_id,
ObTenantLSInfo& tenant_ls_info,
common::ObISQLClient &sql_proxy);
static int check_if_need_wait_user_ls_sync_scn(const uint64_t tenant_id, const share::SCN &sys_ls_target_scn);
static int wait_all_tenants_user_ls_sync_scn(common::hash::ObHashMap<uint64_t, share::SCN> &tenants_sys_ls_target_scn);
private:
static int revision_to_equal_status_(
const ObLSStatusMachineParameter &status_machine,

View File

@ -140,14 +140,22 @@ int ObPrimaryLSService::set_tenant_dropping_status_(
const common::ObIArray<ObLSStatusMachineParameter> &status_machine_array, int64_t &task_cnt)
{
int ret = OB_SUCCESS;
ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_ISNULL(tenant_info_loader)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant_info_loader is null", KR(ret), KP(tenant_info_loader));
} else {
share::ObLSAttrOperator ls_operator(MTL_ID(), GCTX.sql_proxy_);
const ObTenantSwitchoverStatus working_sw_status = share::NORMAL_SWITCHOVER_STATUS;
share::SCN tenant_sync_scn, sys_ls_target_scn;
tenant_sync_scn.set_invalid();
sys_ls_target_scn.set_invalid();
for (int64_t i = 0; OB_SUCC(ret) && i < status_machine_array.count() && !has_set_stop(); ++i) {
const share::ObLSAttr &attr = status_machine_array.at(i).ls_info_;
const share::ObLSStatusInfo &info = status_machine_array.at(i).status_info_;
if (attr.get_ls_id().is_sys_ls()) {
if (attr.ls_is_normal()) {
if (OB_FAIL(ls_operator.update_ls_status(attr.get_ls_id(),
@ -157,34 +165,50 @@ int ObPrimaryLSService::set_tenant_dropping_status_(
task_cnt++;
LOG_INFO("[PRIMARY_LS_SERVICE] set sys ls to pre tenant dropping", KR(ret), K(attr));
}
if (OB_FAIL(ret)) {
} else if (!attr.ls_is_normal() && !attr.ls_is_pre_tenant_dropping()) {
// if attr is normal, it means that the status has been switched to pre_tenant_dropping in this round
// if attr is pre_tenant_dropping, it means that the status has been changed in a previous round
// the other attr is tenant_dropping, we should skip checking
} else if (OB_FAIL(ls_operator.get_pre_tenant_dropping_ora_rowscn(sys_ls_target_scn))) {
LOG_WARN("fail to get sys_ls_end_scn", KR(ret), K(tenant_id_));
} else if (OB_FAIL(tenant_info_loader->get_sync_scn(tenant_sync_scn))) {
LOG_WARN("get tenant_sync_scn failed", KR(ret));
} else if (OB_UNLIKELY(!tenant_sync_scn.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant_sync_scn not valid", KR(ret), K(tenant_sync_scn));
} else if (tenant_sync_scn < sys_ls_target_scn) {
ret = OB_NEED_WAIT;
LOG_WARN("wait some time, tenant_sync_scn cannot be smaller than sys_ls_target_scn", KR(ret),
K(tenant_id_), K(tenant_sync_scn), K(sys_ls_target_scn));
}
// find SYS LS
break;
}
}//end for set sys ls change to pre tenant dropping
//TODO check all user ls is large than sys ls pre_tenant_dropping
for (int64_t i = 0; OB_SUCC(ret) && i < status_machine_array.count() && !has_set_stop(); ++i) {
const share::ObLSAttr &attr = status_machine_array.at(i).ls_info_;
if (attr.get_ls_id().is_sys_ls()) {
//no need process sys ls
} else if (attr.ls_is_creating()) {
//drop the status,
if (OB_FAIL(ls_operator.delete_ls(attr.get_ls_id(), attr.get_ls_status(), working_sw_status))) {
LOG_WARN("failed to remove ls not normal", KR(ret), K(attr));
}
LOG_INFO("[PRIMARY_LS_SERVICE] tenant is dropping, delete ls in creating", KR(ret), K(attr));
if (OB_UNLIKELY(!attr.is_valid()) || attr.get_ls_id().is_sys_ls()) {
// invalid attr might happens if the ls is deleted in __all_ls table but still exists in __all_ls_status table
// no need process sys ls
} else if (!attr.ls_is_tenant_dropping()) {
task_cnt++;
} else {
//no matter the status is in normal or dropping
//may be the status in status info is created
if (!attr.ls_is_tenant_dropping()) {
task_cnt++;
if (attr.ls_is_creating()) {
if (OB_FAIL(ls_operator.delete_ls(attr.get_ls_id(), attr.get_ls_status(), working_sw_status))) {
LOG_WARN("failed to remove ls not normal", KR(ret), K(attr));
}
LOG_INFO("[PRIMARY_LS_SERVICE] tenant is dropping, delete ls in creating", KR(ret),
K(attr), K(tenant_sync_scn), K(sys_ls_target_scn));
} else if (!attr.ls_is_tenant_dropping()) {
//no matter the status is in normal or dropping
//may be the status in status info is created
if (OB_FAIL(ls_operator.update_ls_status(
attr.get_ls_id(), attr.get_ls_status(),
share::OB_LS_TENANT_DROPPING, working_sw_status))) {
LOG_WARN("failed to update ls status", KR(ret), K(attr));
}
LOG_INFO("[PRIMARY_LS_SERVICE] set ls to tenant dropping", KR(ret), K(attr), K(i));
LOG_INFO("[PRIMARY_LS_SERVICE] set ls to tenant dropping", KR(ret), K(attr), K(i),
K(tenant_sync_scn), K(sys_ls_target_scn));
}
}
}//end for

View File

@ -489,6 +489,19 @@ int ObTenantInfoLoader::get_replayable_scn(share::SCN &replayable_scn)
return ret;
}
int ObTenantInfoLoader::get_sync_scn(share::SCN &sync_scn)
{
int ret = OB_SUCCESS;
share::ObAllTenantInfo tenant_info;
sync_scn.set_invalid();
if (OB_FAIL(get_tenant_info(tenant_info))) {
LOG_WARN("failed to get tenant info", KR(ret));
} else {
sync_scn = tenant_info.get_sync_scn();
}
return ret;
}
int ObTenantInfoLoader::get_tenant_info(share::ObAllTenantInfo &tenant_info)
{
int ret = OB_SUCCESS;

View File

@ -121,6 +121,15 @@ public:
*/
int get_replayable_scn(share::SCN &replayable_scn);
/**
* @description:
* get tenant sync_scn.
* for SYS/META tenant: there isn't sync_scn
* for user tenant: get sync_scn from __all_tenant_info cache
* @param[out] sync_scn
*/
int get_sync_scn(share::SCN &sync_scn);
/**
* @description:
* get tenant is_standby_normal_status

View File

@ -13,6 +13,7 @@
#define USING_LOG_PREFIX RS
#include "rootserver/ob_upgrade_executor.h"
#include "rootserver/ob_ls_service_helper.h"
#include "observer/ob_server_struct.h"
#include "share/ob_global_stat_proxy.h"
#include "share/ob_cluster_event_history_table_operator.h"//CLUSTER_EVENT_INSTANCE
@ -470,27 +471,51 @@ int ObUpgradeExecutor::run_upgrade_begin_action_(
const common::ObIArray<uint64_t> &tenant_ids)
{
int ret = OB_SUCCESS;
common::hash::ObHashMap<uint64_t, share::SCN> tenants_sys_ls_target_scn;
lib::ObMemAttr attr(MTL_ID(), "UPGRADE");
const int BUCKET_NUM = hash::cal_next_prime(tenant_ids.count());
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else if (OB_FAIL(tenants_sys_ls_target_scn.create(BUCKET_NUM, attr))) {
LOG_WARN("fail to create tenants_sys_ls_target_scn", KR(ret));
} else {
int64_t backup_ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
share::SCN sys_ls_target_scn;
tenants_sys_ls_target_scn.clear();
for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
const uint64_t tenant_id = tenant_ids.at(i);
int64_t start_ts = ObTimeUtility::current_time();
sys_ls_target_scn.set_invalid();
FLOG_INFO("[UPGRADE] start to run upgrade begin action", K(tenant_id));
if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else if (OB_TMP_FAIL(run_upgrade_begin_action_(tenant_id))) {
LOG_WARN("fail to upgrade begin action", KR(ret), K(tenant_id));
backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
} else if (!is_user_tenant(tenant_id)) {
// skip
} else if (OB_FAIL(ObGlobalStatProxy::get_target_data_version_ora_rowscn(tenant_id, sys_ls_target_scn))) {
LOG_WARN("fail to get sys_ls_target_scn", KR(ret), K(tenant_id));
} else if (OB_FAIL(tenants_sys_ls_target_scn.set_refactored(
tenant_id,
sys_ls_target_scn,
0 /* flag: 0 shows that not cover existing object. */))) {
LOG_WARN("fail to push an element into tenants_sys_ls_target_scn", KR(ret), K(tenant_id),
K(sys_ls_target_scn));
}
FLOG_INFO("[UPGRADE] finish run upgrade begin action",
FLOG_INFO("[UPGRADE] finish run upgrade begin action step 1/2, write upgrade barrier log",
KR(ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
} // end for
ret = OB_SUCC(ret) ? backup_ret : ret;
if (OB_SUCC(ret)) {
int64_t start_ts_step2 = ObTimeUtility::current_time();
ret = ObLSServiceHelper::wait_all_tenants_user_ls_sync_scn(tenants_sys_ls_target_scn);
FLOG_INFO("[UPGRADE] finish run upgrade begin action step 2/2, wait all tenants' sync_scn",
KR(ret), "cost", ObTimeUtility::current_time() - start_ts_step2);
}
}
return ret;
}

View File

@ -617,6 +617,24 @@ int ObLSAttrOperator::get_ls_attr(const ObLSID &id,
return ret;
}
int ObLSAttrOperator::get_pre_tenant_dropping_ora_rowscn(share::SCN &pre_tenant_dropping_ora_rowscn)
{
int ret = OB_SUCCESS;
ObSqlString sql;
pre_tenant_dropping_ora_rowscn.set_invalid();
if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
} else if (OB_FAIL(sql.assign_fmt(
"SELECT ORA_ROWSCN FROM %s WHERE ls_id = %ld and status = '%s'",
OB_ALL_LS_TNAME, SYS_LS.id(), ls_status_to_str(OB_LS_PRE_TENANT_DROPPING)))) {
LOG_WARN("assign sql failed", KR(ret));
} else if (OB_FAIL(ObShareUtil::get_ora_rowscn(*GCTX.sql_proxy_, tenant_id_, sql, pre_tenant_dropping_ora_rowscn))) {
LOG_WARN("fail to get target_data_version_ora_rowscn", KR(ret), K(pre_tenant_dropping_ora_rowscn), K(sql));
}
return ret;
}
int ObLSAttrOperator::get_duplicate_ls_attr(const bool for_update,
common::ObISQLClient &client,
ObLSAttr &ls_attr,

View File

@ -336,6 +336,7 @@ public:
static ObLSOperationType get_ls_operation_by_status(const ObLSStatus &ls_status);
int get_ls_attr(const ObLSID &id, const bool for_update, common::ObISQLClient &client,
ObLSAttr &ls_attr, bool only_existing_ls = true);
int get_pre_tenant_dropping_ora_rowscn(share::SCN &pre_tenant_dropping_ora_rowscn);
/*
* description: get all ls with snapshot
* @param[in] read_scn:the snapshot of read_version

View File

@ -265,6 +265,26 @@ int ObGlobalStatProxy::get_target_data_version(
return ret;
}
int ObGlobalStatProxy::get_target_data_version_ora_rowscn(
const uint64_t tenant_id,
share::SCN &target_data_version_ora_rowscn)
{
int ret = OB_SUCCESS;
ObSqlString sql;
target_data_version_ora_rowscn.set_invalid();
if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
} else if (OB_FAIL(sql.assign_fmt(
"SELECT ORA_ROWSCN FROM %s WHERE TABLE_NAME = '__all_global_stat' AND COLUMN_NAME"
" = 'target_data_version'", OB_ALL_CORE_TABLE_TNAME))) {
LOG_WARN("assign sql failed", KR(ret));
} else if (OB_FAIL(ObShareUtil::get_ora_rowscn(*GCTX.sql_proxy_, tenant_id, sql, target_data_version_ora_rowscn))) {
LOG_WARN("fail to get target_data_version_ora_rowscn", KR(ret), K(tenant_id), K(sql));
}
return ret;
}
int ObGlobalStatProxy::inc_rootservice_epoch()
{
int ret = OB_SUCCESS;

View File

@ -74,6 +74,9 @@ public:
int update_current_data_version(const uint64_t current_data_version);
int get_current_data_version(uint64_t &current_data_version);
static int get_target_data_version_ora_rowscn(
const uint64_t tenant_id,
share::SCN &target_data_version_ora_rowscn);
int update_target_data_version(const uint64_t target_data_version);
int get_target_data_version(const bool for_update, uint64_t &target_data_version);

View File

@ -272,6 +272,42 @@ int ObShareUtil::parse_all_server_list(
return ret;
}
int ObShareUtil::get_ora_rowscn(
common::ObISQLClient &client,
const uint64_t tenant_id,
const ObSqlString &sql,
SCN &ora_rowscn)
{
int ret = OB_SUCCESS;
uint64_t ora_rowscn_val = 0;
ora_rowscn.set_invalid();
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
if (OB_FAIL(client.read(res, tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", KR(ret), K(sql));
} else if (NULL == (result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get sql result", KR(ret));
} else if (OB_FAIL(result->next())) {
LOG_WARN("fail to get next row", KR(ret));
} else {
EXTRACT_INT_FIELD_MYSQL(*result, "ORA_ROWSCN", ora_rowscn_val, int64_t);
if (OB_FAIL(ora_rowscn.convert_for_inner_table_field(ora_rowscn_val))) {
LOG_WARN("fail to convert val to SCN", KR(ret), K(ora_rowscn_val));
}
}
int tmp_ret = OB_SUCCESS;
if (OB_FAIL(ret)) {
//nothing todo
} else if (OB_ITER_END != (tmp_ret = result->next())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get more row than one", KR(ret), KR(tmp_ret));
}
}
return ret;
}
bool ObShareUtil::is_tenant_enable_rebalance(const uint64_t tenant_id)
{
bool bret = false;

View File

@ -13,6 +13,7 @@
#ifndef OCEANBASE_SHARE_OB_SHARE_UTIL_H_
#define OCEANBASE_SHARE_OB_SHARE_UTIL_H_
#include "share/ob_define.h"
#include "share/scn.h"
namespace oceanbase
{
namespace common
@ -67,6 +68,15 @@ public:
static int parse_all_server_list(
const ObArray<ObAddr> &excluded_server_list,
ObArray<ObAddr> &config_all_server_list);
// get ora_rowscn from one row
// @params[in]: tenant_id, the table owner
// @params[in]: sql, the sql should be "select ORA_ROWSCN from xxx", where count() is 1
// @params[out]: the ORA_ROWSCN
static int get_ora_rowscn(
common::ObISQLClient &client,
const uint64_t tenant_id,
const ObSqlString &sql,
SCN &ora_rowscn);
static bool is_tenant_enable_rebalance(const uint64_t tenant_id);
};