fix some recover table bugs
This commit is contained in:
parent
8f7ed1a53f
commit
47089d3931
@ -533,6 +533,12 @@ int ObMPConnect::load_privilege_info(ObSQLSessionInfo &session)
|
||||
tenant_name_ = ObString::make_string(OB_SYS_TENANT_NAME);
|
||||
OB_LOG(INFO, "no tenant name set, use default tenant name", K_(tenant_name));
|
||||
}
|
||||
|
||||
if (OB_NOT_NULL(tenant_name_.find('$'))) {
|
||||
ret = OB_ERR_INVALID_TENANT_NAME;
|
||||
LOG_WARN("invalid tenant name. “$” is not allowed in tenant name.", K(ret), K_(tenant_name));
|
||||
}
|
||||
|
||||
//在oracle租户下需要转换db_name和user_name,处理双引号和大小写
|
||||
//在mysql租户下不会作任何处理,只简单拷贝下~
|
||||
if (OB_SUCC(ret)) {
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include "share/ob_rpc_struct.h"
|
||||
#include "share/longops_mgr/ob_longops_mgr.h"
|
||||
#include "share/scheduler/ob_sys_task_stat.h"
|
||||
#include "share/restore/ob_import_util.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -1747,6 +1748,7 @@ int ObDDLScheduler::recover_task()
|
||||
int64_t tenant_schema_version = 0;
|
||||
int64_t table_task_status = 0;
|
||||
int64_t execution_id = -1;
|
||||
bool is_recover_table_aux_tenant = false;
|
||||
ObMySQLTransaction trans;
|
||||
if (OB_FAIL(schema_service.get_tenant_schema_version(cur_record.tenant_id_, tenant_schema_version))) {
|
||||
LOG_WARN("failed to get tenant schema version", K(ret), K(cur_record));
|
||||
@ -1755,6 +1757,12 @@ int ObDDLScheduler::recover_task()
|
||||
} else if (tenant_schema_version < cur_record.schema_version_) {
|
||||
// schema has not publish, by pass now
|
||||
LOG_INFO("skip schedule ddl task, because tenant schema version too old", K(tenant_schema_version), K(cur_record));
|
||||
} else if (OB_FAIL(ObImportTableUtil::check_is_recover_table_aux_tenant(schema_service,
|
||||
cur_record.tenant_id_,
|
||||
is_recover_table_aux_tenant))) {
|
||||
LOG_WARN("failed to check is recover table aux tenant", K(ret), K(cur_record));
|
||||
} else if (is_recover_table_aux_tenant) {
|
||||
LOG_INFO("tenant is recover table aux tenant, skip schedule ddl task", K(cur_record));
|
||||
} else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), cur_record.tenant_id_))) {
|
||||
LOG_WARN("start transaction failed", K(ret));
|
||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans,
|
||||
|
@ -321,16 +321,9 @@ int ObDDLService::get_tenant_schema_guard_with_version_in_inner_table(
|
||||
int ret = OB_SUCCESS;
|
||||
src_tenant_schema_guard = nullptr;
|
||||
dst_tenant_schema_guard = nullptr;
|
||||
bool is_standby = false;
|
||||
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == src_tenant_id || OB_INVALID_TENANT_ID == dst_tenant_id)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tenant_id", K(ret), K(src_tenant_id), K(dst_tenant_id));
|
||||
} else if (src_tenant_id != dst_tenant_id
|
||||
&& OB_FAIL(ObAllTenantInfoProxy::is_standby_tenant(sql_proxy_, src_tenant_id, is_standby))) {
|
||||
LOG_WARN("check tenant standby failed", K(ret), K(src_tenant_id), K(dst_tenant_id));
|
||||
} else if (src_tenant_id != dst_tenant_id && !is_standby) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arg, src tenant should be standby", K(ret), K(src_tenant_id), K(dst_tenant_id));
|
||||
} else if (OB_FAIL(get_tenant_schema_guard_with_version_in_inner_table(dst_tenant_id, hold_buf_dst_tenant_schema_guard))) {
|
||||
LOG_WARN("get tenant schema guard failed", K(dst_tenant_id));
|
||||
} else if (src_tenant_id == dst_tenant_id) {
|
||||
|
@ -215,6 +215,17 @@ int ObImportTableJobScheduler::gen_import_table_task_(share::ObImportTableJob &j
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObImportTableJobStatus next_status(ObImportTableJobStatus::IMPORT_FINISH);
|
||||
job.set_end_ts(ObTimeUtility::current_time());
|
||||
|
||||
if (!job.get_result().is_comment_setted()) {
|
||||
share::ObTaskId trace_id(*ObCurTraceId::get_trace_id());
|
||||
ObImportResult result;
|
||||
if (OB_TMP_FAIL(result.set_result(ret, trace_id, GCONF.self_addr_))) {
|
||||
LOG_WARN("failed to set result", K(ret));
|
||||
} else {
|
||||
job.set_result(result);
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_TMP_FAIL(advance_status_(*sql_proxy_, job, next_status))) {
|
||||
LOG_WARN("failed to advance status", K(ret));
|
||||
}
|
||||
|
@ -287,9 +287,8 @@ int ObImportTableTaskGenerator::fill_import_task_(
|
||||
share::ObImportTableTask &import_task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!table_schema.is_user_table()) {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("import not user table is not allowed");
|
||||
if (OB_FAIL(check_src_table_schema_(import_job, table_schema, table_item))) {
|
||||
LOG_WARN("failed to check src table schema", K(ret));
|
||||
} else if (OB_FAIL(import_task.set_src_database(table_item.database_name_))) {
|
||||
LOG_WARN("failed to set src database name", K(ret));
|
||||
} else if (OB_FAIL(import_task.set_src_table(table_item.table_name_))) {
|
||||
@ -311,6 +310,50 @@ int ObImportTableTaskGenerator::fill_import_task_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObImportTableTaskGenerator::check_src_table_schema_(
|
||||
share::ObImportTableJob &import_job,
|
||||
const share::schema::ObTableSchema &table_schema,
|
||||
const share::ObImportTableItem &table_item)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (!table_schema.is_user_table()) {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("import not user table is not allowed", K(ret), K(table_item));
|
||||
ObImportResult::Comment comment;
|
||||
if (OB_TMP_FAIL(databuff_printf(comment.ptr(), comment.capacity(), "import table %.*s.%.*s is not user table",
|
||||
table_item.database_name_.length(), table_item.database_name_.ptr(),
|
||||
table_item.table_name_.length(), table_item.table_name_.ptr()))) {
|
||||
LOG_WARN("failed to databuff printf", K(ret));
|
||||
} else {
|
||||
import_job.get_result().set_result(false/*failed*/, comment);
|
||||
}
|
||||
} else if (table_schema.is_in_recyclebin()) {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("import table in recyclebin is not allowed", K(ret), K(table_item));
|
||||
ObImportResult::Comment comment;
|
||||
if (OB_TMP_FAIL(databuff_printf(comment.ptr(), comment.capacity(), "import table %.*s.%.*s is in recyclebin",
|
||||
table_item.database_name_.length(), table_item.database_name_.ptr(),
|
||||
table_item.table_name_.length(), table_item.table_name_.ptr()))) {
|
||||
LOG_WARN("failed to databuff printf", K(ret));
|
||||
} else {
|
||||
import_job.get_result().set_result(false/*failed*/, comment);
|
||||
}
|
||||
} else if (table_schema.is_user_hidden_table()) {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("import hidden user table is not allowed", K(ret), K(table_item));
|
||||
ObImportResult::Comment comment;
|
||||
if (OB_TMP_FAIL(databuff_printf(comment.ptr(), comment.capacity(), "import table %.*s.%.*s is hidden table",
|
||||
table_item.database_name_.length(), table_item.database_name_.ptr(),
|
||||
table_item.table_name_.length(), table_item.table_name_.ptr()))) {
|
||||
LOG_WARN("failed to databuff printf", K(ret));
|
||||
} else {
|
||||
import_job.get_result().set_result(false/*failed*/, comment);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObImportTableTaskGenerator::check_target_schema_(
|
||||
share::ObImportTableJob &import_job,
|
||||
const share::ObImportTableTask &task)
|
||||
|
@ -94,6 +94,10 @@ private:
|
||||
const share::ObImportTableItem &table_item,
|
||||
const share::ObImportTableItem &remap_table_item,
|
||||
share::ObImportTableTask &import_task);
|
||||
int check_src_table_schema_(
|
||||
share::ObImportTableJob &import_job,
|
||||
const share::schema::ObTableSchema &table_schema,
|
||||
const share::ObImportTableItem &table_item);
|
||||
int fill_common_para_(
|
||||
const share::ObImportTableJob &import_job,
|
||||
const share::schema::ObTableSchema &table_schema,
|
||||
|
@ -530,6 +530,8 @@ int ObRecoverTableJobScheduler::restore_aux_tenant_(share::ObRecoverTableJob &jo
|
||||
job.get_result().set_result(false, restore_history_info.comment_);
|
||||
} else if (OB_FAIL(check_aux_tenant_(job, aux_tenant_id))) {
|
||||
LOG_WARN("failed to check aux tenant", K(ret), K(aux_tenant_id));
|
||||
} else if (OB_FAIL(failover_to_leader_(job, aux_tenant_id))) {
|
||||
LOG_WARN("failed to failover to leader", K(ret));
|
||||
}
|
||||
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -540,6 +542,24 @@ int ObRecoverTableJobScheduler::restore_aux_tenant_(share::ObRecoverTableJob &jo
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRecoverTableJobScheduler::failover_to_leader_(
|
||||
share::ObRecoverTableJob &job, const uint64_t aux_tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObAddr leader;
|
||||
obrpc::ObSwitchTenantArg switch_tenant_arg;
|
||||
MTL_SWITCH(OB_SYS_TENANT_ID) {
|
||||
if (OB_FAIL(switch_tenant_arg.init(aux_tenant_id, obrpc::ObSwitchTenantArg::OpType::FAILOVER_TO_PRIMARY, "", false))) {
|
||||
LOG_WARN("failed to init switch tenant arg", K(ret), K(aux_tenant_id));
|
||||
} else if (OB_FAIL(OB_PRIMARY_STANDBY_SERVICE.switch_tenant(switch_tenant_arg))) {
|
||||
LOG_WARN("failed to switch_tenant", KR(ret), K(switch_tenant_arg));
|
||||
} else {
|
||||
LOG_INFO("[RECOVER_TABLE]succeed to switch aux tenant role to primary", K(aux_tenant_id), K(job));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRecoverTableJobScheduler::check_aux_tenant_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -71,6 +71,7 @@ private:
|
||||
int user_prepare_(share::ObRecoverTableJob &job);
|
||||
int restore_aux_tenant_(share::ObRecoverTableJob &job);
|
||||
int check_aux_tenant_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id);
|
||||
int failover_to_leader_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id);
|
||||
int check_tenant_compatibility(
|
||||
share::schema::ObSchemaGetterGuard &aux_tenant_guard,
|
||||
share::schema::ObSchemaGetterGuard &recover_tenant_guard,
|
||||
|
@ -152,4 +152,53 @@ int ObImportTableUtil::get_tenant_name_case_mode(const uint64_t tenant_id, ObNam
|
||||
LOG_WARN("faield to get tenant schema guard", K(ret), K(tenant_id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
int ObImportTableUtil::check_is_recover_table_aux_tenant(
|
||||
share::schema::ObMultiVersionSchemaService &schema_service,
|
||||
const uint64_t tenant_id,
|
||||
bool &is_recover_table_aux_tenant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
schema::ObSchemaGetterGuard guard;
|
||||
const schema::ObTenantSchema *tenant_schema = nullptr;
|
||||
is_recover_table_aux_tenant = false;
|
||||
if (!is_valid_tenant_id(tenant_id)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tenant id", K(ret), K(tenant_id));
|
||||
} else if (!is_user_tenant(tenant_id)) { // skip sys tenant and meta tenant
|
||||
} else if (OB_FAIL(schema_service.get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
|
||||
LOG_WARN("failed to get tenant schema guard", K(tenant_id), K(ret));
|
||||
} else if (OB_FAIL(guard.get_tenant_info(tenant_id, tenant_schema))) {
|
||||
LOG_WARN("failed to get tenant info", K(ret), K(tenant_id));
|
||||
} else if (OB_ISNULL(tenant_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tenant schema must not be nullptr", K(ret));
|
||||
} else if (OB_FAIL(check_is_recover_table_aux_tenant_name(tenant_schema->get_tenant_name_str(),
|
||||
is_recover_table_aux_tenant))) {
|
||||
LOG_WARN("failed to check is recover table aux tenant name", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObImportTableUtil::check_is_recover_table_aux_tenant_name(
|
||||
const ObString &tenant_name,
|
||||
bool &is_recover_table_aux_tenant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t timestamp = 0;
|
||||
is_recover_table_aux_tenant = false;
|
||||
char buf[OB_MAX_TENANT_NAME_LENGTH] = "";
|
||||
const ObString AUX_TENANT_NAME_PREFIX("AUX_RECOVER$");
|
||||
if (tenant_name.length() > OB_MAX_TENANT_NAME_LENGTH || tenant_name.empty()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tenant name", K(ret), K(tenant_name));
|
||||
} else if (!tenant_name.prefix_match(AUX_TENANT_NAME_PREFIX)) {
|
||||
// not recover table aux tenant, skip
|
||||
} else if (1 != sscanf(tenant_name.ptr(), "AUX_RECOVER$%ld%s", ×tamp, buf)) {
|
||||
// not recover table aux tenant, skip
|
||||
} else {
|
||||
is_recover_table_aux_tenant = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -33,6 +33,9 @@ static int check_tablegroup_exist(share::schema::ObMultiVersionSchemaService &sc
|
||||
static int check_tablespace_exist(share::schema::ObMultiVersionSchemaService &schema_service,
|
||||
uint64_t tenant_id, const ObString &tablespace, bool &is_exist);
|
||||
static int get_tenant_name_case_mode(const uint64_t tenant_id, ObNameCaseMode &name_case_mode);
|
||||
static int check_is_recover_table_aux_tenant(
|
||||
share::schema::ObMultiVersionSchemaService &schema_service, const uint64_t tenant_id, bool &is_recover_table_aux_tenant);
|
||||
static int check_is_recover_table_aux_tenant_name(const ObString &tenant_name, bool &is_recover_table_aux_tenant);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include <time.h>
|
||||
#include "share/ob_errno.h"
|
||||
#include "share/restore/ob_import_arg.h"
|
||||
#include "share/restore/ob_import_util.h"
|
||||
|
||||
|
||||
using namespace oceanbase;
|
||||
@ -458,6 +459,23 @@ TEST_F(ImportTableTest, test_namecase_mode)
|
||||
ASSERT_EQ(OB_BACKUP_CONFLICT_VALUE, arg.add_remap_database(remap_db2));
|
||||
}
|
||||
|
||||
TEST_F(ImportTableTest, test_check_aux_tenant)
|
||||
{
|
||||
const ObString tenant_name_1("AUX_RECOVER$1694673215667468");
|
||||
bool is_recover_table_aux_tenant = false;
|
||||
ASSERT_EQ(OB_SUCCESS, ObImportTableUtil::check_is_recover_table_aux_tenant_name(tenant_name_1, is_recover_table_aux_tenant));
|
||||
ASSERT_EQ(true, is_recover_table_aux_tenant);
|
||||
const ObString tenant_name_2("AUX_RECOVER$1694673215667468aaa");
|
||||
ASSERT_EQ(OB_SUCCESS, ObImportTableUtil::check_is_recover_table_aux_tenant_name(tenant_name_2, is_recover_table_aux_tenant));
|
||||
ASSERT_EQ(false, is_recover_table_aux_tenant);
|
||||
const ObString tenant_name_3("AUX_RECOVER1694673215667468aaa");
|
||||
ASSERT_EQ(OB_SUCCESS, ObImportTableUtil::check_is_recover_table_aux_tenant_name(tenant_name_3, is_recover_table_aux_tenant));
|
||||
ASSERT_EQ(false, is_recover_table_aux_tenant);
|
||||
const ObString tenant_name_4("AUX_RECOVER$aaa");
|
||||
ASSERT_EQ(OB_SUCCESS, ObImportTableUtil::check_is_recover_table_aux_tenant_name(tenant_name_4, is_recover_table_aux_tenant));
|
||||
ASSERT_EQ(false, is_recover_table_aux_tenant);
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user