[FEAT MERGE] New upgrade framework for 4.x
This commit is contained in:
@ -34,6 +34,10 @@
|
||||
#include "share/schema/ob_multi_version_schema_service.h" //ObMultiSchemaService
|
||||
#include "share/restore/ob_physical_restore_info.h" //restore_status
|
||||
#include "share/restore/ob_physical_restore_table_operator.h"//ObPhysicalRestoreTableOperator
|
||||
#include "share/ob_primary_standby_service.h" // ObPrimaryStandbyService
|
||||
#include "share/ob_standby_upgrade.h" // ObStandbyUpgrade
|
||||
#include "share/ob_upgrade_utils.h" // ObUpgradeChecker
|
||||
#include "share/ob_global_stat_proxy.h" // ObGlobalStatProxy
|
||||
#include "storage/tx/ob_tx_log.h" //ObTxLogHeader
|
||||
#include "storage/tx_storage/ob_ls_service.h" //ObLSService
|
||||
#include "storage/tx_storage/ob_ls_handle.h" //ObLSHandle
|
||||
@ -132,6 +136,10 @@ void ObRecoveryLSService::do_work()
|
||||
start_scn.reset();
|
||||
LOG_WARN("failed to do wowrk", KR(ret), K(start_scn));
|
||||
}
|
||||
|
||||
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) { // every 10 second
|
||||
(void)try_tenant_upgrade_end_();
|
||||
}
|
||||
}
|
||||
LOG_INFO("[LS_RECOVERY] finish one round", KR(ret), KR(tmp_ret), K(start_scn), K(thread_idx));
|
||||
idle(idle_time_us);
|
||||
@ -276,7 +284,11 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC
|
||||
commit_log.get_multi_source_data();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < source_data.count(); ++i) {
|
||||
const ObTxBufferNode &node = source_data.at(i);
|
||||
if (ObTxDataSourceType::LS_TABLE != node.get_data_source_type()) {
|
||||
if (ObTxDataSourceType::STANDBY_UPGRADE == node.get_data_source_type()) {
|
||||
if (OB_FAIL(process_upgrade_log_(node))) {
|
||||
LOG_WARN("failed to process_upgrade_log_", KR(ret), K(node));
|
||||
}
|
||||
} else if (ObTxDataSourceType::LS_TABLE != node.get_data_source_type()) {
|
||||
// nothing
|
||||
} else if (has_operation) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -311,6 +323,145 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRecoveryLSService::process_upgrade_log_(const ObTxBufferNode &node)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t standby_data_version = 0;
|
||||
|
||||
if (!node.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("node is invalid", KR(ret), K(node));
|
||||
} else {
|
||||
ObStandbyUpgrade primary_data_version;
|
||||
int64_t pos = 0;
|
||||
if (OB_FAIL(primary_data_version.deserialize(node.get_data_buf().ptr(), node.get_data_buf().length(), pos))) {
|
||||
LOG_WARN("failed to deserialize", KR(ret), K(node), KPHEX(node.get_data_buf().ptr(), node.get_data_buf().length()));
|
||||
} else if (OB_UNLIKELY(pos > node.get_data_buf().length())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("failed to get primary_data_version", KR(ret), K(pos), K(node.get_data_buf().length()));
|
||||
} else {
|
||||
LOG_INFO("get primary_data_version", K(primary_data_version));
|
||||
if (!primary_data_version.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("primary_data_version not valid", KR(ret), K(primary_data_version));
|
||||
} else if (OB_FAIL(ObUpgradeChecker::get_data_version_by_cluster_version(
|
||||
GET_MIN_CLUSTER_VERSION(),
|
||||
standby_data_version))) {
|
||||
LOG_WARN("failed to get_data_version_by_cluster_version", KR(ret), K(GET_MIN_CLUSTER_VERSION()));
|
||||
} else if (primary_data_version.get_data_version() > standby_data_version) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(30 * 60 * 1000 * 1000)) { // 30min
|
||||
LOG_ERROR("standby version is not new enough to recover primary clog", KR(ret),
|
||||
K(primary_data_version), K(standby_data_version));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRecoveryLSService::get_min_data_version_(uint64_t &compatible)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
compatible = 0;
|
||||
if (OB_ISNULL(proxy_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("sql proxy is null", KR(ret));
|
||||
} else {
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
|
||||
uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_);
|
||||
ObSqlString sql;
|
||||
if (OB_FAIL(sql.assign_fmt("select value from %s where tenant_id = '%lu' and name = 'compatible' ",
|
||||
OB_TENANT_PARAMETER_TNAME, tenant_id_))) {
|
||||
LOG_WARN("fail to generate sql", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(proxy_->read(result, exec_tenant_id, sql.ptr()))) {
|
||||
LOG_WARN("read config from __tenant_parameter failed",
|
||||
KR(ret), K_(tenant_id), K(exec_tenant_id), K(sql));
|
||||
} else if (OB_ISNULL(result.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("config result is null", KR(ret), K_(tenant_id), K(exec_tenant_id), K(sql));
|
||||
} else if (OB_FAIL(result.get_result()->next())) {
|
||||
LOG_WARN("get result next failed", KR(ret), K_(tenant_id), K(exec_tenant_id), K(sql));
|
||||
} else if (OB_ISNULL(result.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result is null", KR(ret), K_(tenant_id), K(exec_tenant_id), K(sql));
|
||||
} else {
|
||||
ObString compatible_str;
|
||||
EXTRACT_VARCHAR_FIELD_MYSQL(*result.get_result(), "compatible", compatible_str);
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("failed to get result", KR(ret), K_(tenant_id), K(exec_tenant_id), K(sql));
|
||||
} else if (OB_FAIL(ObClusterVersion::get_version(compatible_str, compatible))) {
|
||||
LOG_WARN("parse version failed", KR(ret), K(compatible_str));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObRecoveryLSService::try_tenant_upgrade_end_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t min_data_version = 0;
|
||||
obrpc::ObCommonRpcProxy *rs_rpc_proxy_ = GCTX.rs_rpc_proxy_;
|
||||
if (OB_ISNULL(proxy_) || !is_user_tenant(tenant_id_) || OB_ISNULL(rs_rpc_proxy_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("invalid argument", KR(ret), KP(proxy_), K_(tenant_id), KP(rs_rpc_proxy_));
|
||||
} else if (OB_FAIL(get_min_data_version_(min_data_version))) {
|
||||
LOG_WARN("fail to get min data version", KR(ret), K_(tenant_id));
|
||||
} else if (min_data_version == DATA_CURRENT_VERSION) {
|
||||
// already upgrade end
|
||||
} else {
|
||||
ObGlobalStatProxy proxy(*proxy_, tenant_id_);
|
||||
uint64_t target_data_version = 0;
|
||||
uint64_t current_data_version = 0;
|
||||
if (OB_FAIL(proxy.get_target_data_version(false /* for_update */, target_data_version))) {
|
||||
LOG_WARN("fail to get target data version", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(proxy.get_current_data_version(current_data_version))) {
|
||||
LOG_WARN("fail to get current data version", KR(ret), K_(tenant_id));
|
||||
} else if (!(target_data_version == current_data_version
|
||||
&& target_data_version != min_data_version
|
||||
&& min_data_version <= DATA_CURRENT_VERSION)) {
|
||||
ret = EAGAIN;
|
||||
LOG_WARN("data_version not match, run upgrade end later",
|
||||
KR(ret), K_(tenant_id), K(target_data_version), K(current_data_version),
|
||||
K(DATA_CURRENT_VERSION));
|
||||
} else {
|
||||
HEAP_VAR(obrpc::ObAdminSetConfigItem, item) {
|
||||
ObSchemaGetterGuard guard;
|
||||
const ObSimpleTenantSchema *tenant = NULL;
|
||||
obrpc::ObAdminSetConfigArg arg;
|
||||
item.exec_tenant_id_ = OB_SYS_TENANT_ID;
|
||||
const int64_t timeout = GCONF.internal_sql_execute_timeout;
|
||||
int64_t pos = ObClusterVersion::print_version_str(
|
||||
item.value_.ptr(), item.value_.capacity(),
|
||||
current_data_version);
|
||||
if (pos <= 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("current_data_version is invalid",
|
||||
KR(ret), K_(tenant_id), K(current_data_version));
|
||||
} else if (OB_FAIL(GSCHEMASERVICE.get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
|
||||
LOG_WARN("fail to get schema guard", KR(ret));
|
||||
} else if (OB_FAIL(guard.get_tenant_info(tenant_id_, tenant))) {
|
||||
LOG_WARN("fail to get tenant info", KR(ret), K_(tenant_id));
|
||||
} else if (OB_ISNULL(tenant)) {
|
||||
ret = OB_TENANT_NOT_EXIST;
|
||||
LOG_WARN("tenant not exist", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(item.tenant_name_.assign(tenant->get_tenant_name()))) {
|
||||
LOG_WARN("fail to assign tenant name", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(item.name_.assign("compatible"))) {
|
||||
LOG_WARN("fail to assign config name", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(arg.items_.push_back(item))) {
|
||||
LOG_WARN("fail to push back item", KR(ret), K(item));
|
||||
} else if (OB_FAIL(rs_rpc_proxy_->timeout(timeout).admin_set_config(arg))) {
|
||||
LOG_WARN("fail to set config", KR(ret), K(arg), K(timeout));
|
||||
}
|
||||
} // end HEAP_VAR
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int ObRecoveryLSService::process_gc_log_(logservice::ObGCLSLog &gc_log, const SCN &sync_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
Reference in New Issue
Block a user