From acc97de57f32c759d861fd4fd7c20ab25c1efcf2 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 4 Nov 2022 03:06:24 +0000 Subject: [PATCH] [UPGRADE] UPGRADE_SYSTEM_VARIABLE/UPGRADE_SYSTEM_TABLE CMD --- src/observer/ob_srv_xlator_rootserver.cpp | 1 + src/rootserver/ob_ddl_operator.cpp | 56 ++- src/rootserver/ob_ddl_operator.h | 7 + src/rootserver/ob_ddl_service.cpp | 236 ++++++++++- src/rootserver/ob_ddl_service.h | 12 + src/rootserver/ob_root_inspection.cpp | 187 +++++--- src/rootserver/ob_root_inspection.h | 31 +- src/rootserver/ob_root_service.cpp | 40 +- src/rootserver/ob_root_service.h | 3 +- src/rootserver/ob_rs_job_table_operator.cpp | 4 +- src/rootserver/ob_rs_job_table_operator.h | 4 +- src/rootserver/ob_rs_rpc_processor.h | 20 +- src/rootserver/ob_system_admin_util.cpp | 51 ++- src/rootserver/ob_upgrade_executor.cpp | 401 +++++++++++++----- src/rootserver/ob_upgrade_executor.h | 27 +- src/share/ob_common_rpc_proxy.h | 1 + src/share/ob_rpc_struct.cpp | 38 +- src/share/ob_rpc_struct.h | 26 +- src/share/ob_upgrade_utils.cpp | 320 +------------- src/share/ob_upgrade_utils.h | 33 +- src/share/schema/ob_column_schema.cpp | 6 + src/share/schema/ob_column_schema.h | 2 + .../schema/ob_schema_service_sql_impl.cpp | 2 +- src/share/schema/ob_schema_struct.cpp | 11 + src/share/schema/ob_schema_struct.h | 1 + src/share/schema/ob_schema_utils.cpp | 29 +- src/share/schema/ob_table_schema.cpp | 24 +- src/share/schema/ob_table_schema.h | 2 +- src/share/schema/ob_table_sql_service.cpp | 14 +- src/share/schema/ob_table_sql_service.h | 3 +- .../resolver/cmd/ob_alter_system_resolver.cpp | 22 +- tools/upgrade/special_upgrade_action_post.py | 2 +- tools/upgrade/upgrade_post.py | 2 +- tools/upgrade/upgrade_pre.py | 2 +- 34 files changed, 1007 insertions(+), 613 deletions(-) diff --git a/src/observer/ob_srv_xlator_rootserver.cpp b/src/observer/ob_srv_xlator_rootserver.cpp index db927b8d6c..1797898f95 100644 --- a/src/observer/ob_srv_xlator_rootserver.cpp +++ b/src/observer/ob_srv_xlator_rootserver.cpp @@ -209,6 +209,7 @@ void oceanbase::observer::init_srv_xlator_for_rootserver(ObSrvRpcXlator *xlator) RPC_PROCESSOR(rootserver::ObRpcRunJobP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcAdminRefreshIOCalibrationP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcRunUpgradeJobP, *gctx_.root_service_); + RPC_PROCESSOR(rootserver::ObRpcUpgradeTableSchemaP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcAdminFlushCacheP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcAdminUpgradeCmdP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcAdminRollingUpgradeCmdP, *gctx_.root_service_); diff --git a/src/rootserver/ob_ddl_operator.cpp b/src/rootserver/ob_ddl_operator.cpp index 65b4368234..bf4d52da8e 100644 --- a/src/rootserver/ob_ddl_operator.cpp +++ b/src/rootserver/ob_ddl_operator.cpp @@ -2685,7 +2685,8 @@ int ObDDLOperator::insert_single_column(ObMySQLTransaction &trans, LOG_WARN("fail to gen new schema_version", K(ret), K(tenant_id)); } else if (FALSE_IT(new_column.set_schema_version(new_schema_version))) { //do nothing - } else if (OB_FAIL(schema_service->get_table_sql_service().insert_single_column(trans, new_table_schema, new_column))) { + } else if (OB_FAIL(schema_service->get_table_sql_service().insert_single_column( + trans, new_table_schema, new_column, true))) { LOG_WARN("insert single column failed", K(ret)); } return ret; @@ -3853,6 +3854,59 @@ int ObDDLOperator::update_single_column(common::ObMySQLTransaction &trans, return ret; } +int ObDDLOperator::batch_update_system_table_columns( + common::ObMySQLTransaction &trans, + const share::schema::ObTableSchema &orig_table_schema, + share::schema::ObTableSchema &new_table_schema, + const common::ObIArray &add_column_ids, + const common::ObIArray &alter_column_ids, + const common::ObString *ddl_stmt_str/*=NULL*/) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = new_table_schema.get_tenant_id(); + const uint64_t table_id = new_table_schema.get_table_id(); + int64_t new_schema_version = OB_INVALID_VERSION; + ObSchemaService *schema_service_impl = schema_service_.get_schema_service(); + if (OB_ISNULL(schema_service_impl)) { + ret = OB_ERR_SYS; + LOG_WARN("schema_service_impl must not null", KR(ret)); + } else if (OB_FAIL(schema_service_.gen_new_schema_version(tenant_id, new_schema_version))) { + LOG_WARN("fail to gen new schema_version", KR(ret), K(tenant_id)); + } else { + (void) new_table_schema.set_schema_version(new_schema_version); + ObColumnSchemaV2 *new_column = NULL; + for (int64_t i = 0; OB_SUCC(ret) && i < add_column_ids.count(); i++) { + const uint64_t column_id = add_column_ids.at(i); + if (OB_ISNULL(new_column = new_table_schema.get_column_schema(column_id))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get column", KR(ret), K(tenant_id), K(table_id), K(column_id)); + } else if (FALSE_IT(new_column->set_schema_version(new_schema_version))) { + } else if (OB_FAIL(schema_service_impl->get_table_sql_service().insert_single_column( + trans, new_table_schema, *new_column, false))) { + LOG_WARN("fail to insert column", KR(ret), K(tenant_id), K(table_id), K(column_id)); + } + } // end for + + for (int64_t i = 0; OB_SUCC(ret) && i < alter_column_ids.count(); i++) { + const uint64_t column_id = alter_column_ids.at(i); + if (OB_ISNULL(new_column = new_table_schema.get_column_schema(column_id))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get column", KR(ret), K(tenant_id), K(table_id), K(column_id)); + } else if (FALSE_IT(new_column->set_schema_version(new_schema_version))) { + } else if (OB_FAIL(schema_service_impl->get_table_sql_service().update_single_column( + trans, orig_table_schema, new_table_schema, *new_column, false))) { + LOG_WARN("fail to insert column", KR(ret), K(tenant_id), K(table_id), K(column_id)); + } + } // end for + + if (FAILEDx(schema_service_impl->get_table_sql_service().update_table_attribute( + trans, new_table_schema, OB_DDL_ALTER_TABLE, ddl_stmt_str))) { + LOG_WARN("failed to update table attribute", KR(ret), K(tenant_id), K(table_id)); + } + } + return ret; +} + int ObDDLOperator::update_partition_option(common::ObMySQLTransaction &trans, ObTableSchema &table_schema) { diff --git a/src/rootserver/ob_ddl_operator.h b/src/rootserver/ob_ddl_operator.h index 8ea3b25f02..5b20616a71 100644 --- a/src/rootserver/ob_ddl_operator.h +++ b/src/rootserver/ob_ddl_operator.h @@ -292,6 +292,13 @@ public: int delete_single_column(common::ObMySQLTransaction &trans, share::schema::ObTableSchema &new_table_schema, const common::ObString &column_name); + int batch_update_system_table_columns( + common::ObMySQLTransaction &trans, + const share::schema::ObTableSchema &orig_table_schema, + share::schema::ObTableSchema &new_table_schema, + const common::ObIArray &add_column_ids, + const common::ObIArray &alter_column_ids, + const common::ObString *ddl_stmt_str = NULL); int create_sequence_in_create_table(share::schema::ObTableSchema &table_schema, common::ObMySQLTransaction &trans, share::schema::ObSchemaGetterGuard &schema_guard, diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index d3b779771b..afbcdff5d3 100644 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -18513,6 +18513,204 @@ int ObDDLService::update_index_status(const obrpc::ObUpdateIndexStatusArg &arg) return ret; } +int ObDDLService::upgrade_table_schema(const obrpc::ObUpgradeTableSchemaArg &arg) +{ + int ret = OB_SUCCESS; + FLOG_INFO("[UPGRADE] begin upgrade system table", K(arg)); + const uint64_t tenant_id = arg.get_tenant_id(); + const uint64_t table_id = arg.get_table_id(); + int64_t start_time = ObTimeUtility::current_time(); + if (OB_FAIL(check_inner_stat())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", KR(ret), K(arg)); + } else if (!GCONF.enable_sys_table_ddl) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("upgrade table schema when enable_sys_table_ddl is off is not allowed", + KR(ret), K(arg)); + } else { + HEAP_VAR(ObTableSchema, hard_code_schema) { + ObSchemaGetterGuard schema_guard; + bool exist = false; + if (OB_FAIL(get_hard_code_system_table_schema_( + tenant_id, table_id, hard_code_schema))) { + LOG_WARN("fail to get hard code table schema", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_FAIL(get_tenant_schema_guard_with_version_in_inner_table(tenant_id, schema_guard))) { + LOG_WARN("get_schema_guard with version in inner table failed", KR(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.check_table_exist(tenant_id, table_id, exist))) { + LOG_WARN("fail to check table exist", KR(ret), K(tenant_id), K(table_id)); + } else if (!exist) { + if (OB_FAIL(create_system_table_(schema_guard, hard_code_schema))) { + LOG_WARN("fail to create system table", KR(ret), K(tenant_id), K(table_id)); + } + } else if (OB_FAIL(alter_system_table_column_(schema_guard, hard_code_schema))) { + LOG_WARN("fail to alter system table's column", KR(ret), K(tenant_id), K(table_id)); + } + } + } + FLOG_INFO("[UPGRADE] end upgrade system table", + KR(ret), K(tenant_id), K(table_id), + "cost", ObTimeUtility::current_time() - start_time); + return ret; + +} + +int ObDDLService::get_hard_code_system_table_schema_( + const uint64_t tenant_id, + const uint64_t table_id, + ObTableSchema &hard_code_schema) +{ + int ret = OB_SUCCESS; + if (OB_INVALID_TENANT_ID == tenant_id + && !is_system_table(table_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id or table_id", KR(ret), K(tenant_id), K(table_id)); + } else { + bool finded = false; + const schema_create_func *creator_ptr_array[] = { + share::core_table_schema_creators, + share::sys_table_schema_creators, NULL }; + for (const schema_create_func **creator_ptr_ptr = creator_ptr_array; + OB_SUCC(ret) && !finded && OB_NOT_NULL(*creator_ptr_ptr); ++creator_ptr_ptr) { + for (const schema_create_func *creator_ptr = *creator_ptr_ptr; + OB_SUCC(ret) && !finded && OB_NOT_NULL(*creator_ptr); ++creator_ptr) { + hard_code_schema.reset(); + bool exist = false; + if (OB_FAIL((*creator_ptr)(hard_code_schema))) { + LOG_WARN("create table schema failed", KR(ret)); + } else if (!is_sys_tenant(tenant_id) + && OB_FAIL(ObSchemaUtils::construct_tenant_space_full_table( + tenant_id, hard_code_schema))) { + LOG_WARN("fail to construct tenant space table", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ObSysTableChecker::is_inner_table_exist( + tenant_id, hard_code_schema, exist))) { + LOG_WARN("fail to check inner table exist", + KR(ret), K(tenant_id), K(hard_code_schema)); + } else if (!exist) { + // skip + } else if (hard_code_schema.get_table_id() == table_id) { + finded = true; + } + } // end for + } // end for + + if (OB_SUCC(ret) && !finded) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("hard code table schema not exist", KR(ret), K(tenant_id), K(table_id)); + } + } + return ret; +} + +int ObDDLService::create_system_table_( + ObSchemaGetterGuard &schema_guard, + const ObTableSchema &hard_code_schema) +{ + int ret = OB_SUCCESS; + bool if_not_exist = true; + int64_t frozen_scn = 0; + ObArray table_schemas; + // the following variable is not used + ObString ddl_stmt_str; + ObErrorInfo error_info; + obrpc::ObSequenceDDLArg sequence_ddl_arg; + uint64_t last_replay_log_id = 0; + ObArray dep_infos; + ObArray mock_fk_parent_table_schema_array; + // sys index、sys lob table will be added in create_user_tables() + if (OB_FAIL(table_schemas.push_back(hard_code_schema))) { + LOG_WARN("fail to push back new table schema", KR(ret)); + } else if (OB_FAIL(ObMajorFreezeHelper::get_frozen_scn( + hard_code_schema.get_tenant_id(), frozen_scn))) { + LOG_WARN("get_frozen_scn failed", KR(ret), "tenant_id", hard_code_schema.get_tenant_id()); + } else if (OB_FAIL(create_user_tables(if_not_exist, ddl_stmt_str, + error_info, table_schemas, frozen_scn, schema_guard, sequence_ddl_arg, + last_replay_log_id, &dep_infos, mock_fk_parent_table_schema_array))) { + LOG_WARN("fail to create system table", KR(ret), K(hard_code_schema)); + } + return ret; +} + +int ObDDLService::alter_system_table_column_( + ObSchemaGetterGuard &schema_guard, + const ObTableSchema &hard_code_schema) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = hard_code_schema.get_tenant_id(); + const uint64_t table_id = hard_code_schema.get_table_id(); + const ObTableSchema *orig_table_schema = NULL; + ObArray add_column_ids; + ObArray alter_column_ids; + if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, orig_table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_ISNULL(orig_table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_FAIL(ObRootInspection::check_and_get_system_table_column_diff( + *orig_table_schema, hard_code_schema, add_column_ids, alter_column_ids))) { + LOG_WARN("fail to check system table's column schemas", KR(ret), K(tenant_id), K(table_id)); + } else if (0 == add_column_ids.count() && 0 == alter_column_ids.count()) { + LOG_INFO("system table's column schemas not changed, just skip", KR(ret), K(tenant_id), K(table_id)); + } else { + ObDDLSQLTransaction trans(schema_service_); + ObDDLOperator ddl_operator(*schema_service_, *sql_proxy_); + int64_t refreshed_schema_version = 0; + HEAP_VAR(ObTableSchema, new_table_schema) { + + if (OB_FAIL(schema_guard.get_schema_version(tenant_id, refreshed_schema_version))) { + LOG_WARN("failed to get tenant schema version", KR(ret), K(tenant_id)); + } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, refreshed_schema_version))) { + LOG_WARN("failed to start trans", KR(ret), K(tenant_id), K(refreshed_schema_version)); + } else if (OB_FAIL(new_table_schema.assign(*orig_table_schema))) { + LOG_WARN("fail to assign table schema", KR(ret), K(tenant_id), K(table_id)); + } else { + const ObColumnSchemaV2 *hard_code_column = NULL; + for (int64_t i = 0; OB_SUCC(ret) && i < add_column_ids.count(); i++) { + const uint64_t column_id = add_column_ids.at(i); + if (OB_ISNULL(hard_code_column = hard_code_schema.get_column_schema(column_id))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get column schema", KR(ret), K(tenant_id), K(table_id), K(column_id)); + } else if (OB_FAIL(new_table_schema.add_column(*hard_code_column))) { + LOG_WARN("fail to add column", KR(ret), KPC(hard_code_column)); + } + } // end for + + ObColumnSchemaV2 new_column; + for (int64_t i = 0; OB_SUCC(ret) && i < alter_column_ids.count(); i++) { + const uint64_t column_id = alter_column_ids.at(i); + if (OB_ISNULL(hard_code_column = hard_code_schema.get_column_schema(column_id))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get column schema", KR(ret), K(tenant_id), K(table_id), K(column_id)); + } else if (OB_FAIL(new_column.assign(*hard_code_column))) { + LOG_WARN("fail to assign column", KR(ret), KPC(hard_code_column)); + } else if (OB_FAIL(new_table_schema.alter_column(new_column, share::schema::ObTableSchema::CHECK_MODE_ONLINE))) { + LOG_WARN("fail to alter column", KR(ret), K(new_column)); + } + } // end for + + if (FAILEDx(ddl_operator.batch_update_system_table_columns(trans, + *orig_table_schema, new_table_schema, add_column_ids, alter_column_ids, NULL))) { + LOG_WARN("fail to batch update columns", KR(ret), K(new_table_schema)); + } + } + + if (trans.is_started()) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) { + LOG_WARN("trans end failed", "is_commit", OB_SUCC(ret), K(tmp_ret)); + ret = (OB_SUCC(ret)) ? tmp_ret : ret; + } + } + if (FAILEDx(publish_schema(tenant_id))) { + LOG_WARN("fail to publish schema", KR(ret), K(tenant_id)); + } + + } // end HEAP_VAR + } + return ret; +} + int ObDDLService::add_table_schema( ObTableSchema &table_schema, @@ -18533,18 +18731,30 @@ int ObDDLService::drop_inner_table(const share::schema::ObTableSchema &table_sch ObString *stmt = NULL; ObSchemaGetterGuard schema_guard; const uint64_t tenant_id = table_schema.get_tenant_id(); + const uint64_t table_id = table_schema.get_table_id(); + const ObSimpleTableSchemaV2 * table = NULL; if (OB_FAIL(check_inner_stat())) { - LOG_WARN("variable is not init"); - } else if (!is_inner_table(table_schema.get_table_id())) { + LOG_WARN("variable is not init", KR(ret), K(tenant_id), K(table_id)); + } else if (!is_inner_table(table_id)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("table not inner table", "table_id", table_schema.get_table_id(), K(ret)); + LOG_WARN("table not inner table", KR(ret), K(tenant_id), K(table_id)); } else if (OB_FAIL(get_tenant_schema_guard_with_version_in_inner_table(tenant_id, schema_guard))) { - LOG_WARN("fail to get schema guard with version in inner table", K(ret), K(tenant_id)); - } else if (OB_FAIL(drop_table_in_trans(schema_guard, table_schema, false, table_schema.is_index_table(), + LOG_WARN("fail to get schema guard with version in inner table", KR(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.get_simple_table_schema(tenant_id, table_id, table))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_ISNULL(table)) { + // bugfix: https://work.aone.alibaba-inc.com/issue/45050614 + // virtual table index may be dropped with virtual table, so here we ignore OB_TABLE_NOT_EXIST failure. + LOG_INFO("table has already been dropped, just ignore", + K(tenant_id), K(table_id), "table_name", table_schema.get_table_name()); + } else if (OB_FAIL(drop_table_in_trans(schema_guard, + table_schema, + false, + table_schema.is_index_table(), false, /* to recyclebin*/ stmt, NULL, NULL, NULL))) { - LOG_WARN("drop table in transaction failed", K(ret), K(table_schema)); + LOG_WARN("drop table in transaction failed", KR(ret), K(tenant_id), K(table_id)); } return ret; } @@ -21685,7 +21895,8 @@ int ObDDLService::lock_tenant(const ObString &tenant_name, const bool is_lock) int ObDDLService::add_system_variable(const ObAddSysVarArg &arg) { - LOG_INFO("receive add system variable request", K(arg)); + FLOG_INFO("[UPGRADE] begin upgrade system variable", K(arg)); + int64_t start_time = ObTimeUtility::current_time(); DEBUG_SYNC(BEFORE_UPRADE_SYSTEM_VARIABLE); int ret = OB_SUCCESS; ObDDLSQLTransaction trans(schema_service_); @@ -21725,14 +21936,15 @@ int ObDDLService::add_system_variable(const ObAddSysVarArg &arg) LOG_WARN("sys var schema is null", KR(ret), K(arg)); } else if (!arg.update_sys_var_) { // case 1. add sys var, and sys var exist - if (arg.if_not_exist_) { - execute = false; + if (new_sys_var.is_equal_for_add(*old_schema)) { + // new sys var will be mocked by schema when upgrade, + // only persist new sys var when sys var is equal with old sys var. } else { - ret = OB_ERR_PARAM_DUPLICATE; + ret = OB_SCHEMA_ERROR; LOG_WARN("system variable duplicated", KR(ret), K(var_name)); } } else { - // upate sys var + // update sys var if (new_sys_var.is_equal_except_value(*old_schema)) { // case 2. new sys var is same with existed schema(except value), do nothing execute = false; @@ -21784,6 +21996,8 @@ int ObDDLService::add_system_variable(const ObAddSysVarArg &arg) LOG_WARN("publish schema failed", KR(ret)); } } + FLOG_INFO("[UPGRADE] end upgrade system variable", + KR(ret), K(arg), "cost", ObTimeUtility::current_time() - start_time); return ret; } diff --git a/src/rootserver/ob_ddl_service.h b/src/rootserver/ob_ddl_service.h index 4733f5fe04..2c7c875608 100644 --- a/src/rootserver/ob_ddl_service.h +++ b/src/rootserver/ob_ddl_service.h @@ -223,6 +223,7 @@ public: const share::schema::ObTableSchema **table_schema); virtual int update_index_status(const obrpc::ObUpdateIndexStatusArg &arg); + int upgrade_table_schema(const obrpc::ObUpgradeTableSchemaArg &arg); virtual int add_table_schema(share::schema::ObTableSchema &table_schema, share::schema::ObSchemaGetterGuard &schema_guard); virtual int drop_inner_table(const share::schema::ObTableSchema &table_schema); @@ -2155,6 +2156,17 @@ private: int check_table_pk(const share::schema::ObTableSchema &orig_table_schema); int check_can_convert_to_character(const share::schema::ObColumnSchemaV2 &col_schema, bool &can_convert); int clean_global_context(const ObContextSchema &context_schema); + + int get_hard_code_system_table_schema_( + const uint64_t tenant_id, + const uint64_t table_id, + share::schema::ObTableSchema &hard_code_schema); + int create_system_table_( + share::schema::ObSchemaGetterGuard &schema_guard, + const share::schema::ObTableSchema &hard_code_schema); + int alter_system_table_column_( + share::schema::ObSchemaGetterGuard &schema_guard, + const share::schema::ObTableSchema &hard_code_schema); private: bool inited_; volatile bool stopped_; diff --git a/src/rootserver/ob_root_inspection.cpp b/src/rootserver/ob_root_inspection.cpp index c378b8669a..a8c5507276 100644 --- a/src/rootserver/ob_root_inspection.cpp +++ b/src/rootserver/ob_root_inspection.cpp @@ -726,7 +726,7 @@ int ObRootInspection::check_all() // check sys schema tmp = OB_SUCCESS; - if (OB_SUCCESS != (tmp = check_sys_table_schemas())) { + if (OB_SUCCESS != (tmp = check_sys_table_schemas_())) { LOG_WARN("check_sys_table_schemas failed", K(tmp)); ret = (OB_SUCCESS == ret) ? tmp : ret; } @@ -976,51 +976,6 @@ int ObRootInspection::check_and_insert_sys_params(uint64_t tenant_id, LOG_WARN("some sys var exist in hard code, but does not exist in inner table, " "they will be inserted into table", K(table_name), K(miss_names)); } - ObSqlString sql; - ObSysParam sys_param; - obrpc::ObAddSysVarArg arg; - arg.exec_tenant_id_ = tenant_id; - const ObZone global_zone = ""; - for (int64_t i = 0; OB_SUCC(ret) && i < miss_names.count(); ++i) { - sql.reset(); - sys_param.reset(); - arg.sysvar_.reset(); - arg.if_not_exist_ = true; - arg.sysvar_.set_tenant_id(tenant_id); - int64_t var_store_idx = OB_INVALID_INDEX; - if (OB_ISNULL(rpc_proxy_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("common rpc proxy is null"); - } else if (OB_FAIL(check_cancel())) { - LOG_WARN("check_cancel failed", K(ret)); - } else if (OB_FAIL(ObSysVarFactory::calc_sys_var_store_idx_by_name( - ObString(miss_names.at(i).ptr()), var_store_idx))) { - LOG_WARN("fail to calc sys var store idx by name", K(ret), K(miss_names.at(i).ptr())); - } else if (false == ObSysVarFactory::is_valid_sys_var_store_idx(var_store_idx)) { - ret = OB_SCHEMA_ERROR; - LOG_WARN("calc sys var store idx success but store_idx is invalid", K(ret), K(var_store_idx)); - } else { - const ObString &name = ObSysVariables::get_name(var_store_idx); - const ObObjType &type = ObSysVariables::get_type(var_store_idx); - const ObString &value = ObSysVariables::get_value(var_store_idx); - const ObString &min = ObSysVariables::get_min(var_store_idx); - const ObString &max = ObSysVariables::get_max(var_store_idx); - const ObString &info = ObSysVariables::get_info(var_store_idx); - const int64_t flag = ObSysVariables::get_flags(var_store_idx); - if (OB_FAIL(sys_param.init(tenant_id, global_zone, name.ptr(), type, - value.ptr(), min.ptr(), max.ptr(), info.ptr(), flag))) { - LOG_WARN("sys_param init failed", K(tenant_id), K(name), K(type), K(value), - K(min), K(max), K(info), K(flag), K(ret)); - } else if (!sys_param.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("sys param is invalid", K(sys_param), K(ret)); - } else if (OB_FAIL(ObSchemaUtils::convert_sys_param_to_sysvar_schema(sys_param, arg.sysvar_))) { - LOG_WARN("convert sys param to sysvar schema failed", K(ret)); - } else if (OB_FAIL(rpc_proxy_->add_system_variable(arg))) { - LOG_WARN("add system variable failed", K(ret)); - } - } - } } return ret; } @@ -1190,7 +1145,7 @@ int ObRootInspection::calc_diff_names(const uint64_t tenant_id, return ret; } -int ObRootInspection::check_sys_table_schemas() +int ObRootInspection::check_sys_table_schemas_() { int ret = OB_SUCCESS; ObArray tenant_ids; @@ -1337,7 +1292,7 @@ int ObRootInspection::check_table_schema(const ObTableSchema &hard_code_table, || !inner_table.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid table_schema", K(hard_code_table), K(inner_table), K(ret)); - } else if (OB_FAIL(check_table_options(inner_table, hard_code_table))) { + } else if (OB_FAIL(check_table_options_(inner_table, hard_code_table))) { LOG_WARN("check_table_options failed", "table_id", hard_code_table.get_table_id(), K(ret)); } else { if (hard_code_table.get_column_count() != inner_table.get_column_count()) { @@ -1361,7 +1316,7 @@ int ObRootInspection::check_table_schema(const ObTableSchema &hard_code_table, hard_code_column->get_column_name(), K(ret)); } else { const bool ignore_column_id = is_virtual_table(hard_code_table.get_table_id()); - if (OB_FAIL(check_column_schema(hard_code_table.get_table_name(), + if (OB_FAIL(check_column_schema_(hard_code_table.get_table_name(), *column, *hard_code_column, ignore_column_id))) { LOG_WARN("column schema mismatch with hard code column schema", "table_name",inner_table.get_table_name(), "column", *column, @@ -1377,6 +1332,110 @@ int ObRootInspection::check_table_schema(const ObTableSchema &hard_code_table, return ret; } +int ObRootInspection::check_and_get_system_table_column_diff( + const share::schema::ObTableSchema &table_schema, + const share::schema::ObTableSchema &hard_code_schema, + common::ObIArray &add_column_ids, + common::ObIArray &alter_column_ids) +{ + int ret = OB_SUCCESS; + add_column_ids.reset(); + alter_column_ids.reset(); + if (!table_schema.is_valid() || !hard_code_schema.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid table_schema", KR(ret), K(table_schema), K(hard_code_schema)); + } else if (table_schema.get_tenant_id() != hard_code_schema.get_tenant_id() + || table_schema.get_table_id() != hard_code_schema.get_table_id() + || 0 != table_schema.get_table_name_str().compare(hard_code_schema.get_table_name_str()) + || !is_system_table(table_schema.get_table_id())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid table_schema", KR(ret), + "tenant_id", table_schema.get_tenant_id(), + "table_id", table_schema.get_table_id(), + "table_name", table_schema.get_table_name(), + "hard_code_tenant_id", hard_code_schema.get_tenant_id(), + "hard_code_table_id", hard_code_schema.get_table_id(), + "hard_code_table_name", hard_code_schema.get_table_name()); + } else { + const uint64_t tenant_id = table_schema.get_tenant_id(); + const uint64_t table_id = table_schema.get_table_id(); + const ObColumnSchemaV2 *column = NULL; + const ObColumnSchemaV2 *hard_code_column = NULL; + ObColumnSchemaV2 tmp_column; // check_column_can_be_altered_online() may change dst_column, is ugly. + bool ignore_column_id = false; + + // case 1. check if columns should be dropped. + // case 2. check if column can be altered online. + for (int64_t i = 0; OB_SUCC(ret) && i < table_schema.get_column_count(); i++) { + column = table_schema.get_column_schema_by_idx(i); + if (OB_ISNULL(column)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("column schema is null", KR(ret), K(tenant_id), K(table_id), K(i)); + } else if (OB_ISNULL(hard_code_column = hard_code_schema.get_column_schema(column->get_column_id()))) { + ret = OB_NOT_SUPPORTED; // case 1 + LOG_WARN("can't drop system table's column", KR(ret), + K(tenant_id), K(table_id), + "table_name", table_schema.get_table_name(), + "column_id", column->get_column_id(), + "column_name", column->get_column_name()); + } else { + // case 2 + int tmp_ret = check_column_schema_(table_schema.get_table_name_str(), + *column, + *hard_code_column, + ignore_column_id); + if (OB_SUCCESS == tmp_ret) { + // not changed + } else if (OB_SCHEMA_ERROR != tmp_ret) { + ret = tmp_ret; + LOG_WARN("fail to check column schema", KR(ret), + K(tenant_id), K(table_id), KPC(column), KPC(hard_code_column)); + } else if (OB_FAIL(tmp_column.assign(*hard_code_column))) { + LOG_WARN("fail to assign hard code column schema", KR(ret), + K(tenant_id), K(table_id), "column_id", hard_code_column->get_column_id()); + } else if (OB_FAIL(table_schema.check_column_can_be_altered_online(column, &tmp_column))) { + LOG_WARN("fail to check alter column online", KR(ret), + K(tenant_id), K(table_id), + "table_name", table_schema.get_table_name(), + "column_id", column->get_column_id(), + "column_name", column->get_column_name()); + } else if (OB_FAIL(alter_column_ids.push_back(column->get_column_id()))) { + LOG_WARN("fail to push back column_id", KR(ret), K(tenant_id), K(table_id), + "column_id", column->get_column_id()); + } + } + } // end for + + // case 3: check if columns should be added. + for (int64_t i = 0; OB_SUCC(ret) && i < hard_code_schema.get_column_count(); i++) { + hard_code_column = hard_code_schema.get_column_schema_by_idx(i); + if (OB_ISNULL(hard_code_column)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("column schema is null", KR(ret), K(tenant_id), K(table_id), K(i)); + } else if (OB_NOT_NULL(column = table_schema.get_column_schema(hard_code_column->get_column_id()))) { + // column exist, just skip + } else { + const uint64_t hard_code_column_id = hard_code_column->get_column_id(); + const ObColumnSchemaV2 *last_column = NULL; + if (table_schema.get_column_count() <= 0 + || OB_ISNULL(last_column = table_schema.get_column_schema_by_idx( + table_schema.get_column_count() - 1))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid column count or column", KR(ret), K(table_schema)); + } else if (table_schema.get_max_used_column_id() >= hard_code_column_id + || last_column->get_column_id() >= hard_code_column_id) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("column should be added at last", KR(ret), KPC(hard_code_column), K(table_schema)); + } else if (OB_FAIL(add_column_ids.push_back(hard_code_column_id))) { + LOG_WARN("fail to push back column_id", KR(ret), K(tenant_id), K(table_id), + "column_id", hard_code_column_id); + } + } + } // end for + } + return ret; +} + bool ObRootInspection::check_str_with_lower_case_(const ObString &str) { bool bret = false; @@ -1462,8 +1521,8 @@ int ObRootInspection::check_sys_view_( return ret; } -int ObRootInspection::check_table_options(const ObTableSchema &table, - const ObTableSchema &hard_code_table) +int ObRootInspection::check_table_options_(const ObTableSchema &table, + const ObTableSchema &hard_code_table) { int ret = OB_SUCCESS; if (!table.is_valid() || !hard_code_table.is_valid()) { @@ -1615,23 +1674,23 @@ int ObRootInspection::check_table_options(const ObTableSchema &table, return ret; } -int ObRootInspection::check_column_schema(const ObString &table_name, - const ObColumnSchemaV2 &column, - const ObColumnSchemaV2 &hard_code_column, - const bool ignore_column_id) +int ObRootInspection::check_column_schema_(const ObString &table_name, + const ObColumnSchemaV2 &column, + const ObColumnSchemaV2 &hard_code_column, + const bool ignore_column_id) { int ret = OB_SUCCESS; if (table_name.empty() || !column.is_valid() || !hard_code_column.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("table_name is empty or invalid column or invalid hard_code_column", - K(table_name), K(column), K(hard_code_column), K(ret)); + KR(ret), K(table_name), K(column), K(hard_code_column)); } else { #define CMP_COLUMN_ATTR(attr) \ if (OB_SUCC(ret)) { \ if (column.get_##attr() != hard_code_column.get_##attr()) { \ ret = OB_SCHEMA_ERROR; \ - LOG_WARN(#attr " mismatch", K(table_name), "column_name", column.get_column_name(), \ - "in_memory", column.get_##attr(), "hard_code", hard_code_column.get_##attr(), K(ret)); \ + LOG_WARN(#attr " mismatch", KR(ret), K(table_name), "column_name", column.get_column_name(), \ + "in_memory", column.get_##attr(), "hard_code", hard_code_column.get_##attr()); \ } \ } @@ -1639,10 +1698,18 @@ int ObRootInspection::check_column_schema(const ObString &table_name, if (OB_SUCC(ret)) { \ if (column.is_##attr() != hard_code_column.is_##attr()) { \ ret = OB_SCHEMA_ERROR; \ - LOG_WARN(#attr " mismatch", K(table_name), "column_name", column.get_column_name(), \ - "in_memory", column.is_##attr(), "hard_code", hard_code_column.is_##attr(), K(ret)); \ + LOG_WARN(#attr " mismatch", KR(ret), K(table_name), "column_name", column.get_column_name(), \ + "in_memory", column.is_##attr(), "hard_code", hard_code_column.is_##attr()); \ } \ } + if (OB_SUCC(ret)) { + if (column.get_column_name_str() != hard_code_column.get_column_name_str()) { + ret = OB_SCHEMA_ERROR; + LOG_WARN("column_name mismatch", KR(ret), K(table_name), + "in_memory", column.get_column_name(), + "hard_code", hard_code_column.get_column_name()); + } + } if (!ignore_column_id) { CMP_COLUMN_ATTR(column_id); diff --git a/src/rootserver/ob_root_inspection.h b/src/rootserver/ob_root_inspection.h index 5510295c97..06d601c607 100644 --- a/src/rootserver/ob_root_inspection.h +++ b/src/rootserver/ob_root_inspection.h @@ -168,18 +168,26 @@ public: // return OB_SCHEMA_ERROR for table schema mismatch virtual int check_table_schema(const uint64_t tenant_id, const share::schema::ObTableSchema &hard_code_table); - // TODO:(yanmu.ztl) standby cluster won't sync sys table schema any more. To be removed. + static int check_table_schema(const share::schema::ObTableSchema &hard_code_table, const share::schema::ObTableSchema &inner_table); - // TODO:(yanmu.ztl) standby cluster wont't sync sys table schema any more. Should be private. - int check_sys_table_schemas(); + + // For system tables, check and get column schemas' difference + // between table schema in memory and hard code table schema. + // 1. Drop column: Not supported. + // 2. Add column: Can only add columns at last. + // 3. Alter column: Can only alter columns online. + static int check_and_get_system_table_column_diff( + const share::schema::ObTableSchema &table_schema, + const share::schema::ObTableSchema &hard_code_schema, + common::ObIArray &add_column_ids, + common::ObIArray &alter_column_ids); private: static const int64_t NAME_BUF_LEN = 64; typedef common::ObFixedLengthString Name; int check_zone(); int check_sys_stat(); int check_sys_param(); - int check_sys_table_schemas_(const uint64_t tenant_id); template int get_names(const common::ObDList &list, common::ObIArray &names); @@ -199,12 +207,15 @@ private: common::ObIArray &fetch_names, /* data from inner table*/ common::ObIArray &extra_names, /* inner table more than hard code*/ common::ObIArray &miss_names /* inner table less than hard code*/); - static int check_table_options(const share::schema::ObTableSchema &table, - const share::schema::ObTableSchema &hard_code_table); - static int check_column_schema(const common::ObString &table_name, - const share::schema::ObColumnSchemaV2 &column, - const share::schema::ObColumnSchemaV2 &hard_code_column, - const bool ignore_column_id); + + int check_sys_table_schemas_(); + int check_sys_table_schemas_(const uint64_t tenant_id); + static int check_table_options_(const share::schema::ObTableSchema &table, + const share::schema::ObTableSchema &hard_code_table); + static int check_column_schema_(const common::ObString &table_name, + const share::schema::ObColumnSchemaV2 &column, + const share::schema::ObColumnSchemaV2 &hard_code_column, + const bool ignore_column_id); bool check_str_with_lower_case_(const ObString &str); int check_sys_view_(const uint64_t tenant_id, diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index 5b96005024..1fa0a60350 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -1431,20 +1431,22 @@ int ObRootService::submit_offline_server_task(const common::ObAddr &server) return ret; } -int ObRootService::submit_upgrade_task(const int64_t version) +int ObRootService::submit_upgrade_task( + const obrpc::ObUpgradeJobArg::Action action, + const int64_t version) { int ret = OB_SUCCESS; - ObUpgradeTask task(upgrade_executor_, version); + ObUpgradeTask task(upgrade_executor_, action, version); task.set_retry_times(0); //not repeat if (!inited_) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret)); } else if (OB_FAIL(upgrade_executor_.can_execute())) { - LOG_WARN("can't run task now", KR(ret), K(version)); + LOG_WARN("can't run task now", KR(ret), K(action), K(version)); } else if (OB_FAIL(task_queue_.add_async_task(task))) { - LOG_WARN("submit upgrade task fail", KR(ret), K(version)); + LOG_WARN("submit upgrade task fail", KR(ret), K(action), K(version)); } else { - LOG_INFO("submit upgrade task success", KR(ret), K(version)); + LOG_INFO("submit upgrade task success", KR(ret), K(action), K(version)); } return ret; } @@ -7955,13 +7957,15 @@ int ObRootService::run_upgrade_job(const obrpc::ObUpgradeJobArg &arg) } else if (!arg.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(arg), KR(ret)); - } else if (version < CLUSTER_VERSION_2270 - || !ObUpgradeChecker::check_cluster_version_exist(version)) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version)); - LOG_USER_ERROR(OB_NOT_SUPPORTED, "run upgrade job with such version is"); - } else if (ObUpgradeJobArg::RUN_UPGRADE_JOB == arg.action_) { - if (OB_FAIL(submit_upgrade_task(arg.version_))) { + } else if (ObUpgradeJobArg::UPGRADE_POST_ACTION == arg.action_ + || ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE == arg.action_ + || ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE == arg.action_) { + if (ObUpgradeJobArg::UPGRADE_POST_ACTION == arg.action_ + && !ObUpgradeChecker::check_cluster_version_exist(version)) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version)); + LOG_USER_ERROR(OB_NOT_SUPPORTED, "run upgrade job with such version is"); + } else if (OB_FAIL(submit_upgrade_task(arg.action_, version))) { LOG_WARN("fail to submit upgrade task", KR(ret), K(arg)); } } else if (ObUpgradeJobArg::STOP_UPGRADE_JOB == arg.action_) { @@ -7978,6 +7982,18 @@ int ObRootService::run_upgrade_job(const obrpc::ObUpgradeJobArg &arg) return ret; } +int ObRootService::upgrade_table_schema(const obrpc::ObUpgradeTableSchemaArg &arg) +{ + int ret = OB_SUCCESS; + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_FAIL(ddl_service_.upgrade_table_schema(arg))) { + LOG_WARN("fail to upgrade table schema", KR(ret), K(arg)); + } + return ret; +} + int ObRootService::merge_finish(const obrpc::ObMergeFinishArg &arg) { int ret = OB_SUCCESS; diff --git a/src/rootserver/ob_root_service.h b/src/rootserver/ob_root_service.h index fa2ec4f953..f4ca7aa695 100644 --- a/src/rootserver/ob_root_service.h +++ b/src/rootserver/ob_root_service.h @@ -671,6 +671,7 @@ public: int admin_upgrade_virtual_schema(); int run_job(const obrpc::ObRunJobArg &arg); int run_upgrade_job(const obrpc::ObUpgradeJobArg &arg); + int upgrade_table_schema(const obrpc::ObUpgradeTableSchemaArg &arg); int admin_flush_cache(const obrpc::ObAdminFlushCacheArg &arg); int admin_upgrade_cmd(const obrpc::Bool &arg); int admin_rolling_upgrade_cmd(const obrpc::ObAdminRollingUpgradeArg &arg); @@ -696,7 +697,7 @@ public: int report_single_replica(const int64_t tenant_id, const share::ObLSID &ls_id); // @see RsListChangeCb int submit_update_rslist_task(const bool force_update = false); - int submit_upgrade_task(const int64_t version); + int submit_upgrade_task(const obrpc::ObUpgradeJobArg::Action action, const int64_t version); int submit_upgrade_storage_format_version_task(); int submit_create_inner_schema_task(); int submit_async_minor_freeze_task(const obrpc::ObRootMinorFreezeArg &arg); diff --git a/src/rootserver/ob_rs_job_table_operator.cpp b/src/rootserver/ob_rs_job_table_operator.cpp index 3eeb79decc..58b44c23f5 100644 --- a/src/rootserver/ob_rs_job_table_operator.cpp +++ b/src/rootserver/ob_rs_job_table_operator.cpp @@ -63,7 +63,9 @@ static const char* job_type_str_array[JOB_TYPE_MAX] = { "UPGRADE_STORAGE_FORMAT_VERSION", "STOP_UPGRADE_STORAGE_FORMAT_VERSION", "CREATE_INNER_SCHEMA", - "RUN_UPGRADE_POST_JOB" + "UPGRADE_POST_ACTION", + "UPGRADE_SYSTEM_VARIABLE", + "UPGRADE_SYSTEM_TABLE" }; const char* ObRsJobTableOperator::get_job_type_str(ObRsJobType job_type) diff --git a/src/rootserver/ob_rs_job_table_operator.h b/src/rootserver/ob_rs_job_table_operator.h index f16995f4a7..a3535dc4f1 100644 --- a/src/rootserver/ob_rs_job_table_operator.h +++ b/src/rootserver/ob_rs_job_table_operator.h @@ -82,7 +82,9 @@ enum ObRsJobType JOB_TYPE_UPGRADE_STORAGE_FORMAT_VERSION, JOB_TYPE_STOP_UPGRADE_STORAGE_FORMAT_VERSION, JOB_TYPE_CREATE_INNER_SCHEMA, - JOB_TYPE_RUN_UPGRADE_POST_JOB, + JOB_TYPE_UPGRADE_POST_ACTION, + JOB_TYPE_UPGRADE_SYSTEM_VARIABLE, + JOB_TYPE_UPGRADE_SYSTEM_TABLE, JOB_TYPE_MAX }; diff --git a/src/rootserver/ob_rs_rpc_processor.h b/src/rootserver/ob_rs_rpc_processor.h index 99aca11f83..483b3c5ee6 100644 --- a/src/rootserver/ob_rs_rpc_processor.h +++ b/src/rootserver/ob_rs_rpc_processor.h @@ -35,6 +35,7 @@ bool is_allow_when_disable_ddl(const obrpc::ObRpcPacketCode pcode, const obrpc:: if (OB_ISNULL(ddl_arg)) { } else if (obrpc::OB_COMMIT_ALTER_TENANT_LOCALITY == pcode || obrpc::OB_SCHEMA_REVISE == pcode // for upgrade + || obrpc::OB_UPGRADE_TABLE_SCHEMA == pcode || ((obrpc::OB_MODIFY_TENANT == pcode || obrpc::OB_MODIFY_SYSVAR == pcode || obrpc::OB_DO_KEYSTORE_DDL == pcode @@ -46,21 +47,6 @@ bool is_allow_when_disable_ddl(const obrpc::ObRpcPacketCode pcode, const obrpc:: return bret; } -// precondition: enable_ddl = false -bool is_allow_when_upgrade(const obrpc::ObRpcPacketCode pcode, const obrpc::ObDDLArg *ddl_arg) -{ - bool bret = false; - UNUSED(pcode); - if (obrpc::OB_UPGRADE_STAGE_DBUPGRADE != GCTX.get_upgrade_stage()) { - bret = true; - } else if (OB_ISNULL(ddl_arg)) { - bret = false; - } else { - bret = ddl_arg->is_allow_when_upgrade(); - } - return bret; -} - bool is_allow_when_create_tenant(const obrpc::ObRpcPacketCode pcode) { bool bret = false; @@ -126,8 +112,7 @@ protected: RS_LOG(WARN, "RS major freeze not finished, can not process ddl request", K(ret), K(pcode)); } else if (is_ddl_like_ - && ((!GCONF.enable_ddl && !is_allow_when_disable_ddl(pcode, ddl_arg_)) - || (GCONF.enable_ddl && !is_allow_when_upgrade(pcode, ddl_arg_)))) { + && (!GCONF.enable_ddl && !is_allow_when_disable_ddl(pcode, ddl_arg_))) { ret = OB_OP_NOT_ALLOW; RS_LOG(WARN, "ddl operation not allow, can not process this request", K(ret), K(pcode)); } else { @@ -476,6 +461,7 @@ DEFINE_RS_RPC_PROCESSOR(obrpc::OB_BROADCAST_SCHEMA, ObBroadcastSchemaP, broadcas // only for upgrade DEFINE_RS_RPC_PROCESSOR(obrpc::OB_CHECK_MERGE_FINISH, ObCheckMergeFinishP, check_merge_finish(arg_)); DEFINE_RS_RPC_PROCESSOR(obrpc::OB_GET_RECYCLE_SCHEMA_VERSIONS, ObGetRecycleSchemaVersionsP, get_recycle_schema_versions(arg_, result_)); +DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_UPGRADE_TABLE_SCHEMA, ObRpcUpgradeTableSchemaP, upgrade_table_schema(arg_)); //label security ddl DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_HANDLE_LABEL_SE_POLICY_DDL, ObRpcHandleLabelSePolicyDDLP, handle_label_se_policy_ddl(arg_)); DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_HANDLE_LABEL_SE_COMPONENT_DDL, ObRpcHandleLabelSeComponentDDLP, handle_label_se_component_ddl(arg_)); diff --git a/src/rootserver/ob_system_admin_util.cpp b/src/rootserver/ob_system_admin_util.cpp index 894f69aaea..c0f0cd4869 100644 --- a/src/rootserver/ob_system_admin_util.cpp +++ b/src/rootserver/ob_system_admin_util.cpp @@ -1307,34 +1307,49 @@ int ObAdminUpgradeVirtualSchema::upgrade_( } else if (OB_ISNULL(ctx_.ddl_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ddl service is null", KR(ret)); - } else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table( - tenant_id, schema_guard))) { + } + // 1. check table name duplicated + if (FAILEDx(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table( + tenant_id, schema_guard))) { LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id)); - } else if (OB_FAIL(schema_guard.get_table_schema(table.get_tenant_id(), + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table.get_database_id(), table.get_table_name(), table.is_index_table(), exist_schema))) { - // check if the table_name is occupied by others LOG_WARN("get table schema failed", KR(ret), K(tenant_id), "table", table.get_table_name()); if (OB_TABLE_NOT_EXIST == ret) { ret = OB_SUCCESS; } - } else if (OB_NOT_NULL(exist_schema)) { - // name modification except virtual table should first delete the old table, - // then create the new one to make virtual table upgrade valid - if (OB_FAIL(ctx_.ddl_service_->drop_inner_table(*exist_schema))) { - LOG_WARN("get table schema failed", KR(ret), K(tenant_id), - "table", table.get_table_name(), "table_id", table.get_table_id()); - } else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table( - tenant_id, schema_guard))) { - LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id)); - } + } else if (OB_ISNULL(exist_schema)) { + // no duplicate table name + } else if (OB_FAIL(ctx_.ddl_service_->drop_inner_table(*exist_schema))) { + LOG_WARN("get table schema failed", KR(ret), K(tenant_id), + "table", table.get_table_name(), "table_id", table.get_table_id()); + } else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table( + tenant_id, schema_guard))) { + LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id)); } - - // rebuild the inner table - if (OB_FAIL(ret)) { - } else if (OB_FAIL(ctx_.ddl_service_->add_table_schema(table, schema_guard))) { + // 2. try drop table first + exist_schema = NULL; + if (FAILEDx(schema_guard.get_table_schema(tenant_id, + table.get_table_id(), + exist_schema))) { + LOG_WARN("get table schema failed", KR(ret), "table", table.get_table_name(), + "table_id", table.get_table_id()); + if (OB_TABLE_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } + } else if (OB_ISNULL(exist_schema)) { + // missed table + } else if (OB_FAIL(ctx_.ddl_service_->drop_inner_table(*exist_schema))) { + LOG_WARN("drop table schema failed", KR(ret), "table_schema", *exist_schema); + } else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table( + tenant_id, schema_guard))) { + LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id)); + } + // 3. create table + if (FAILEDx(ctx_.ddl_service_->add_table_schema(table, schema_guard))) { LOG_WARN("add table schema failed", KR(ret), K(tenant_id), K(table)); } else if (OB_FAIL(ctx_.ddl_service_->refresh_schema(tenant_id))) { LOG_WARN("refresh schema failed", KR(ret), K(tenant_id)); diff --git a/src/rootserver/ob_upgrade_executor.cpp b/src/rootserver/ob_upgrade_executor.cpp index 466e524011..d4aee5e487 100644 --- a/src/rootserver/ob_upgrade_executor.cpp +++ b/src/rootserver/ob_upgrade_executor.cpp @@ -15,6 +15,7 @@ #include "rootserver/ob_upgrade_executor.h" #include "rootserver/ob_rs_job_table_operator.h" #include "observer/ob_server_struct.h" +#include "rootserver/ob_root_inspection.h" namespace oceanbase { @@ -43,7 +44,7 @@ ObAsyncTask *ObUpgradeTask::deep_copy(char *buf, const int64_t buf_size) const ret = OB_INVALID_ARGUMENT; LOG_WARN("buf is not long enough", K(need_size), K(buf_size), KR(ret)); } else { - task = new(buf) ObUpgradeTask(*upgrade_executor_, version_); + task = new(buf) ObUpgradeTask(*upgrade_executor_, action_, version_); } return task; } @@ -51,22 +52,24 @@ ObAsyncTask *ObUpgradeTask::deep_copy(char *buf, const int64_t buf_size) const int ObUpgradeTask::process() { const int64_t start = ObTimeUtility::current_time(); - LOG_INFO("[UPGRADE] start to do execute upgrade task", K(start), K_(version)); + FLOG_INFO("[UPGRADE] start to do execute upgrade task", + K(start), K_(action), K_(version)); int ret = OB_SUCCESS; if (OB_ISNULL(upgrade_executor_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("upgrade_executor_ is null", KR(ret)); - } else if (OB_FAIL(upgrade_executor_->execute(version_))) { - LOG_WARN("fail to execute upgrade task", KR(ret)); + } else if (OB_FAIL(upgrade_executor_->execute(action_, version_))) { + LOG_WARN("fail to execute upgrade task", KR(ret), K_(action), K_(version)); } - LOG_INFO("[UPGRADE] finish execute upgrade task", - KR(ret), K_(version), "cost_us", ObTimeUtility::current_time() - start); + FLOG_INFO("[UPGRADE] finish execute upgrade task", + KR(ret), K_(action), K_(version), + "cost_us", ObTimeUtility::current_time() - start); return ret; } ObUpgradeExecutor::ObUpgradeExecutor() : inited_(false), stopped_(false), execute_(false), rwlock_(), - sql_proxy_(NULL), rpc_proxy_(NULL), schema_service_(NULL), + sql_proxy_(NULL), rpc_proxy_(NULL), common_rpc_proxy_(NULL), schema_service_(NULL), upgrade_processors_() {} @@ -88,6 +91,7 @@ int ObUpgradeExecutor::init( schema_service_ = &schema_service; sql_proxy_ = &sql_proxy; rpc_proxy_ = &rpc_proxy; + common_rpc_proxy_ = &common_proxy; stopped_ = false; execute_ = false; inited_ = true; @@ -128,9 +132,8 @@ int ObUpgradeExecutor::check_stop() const { int ret = OB_SUCCESS; SpinRLockGuard guard(rwlock_); - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not init", KR(ret)); + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("fail to check inner stat", KR(ret)); } else if (stopped_) { ret = OB_CANCELED; LOG_WARN("executor should stopped", KR(ret)); @@ -145,13 +148,12 @@ bool ObUpgradeExecutor::check_execute() const return bret; } -int ObUpgradeExecutor::set_execute_mark() +int ObUpgradeExecutor::set_execute_mark_() { int ret = OB_SUCCESS; SpinWLockGuard guard(rwlock_); - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not init", KR(ret)); + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("fail to check inner stat", KR(ret)); } else if (stopped_ || execute_) { ret = OB_OP_NOT_ALLOW; LOG_WARN("can't run job at the same time", KR(ret)); @@ -165,9 +167,8 @@ int ObUpgradeExecutor::can_execute() { int ret = OB_SUCCESS; SpinWLockGuard guard(rwlock_); - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not init", KR(ret)); + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("fail to check inner stat", KR(ret)); } else if (stopped_ || execute_) { ret = OB_OP_NOT_ALLOW; LOG_WARN("status not matched", KR(ret), @@ -177,17 +178,33 @@ int ObUpgradeExecutor::can_execute() return ret; } +int ObUpgradeExecutor::check_inner_stat_() const +{ + int ret = OB_SUCCESS; + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", KR(ret)); + } else if (OB_ISNULL(schema_service_) + || OB_ISNULL(sql_proxy_) + || OB_ISNULL(rpc_proxy_) + || OB_ISNULL(common_rpc_proxy_)){ + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ptr is null", KR(ret), KP_(schema_service), + KP_(sql_proxy), KP_(rpc_proxy), KP_(common_rpc_proxy)); + } + return ret; +} + // wait schema sync in cluster -int ObUpgradeExecutor::check_schema_sync() +int ObUpgradeExecutor::check_schema_sync_() { const int64_t start = ObTimeUtility::current_time(); LOG_INFO("[UPGRADE] start to check schema sync", K(start)); int ret = OB_SUCCESS; bool enable_ddl = GCONF.enable_ddl; bool enable_sys_table_ddl = GCONF.enable_sys_table_ddl; - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not inited", KR(ret)); + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("fail to check inner stat", KR(ret)); } else if (enable_ddl || enable_sys_table_ddl) { ret = OB_OP_NOT_ALLOW; LOG_WARN("ddl should disable now", KR(ret), K(enable_ddl), K(enable_sys_table_ddl)); @@ -213,7 +230,7 @@ int ObUpgradeExecutor::check_schema_sync() } // Ensure primary cluster's schema_version is not greator than standby clusters'. -int ObUpgradeExecutor::check_schema_sync( +int ObUpgradeExecutor::check_schema_sync_( obrpc::ObTenantSchemaVersions &primary_schema_versions, obrpc::ObTenantSchemaVersions &standby_schema_versions, bool &schema_sync) @@ -255,61 +272,31 @@ int ObUpgradeExecutor::check_schema_sync( return ret; } -int ObUpgradeExecutor::get_tenant_ids(common::ObIArray &tenant_ids) -{ - int ret = OB_SUCCESS; - ObSchemaGetterGuard guard; - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not init", KR(ret)); - } else if (OB_FAIL(check_stop())) { - LOG_WARN("executor should stopped", KR(ret)); - } else if (OB_ISNULL(schema_service_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("schema_service is null", KR(ret)); - } else if (OB_FAIL(schema_service_->get_tenant_schema_guard( - OB_SYS_TENANT_ID, guard))) { - LOG_WARN("fail to get schema guard", KR(ret)); - } else if (OB_FAIL(guard.get_tenant_ids(tenant_ids))) { - LOG_WARN("fail to get tenant_ids", KR(ret)); - } - return ret; -} - -int ObUpgradeExecutor::execute(const int64_t version) +//TODO: +//1. Run upgrade job by tenant. +//2. Check tenant role/tenant status before run upgrade job. +int ObUpgradeExecutor::execute( + const obrpc::ObUpgradeJobArg::Action action, + const int64_t version) { ObCurTraceId::init(GCONF.self_addr_); int ret = OB_SUCCESS; - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not init", KR(ret)); - } else if (OB_FAIL(set_execute_mark())) { + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (OB_FAIL(set_execute_mark_())) { LOG_WARN("fail to set execute mark", KR(ret)); } else { - int64_t job_id = OB_INVALID_ID; - ObRsJobType job_type = ObRsJobType::JOB_TYPE_RUN_UPGRADE_POST_JOB; - char version_str[common::OB_CLUSTER_VERSION_LENGTH] = {0}; - int64_t len = ObClusterVersion::print_version_str( - version_str, common::OB_CLUSTER_VERSION_LENGTH, version); - if (OB_ISNULL(sql_proxy_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("sql_proxy_ is null", KR(ret)); - } else if (OB_FAIL(check_stop())) { - LOG_WARN("executor should stopped", KR(ret)); - } else if (OB_FAIL(RS_JOB_CREATE_WITH_RET(job_id, job_type, *sql_proxy_, - "extra_info", ObString(len, version_str)))) { - LOG_WARN("fail to create rs job", KR(ret)); - } else if (job_id <= 0) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("job_id is invalid", KR(ret), K(job_id)); - } else if (OB_FAIL(run_upgrade_job(version))) { - LOG_WARN("fail to run upgrade job", KR(ret), K(job_id), K(version)); - } - int tmp_ret = OB_SUCCESS; - if (job_id > 0) { - if (OB_SUCCESS != (tmp_ret = RS_JOB_COMPLETE(job_id, ret, *sql_proxy_))) { - LOG_ERROR("fail to complete job", K(tmp_ret), KR(ret), K(job_id)); - ret = OB_FAIL(ret) ? ret : tmp_ret; + if (obrpc::ObUpgradeJobArg::UPGRADE_POST_ACTION == action) { + if (OB_FAIL(run_upgrade_post_job_(version))) { + LOG_WARN("fail to run upgrade post job", KR(ret), K(version)); + } + } else if (obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE == action) { + if (OB_FAIL(run_upgrade_system_variable_job_())) { + LOG_WARN("fail to run upgrade system variable job", KR(ret)); + } + } else if (obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE == action) { + if (OB_FAIL(run_upgrade_system_table_job_())) { + LOG_WARN("fail to run upgrade system table job", KR(ret)); } } execute_ = false; @@ -317,49 +304,267 @@ int ObUpgradeExecutor::execute(const int64_t version) return ret; } -// Python upgrade script will set enable_ddl = false before it run upgrade job. -int ObUpgradeExecutor::run_upgrade_job(const int64_t version) +// Python upgrade script may set enable_ddl = false before it run upgrade job. +// TODO: +// 1. support run upgrade post action from `COMPATIBLE` to current cluster version. +int ObUpgradeExecutor::run_upgrade_post_job_(const int64_t version) { int ret = OB_SUCCESS; - ObArray tenant_ids; - ObBaseUpgradeProcessor *processor = NULL; - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not init", KR(ret)); + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("fail to check inner stat", KR(ret)); } else if (OB_FAIL(check_stop())) { LOG_WARN("executor should stopped", KR(ret)); - } else if (version < CLUSTER_VERSION_2270 - || !ObUpgradeChecker::check_cluster_version_exist(version)) { + } else if (!ObUpgradeChecker::check_cluster_version_exist(version)) { ret = OB_NOT_SUPPORTED; LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version)); - } else if (OB_FAIL(get_tenant_ids(tenant_ids))) { - LOG_WARN("fail to get tenant_ids", KR(ret), K(version)); - } else if (OB_FAIL(upgrade_processors_.get_processor_by_version( - version, processor))) { - LOG_WARN("fail to get processor by version", KR(ret), K(version)); } else { - // 1. Run upgrade jobs(by tenant_id desc) in primary cluster. - // 2. Only run sys tenant's upgrade job in standby clusters. - for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) { - const uint64_t tenant_id = tenant_ids.at(i); - if (OB_SYS_TENANT_ID == tenant_id || !GCTX.is_standby_cluster()) { + ObArray tenant_ids; + ObBaseUpgradeProcessor *processor = NULL; + int64_t job_id = OB_INVALID_ID; + ObRsJobType job_type = ObRsJobType::JOB_TYPE_UPGRADE_POST_ACTION; + char version_str[common::OB_CLUSTER_VERSION_LENGTH] = {0}; + int64_t len = ObClusterVersion::print_version_str( + version_str, common::OB_CLUSTER_VERSION_LENGTH, version); + int tmp_ret = OB_SUCCESS; + int64_t backup_ret = OB_SUCCESS; + if (OB_FAIL(RS_JOB_CREATE_WITH_RET(job_id, job_type, *sql_proxy_, + "tenant_id", 0, + "extra_info", ObString(len, version_str)))) { + LOG_WARN("fail to create rs job", KR(ret)); + } else if (job_id <= 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("job_id is invalid", KR(ret), K(job_id)); + } else if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) { + LOG_WARN("fail to get tenant_ids", KR(ret), K(version)); + } else if (OB_FAIL(upgrade_processors_.get_processor_by_version( + version, processor))) { + LOG_WARN("fail to get processor by version", KR(ret), K(version)); + } else { + for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) { + const uint64_t tenant_id = tenant_ids.at(i); int64_t start_ts = ObTimeUtility::current_time(); int64_t current_version = processor->get_version(); processor->set_tenant_id(tenant_id); - LOG_INFO("[UPGRADE] start to run post upgrade job by version", - K(tenant_id), K(current_version)); - if (OB_FAIL(processor->post_upgrade())) { + FLOG_INFO("[UPGRADE] start to run post upgrade job by version", + K(tenant_id), K(current_version)); + if (OB_FAIL(check_stop())) { + LOG_WARN("executor should stopped", KR(ret)); + } else if (OB_TMP_FAIL(processor->post_upgrade())) { LOG_WARN("run post upgrade by version failed", - KR(ret), K(tenant_id), K(current_version)); + KR(tmp_ret), K(tenant_id), K(current_version)); + backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret; } - LOG_INFO("[UPGRADE] finish post upgrade job by version", - KR(ret), K(tenant_id), K(current_version), - "cost", ObTimeUtility::current_time() - start_ts); + FLOG_INFO("[UPGRADE] finish post upgrade job by version", + KR(tmp_ret), K(tenant_id), K(current_version), + "cost", ObTimeUtility::current_time() - start_ts); + } // end for + } + ret = OB_SUCC(ret) ? backup_ret : ret; + if (job_id > 0) { + if (OB_SUCCESS != (tmp_ret = RS_JOB_COMPLETE(job_id, ret, *sql_proxy_))) { + LOG_ERROR("fail to complete job", K(tmp_ret), KR(ret), K(job_id)); + ret = OB_FAIL(ret) ? ret : tmp_ret; } } } return ret; } +int ObUpgradeExecutor::run_upgrade_system_variable_job_() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (OB_FAIL(check_stop())) { + LOG_WARN("executor should stopped", KR(ret)); + } else { + ObArray tenant_ids; + int64_t job_id = OB_INVALID_ID; + ObRsJobType job_type = ObRsJobType::JOB_TYPE_UPGRADE_SYSTEM_VARIABLE; + int tmp_ret = OB_SUCCESS; + int backup_ret = OB_SUCCESS; + if (OB_FAIL(RS_JOB_CREATE_WITH_RET(job_id, job_type, *sql_proxy_, "tenant_id", 0))) { + LOG_WARN("fail to create rs job", KR(ret)); + } else if (job_id <= 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("job_id is invalid", KR(ret), K(job_id)); + } else if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) { + LOG_WARN("fail to get tenant_ids", KR(ret)); + } else { + for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) { + const uint64_t tenant_id = tenant_ids.at(i); + int64_t start_ts = ObTimeUtility::current_time(); + FLOG_INFO("[UPGRADE] start to run upgrade system variable job", K(tenant_id)); + if (OB_FAIL(check_stop())) { + LOG_WARN("executor should stopped", KR(ret)); + } else if (OB_TMP_FAIL(ObUpgradeUtils::upgrade_sys_variable(*common_rpc_proxy_, *sql_proxy_, tenant_id))) { + LOG_WARN("fail to upgrade sys variable", KR(tmp_ret), K(tenant_id)); + backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret; + } + FLOG_INFO("[UPGRADE] finish run upgrade system variable job", + KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts); + } // end for + ret = OB_SUCC(ret) ? backup_ret : ret; + } + if (job_id > 0) { + if (OB_SUCCESS != (tmp_ret = RS_JOB_COMPLETE(job_id, ret, *sql_proxy_))) { + LOG_ERROR("fail to complete job", K(tmp_ret), KR(ret), K(job_id)); + ret = OB_FAIL(ret) ? ret : tmp_ret; + } + } + } + return ret; +} + +// NOTICE: enable_sys_table_ddl should be true before run this job. +int ObUpgradeExecutor::run_upgrade_system_table_job_() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (OB_FAIL(check_stop())) { + LOG_WARN("executor should stopped", KR(ret)); + } else { + ObArray tenant_ids; + int64_t job_id = OB_INVALID_ID; + ObRsJobType job_type = ObRsJobType::JOB_TYPE_UPGRADE_SYSTEM_TABLE; + int tmp_ret = OB_SUCCESS; + int backup_ret = OB_SUCCESS; + if (OB_FAIL(RS_JOB_CREATE_WITH_RET(job_id, job_type, *sql_proxy_, "tenant_id", 0))) { + LOG_WARN("fail to create rs job", KR(ret)); + } else if (job_id <= 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("job_id is invalid", KR(ret), K(job_id)); + } else if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) { + LOG_WARN("fail to get tenant_ids", KR(ret)); + } else { + for (int64_t i = tenant_ids.count() - 1; i >= 0; i--) { + const uint64_t tenant_id = tenant_ids.at(i); + int64_t start_ts = ObTimeUtility::current_time(); + FLOG_INFO("[UPGRADE] start to run upgrade system table job", K(tenant_id)); + if (OB_FAIL(check_stop())) { + LOG_WARN("executor should stopped", KR(ret)); + } else if (OB_TMP_FAIL(upgrade_system_table_(tenant_id))) { + LOG_WARN("fail to upgrade system table", KR(tmp_ret), K(tenant_id)); + backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret; + } + FLOG_INFO("[UPGRADE] finish run upgrade system table job", + KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts); + } // end for + ret = OB_SUCC(ret) ? backup_ret : ret; + } + if (job_id > 0) { + if (OB_SUCCESS != (tmp_ret = RS_JOB_COMPLETE(job_id, ret, *sql_proxy_))) { + LOG_ERROR("fail to complete job", K(tmp_ret), KR(ret), K(job_id)); + ret = OB_FAIL(ret) ? ret : tmp_ret; + } + } + } + return ret; +} + +int ObUpgradeExecutor::upgrade_system_table_(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (OB_FAIL(check_stop())) { + LOG_WARN("executor should stopped", KR(ret)); + } else { + ObArray upgrade_table_ids; // miss or mismatch + // Only core/system tables can be upgraded here. + // 1. __all_core_table can't be altered. + // 2. sys index table and sys lob table will be added with sys data table, and can't be altered. + const schema_create_func *creator_ptr_array[] = { + share::core_table_schema_creators, + share::sys_table_schema_creators, NULL }; + + // check system table + ObTableSchema table_schema; + bool exist = false; + for (const schema_create_func **creator_ptr_ptr = creator_ptr_array; + OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr_ptr); ++creator_ptr_ptr) { + for (const schema_create_func *creator_ptr = *creator_ptr_ptr; + OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr); ++creator_ptr) { + table_schema.reset(); + if (OB_FAIL(check_stop())) { + LOG_WARN("check_cancel failed", KR(ret)); + } else if (OB_FAIL((*creator_ptr)(table_schema))) { + LOG_WARN("create table schema failed", KR(ret)); + } else if (!is_sys_tenant(tenant_id) + && OB_FAIL(ObSchemaUtils::construct_tenant_space_full_table( + tenant_id, table_schema))) { + LOG_WARN("fail to construct tenant space table", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ObSysTableChecker::is_inner_table_exist( + tenant_id, table_schema, exist))) { + LOG_WARN("fail to check inner table exist", + KR(ret), K(tenant_id), K(table_schema)); + } else if (!exist) { + // skip + } else if (OB_FAIL(check_table_schema_(tenant_id, table_schema))) { + const uint64_t table_id = table_schema.get_table_id(); + if (OB_SCHEMA_ERROR != ret) { + LOG_WARN("check_table_schema failed", KR(ret), K(tenant_id), K(table_id)); + } else { + FLOG_INFO("[UPGRADE] table need upgrade", K(tenant_id), K(table_id), + "table_name", table_schema.get_table_name()); + if (OB_FAIL(upgrade_table_ids.push_back(table_id))) { // overwrite ret + LOG_WARN("fail to push back upgrade table ids", KR(ret), K(tenant_id), K(table_id)); + } + } + } + } // end for + } // end for + + int tmp_ret = OB_SUCCESS; + int backup_ret = OB_SUCCESS; + // upgrade system table(create or alter) + obrpc::ObUpgradeTableSchemaArg arg; + const int64_t timeout = GCONF.internal_sql_execute_timeout; + for (int64_t i = 0; OB_SUCC(ret) && i < upgrade_table_ids.count(); i++) { + const uint64_t table_id = upgrade_table_ids.at(i); + int64_t start_ts = ObTimeUtility::current_time(); + FLOG_INFO("[UPGRADE] start upgrade system table", K(tenant_id), K(table_id)); + if (OB_FAIL(check_stop())) { + LOG_WARN("check_cancel failed", KR(ret)); + } else if (OB_FAIL(arg.init(tenant_id, table_id))) { + LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_TMP_FAIL(common_rpc_proxy_->timeout(timeout).upgrade_table_schema(arg))) { + LOG_WARN("fail to uggrade table schema", KR(tmp_ret), K(timeout), K(arg)); + backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret; + } + FLOG_INFO("[UPGRADE] finish upgrade system table", + KR(tmp_ret), K(tenant_id), K(table_id), "cost", ObTimeUtility::current_time() - start_ts); + } // end for + ret = OB_SUCC(ret) ? backup_ret : ret; + } + return ret; +} + +int ObUpgradeExecutor::check_table_schema_(const uint64_t tenant_id, const ObTableSchema &hard_code_table) +{ + int ret = OB_SUCCESS; + const ObTableSchema *table = NULL; + ObSchemaGetterGuard schema_guard; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("fail to check inner stat", KR(ret)); + } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) { + LOG_WARN("failed to get schema guard", KR(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.get_table_schema( + tenant_id, hard_code_table.get_table_id(), table))) { + LOG_WARN("get_table_schema failed", KR(ret), K(tenant_id), + "table_id", hard_code_table.get_table_id(), + "table_name", hard_code_table.get_table_name()); + } else if (OB_ISNULL(table)) { + ret = OB_SCHEMA_ERROR; + LOG_WARN("table should not be null", KR(ret), K(tenant_id), + "table_id", hard_code_table.get_table_id(), + "table_name", hard_code_table.get_table_name()); + } else if (OB_FAIL(ObRootInspection::check_table_schema(hard_code_table, *table))) { + LOG_WARN("fail to check table schema", KR(ret), K(tenant_id), K(hard_code_table), KPC(table)); + } + return ret; +} + }//end rootserver }//end oceanbase diff --git a/src/rootserver/ob_upgrade_executor.h b/src/rootserver/ob_upgrade_executor.h index 40d30675b7..4d4f4918ee 100644 --- a/src/rootserver/ob_upgrade_executor.h +++ b/src/rootserver/ob_upgrade_executor.h @@ -18,6 +18,7 @@ #include "share/ob_check_stop_provider.h" #include "share/schema/ob_multi_version_schema_service.h" #include "share/schema/ob_schema_getter_guard.h" +#include "share/ob_rpc_struct.h" namespace oceanbase { @@ -29,8 +30,9 @@ class ObUpgradeTask: public share::ObAsyncTask { public: explicit ObUpgradeTask(ObUpgradeExecutor &upgrade_executor, + const obrpc::ObUpgradeJobArg::Action action, const int64_t version) - : upgrade_executor_(&upgrade_executor), version_(version) + : upgrade_executor_(&upgrade_executor), action_(action), version_(version) {} virtual ~ObUpgradeTask() {} virtual int64_t get_deep_copy_size() const; @@ -38,6 +40,7 @@ public: virtual int process(); private: ObUpgradeExecutor *upgrade_executor_; + obrpc::ObUpgradeJobArg::Action action_; int64_t version_; }; @@ -51,7 +54,8 @@ public: obrpc::ObSrvRpcProxy &rpc_proxy, obrpc::ObCommonRpcProxy &common_proxy); - int execute(const int64_t version); + int execute(const obrpc::ObUpgradeJobArg::Action action, + const int64_t version); int can_execute(); int check_stop() const; bool check_execute() const; @@ -59,16 +63,22 @@ public: void start(); int stop(); private: - int set_execute_mark(); + int check_inner_stat_() const; + int set_execute_mark_(); - int check_schema_sync(); - int check_schema_sync( + int run_upgrade_post_job_(const int64_t version); + int run_upgrade_system_variable_job_(); + int run_upgrade_system_table_job_(); + + int upgrade_system_table_(const uint64_t tenant_id); + int check_table_schema_(const uint64_t tenant_id, + const share::schema::ObTableSchema &hard_code_table); + + int check_schema_sync_(); + int check_schema_sync_( obrpc::ObTenantSchemaVersions &primary_schema_versions, obrpc::ObTenantSchemaVersions &standby_schema_versions, bool &schema_sync); - int get_tenant_ids(common::ObIArray &tenant_ids); - - int run_upgrade_job(const int64_t version); private: bool inited_; bool stopped_; @@ -76,6 +86,7 @@ private: common::SpinRWLock rwlock_; common::ObMySQLProxy *sql_proxy_; obrpc::ObSrvRpcProxy *rpc_proxy_; + obrpc::ObCommonRpcProxy *common_rpc_proxy_; share::schema::ObMultiVersionSchemaService *schema_service_; share::ObUpgradeProcesserSet upgrade_processors_; DISALLOW_COPY_AND_ASSIGN(ObUpgradeExecutor); diff --git a/src/share/ob_common_rpc_proxy.h b/src/share/ob_common_rpc_proxy.h index 8cd0a9fa77..7a9db4ab9b 100644 --- a/src/share/ob_common_rpc_proxy.h +++ b/src/share/ob_common_rpc_proxy.h @@ -214,6 +214,7 @@ public: RPC_S(PRD admin_upgrade_virtual_schema, obrpc::OB_ADMIN_UPGRADE_VIRTUAL_SCHEMA); RPC_S(PRD run_job, obrpc::OB_RUN_JOB, (ObRunJobArg)); RPC_S(PRD run_upgrade_job, obrpc::OB_RUN_UPGRADE_JOB, (ObUpgradeJobArg)); + RPC_S(PRD upgrade_table_schema, obrpc::OB_UPGRADE_TABLE_SCHEMA, (ObUpgradeTableSchemaArg)); RPC_S(PR5 admin_flush_cache, obrpc::OB_ADMIN_FLUSH_CACHE, (ObAdminFlushCacheArg)); RPC_S(PR5 admin_upgrade_cmd, obrpc::OB_ADMIN_UPGRADE_CMD, (Bool)); RPC_S(PR5 admin_rolling_upgrade_cmd, obrpc::OB_ADMIN_ROLLING_UPGRADE_CMD, (ObAdminRollingUpgradeArg)); diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 5ed49bef2e..a68a23f790 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -3819,7 +3819,8 @@ ObUpgradeJobArg::ObUpgradeJobArg() {} bool ObUpgradeJobArg::is_valid() const { - return INVALID_ACTION != action_ && version_ > 0; + return INVALID_ACTION != action_ + && (UPGRADE_POST_ACTION != action_ || version_ > 0); } int ObUpgradeJobArg::assign(const ObUpgradeJobArg &other) { @@ -3830,6 +3831,41 @@ int ObUpgradeJobArg::assign(const ObUpgradeJobArg &other) } OB_SERIALIZE_MEMBER(ObUpgradeJobArg, action_, version_); +int ObUpgradeTableSchemaArg::init( + const uint64_t tenant_id, + const uint64_t table_id) +{ + int ret = OB_SUCCESS; + ObDDLArg::reset(); + exec_tenant_id_ = tenant_id; + tenant_id_ = tenant_id; + table_id_ = table_id; + return ret; +} + +bool ObUpgradeTableSchemaArg::is_valid() const +{ + return common::OB_INVALID_TENANT_ID != exec_tenant_id_ + && common::OB_INVALID_TENANT_ID != tenant_id_ + /*index、lob table will be created with related system table*/ + && is_system_table(table_id_); +} + +int ObUpgradeTableSchemaArg::assign(const ObUpgradeTableSchemaArg &other) +{ + int ret = OB_SUCCESS; + if (this == &other) { + } else if (OB_FAIL(ObDDLArg::assign(other))) { + LOG_WARN("fail to assign ObDDLArg", KR(ret)); + } else { + tenant_id_ = other.tenant_id_; + table_id_ = other.table_id_; + } + return ret; +} + +OB_SERIALIZE_MEMBER((ObUpgradeTableSchemaArg, ObDDLArg), tenant_id_, table_id_); + int ObAdminFlushCacheArg::assign(const ObAdminFlushCacheArg &other) { int ret = OB_SUCCESS; diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 6ea3a05c08..1ae5764af5 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -4451,8 +4451,10 @@ struct ObUpgradeJobArg public: enum Action { INVALID_ACTION, - RUN_UPGRADE_JOB, - STOP_UPGRADE_JOB + UPGRADE_POST_ACTION, + STOP_UPGRADE_JOB, + UPGRADE_SYSTEM_VARIABLE, + UPGRADE_SYSTEM_TABLE, }; public: ObUpgradeJobArg(); @@ -4464,6 +4466,26 @@ public: int64_t version_; }; +struct ObUpgradeTableSchemaArg : public ObDDLArg +{ + OB_UNIS_VERSION(1); +public: + ObUpgradeTableSchemaArg() + : ObDDLArg(), + tenant_id_(common::OB_INVALID_TENANT_ID), + table_id_(common::OB_INVALID_ID) {} + ~ObUpgradeTableSchemaArg() {} + int init(const uint64_t tenant_id, const uint64_t table_id); + bool is_valid() const; + int assign(const ObUpgradeTableSchemaArg &other); + uint64_t get_tenant_id() const { return tenant_id_; } + uint64_t get_table_id() const { return table_id_; } + TO_STRING_KV(K_(tenant_id), K_(table_id)); +private: + uint64_t tenant_id_; + uint64_t table_id_; +}; + struct ObAdminMergeArg { OB_UNIS_VERSION(1); diff --git a/src/share/ob_upgrade_utils.cpp b/src/share/ob_upgrade_utils.cpp index 59204b5529..9f58da812d 100644 --- a/src/share/ob_upgrade_utils.cpp +++ b/src/share/ob_upgrade_utils.cpp @@ -10,7 +10,7 @@ * See the Mulan PubL v2 for more details. */ -#define USING_LOG_PREFIX SHARE +#define USING_LOG_PREFIX RS #include "lib/string/ob_sql_string.h" #include "share/ob_rpc_struct.h" @@ -280,17 +280,17 @@ int ObUpgradeUtils::upgrade_sys_variable( || OB_INVALID_ID == tenant_id) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); - } else if (OB_FAIL(calc_diff_sys_var(sql_client, tenant_id, update_list, add_list))) { + } else if (OB_FAIL(calc_diff_sys_var_(sql_client, tenant_id, update_list, add_list))) { LOG_WARN("fail to calc diff sys var", KR(ret), K(tenant_id)); - } else if (OB_FAIL(update_sys_var(rpc_proxy, tenant_id, update_list))) { + } else if (OB_FAIL(update_sys_var_(rpc_proxy, tenant_id, true, update_list))) { LOG_WARN("fail to update sys var", KR(ret), K(tenant_id)); - } else if (OB_FAIL(add_sys_var(sql_client, tenant_id, add_list))) { + } else if (OB_FAIL(update_sys_var_(rpc_proxy, tenant_id, false, add_list))) { LOG_WARN("fail to add sys var", KR(ret), K(tenant_id)); } return ret; } -int ObUpgradeUtils::calc_diff_sys_var( +int ObUpgradeUtils::calc_diff_sys_var_( common::ObISQLClient &sql_client, const uint64_t tenant_id, common::ObArray &update_list, @@ -361,7 +361,7 @@ int ObUpgradeUtils::calc_diff_sys_var( || 0 != hard_code_min_val.compare(min_val) || 0 != hard_code_max_val.compare(max_val)) { // sys var to modify - LOG_INFO("sys var diff, need modify", K(tenant_id), K(name), + LOG_INFO("[UPGRADE] sys var diff, need modify", K(tenant_id), K(name), K(data_type), K(flags), K(min_val), K(max_val), K(info), K(hard_code_type), K(hard_code_flag), K(hard_code_min_val), K(hard_code_max_val), K(hard_code_info)); @@ -398,7 +398,7 @@ int ObUpgradeUtils::calc_diff_sys_var( } else if (OB_FAIL(add_list.push_back(i))) { LOG_WARN("fail to push back var_store_idx", KR(ret), K(tenant_id), K(name)); } else { - LOG_INFO("sys var miss, need add", K(tenant_id), K(name), K(i)); + LOG_INFO("[UPGRADE] sys var miss, need add", K(tenant_id), K(name), K(i)); } } } @@ -406,15 +406,11 @@ int ObUpgradeUtils::calc_diff_sys_var( return ret; } -/* - * This function is used to restore backup data from cluster with lower cluster version. - * For modified system variable schema in physical restore, we compensate by methods below. - * 1. Modify system variable(except value) by DDL in physical restore. - * 2. Observer should run with system variable schema which is not modified yet. - */ -int ObUpgradeUtils::update_sys_var( +// modify & add sys var according by hard code schema +int ObUpgradeUtils::update_sys_var_( obrpc::ObCommonRpcProxy &rpc_proxy, const uint64_t tenant_id, + const bool is_update, common::ObArray &update_list) { int ret = OB_SUCCESS; @@ -423,7 +419,9 @@ int ObUpgradeUtils::update_sys_var( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); } else { + const int64_t timeout = GCONF.internal_sql_execute_timeout; for (int64_t i = 0; OB_SUCC(ret) && i < update_list.count(); i++) { + int64_t start_ts = ObTimeUtility::current_time(); int64_t var_store_idx = update_list.at(i); const ObString &name = ObSysVariables::get_name(var_store_idx); const ObObjType &type = ObSysVariables::get_type(var_store_idx); @@ -436,9 +434,9 @@ int ObUpgradeUtils::update_sys_var( ObSysParam sys_param; obrpc::ObAddSysVarArg arg; arg.exec_tenant_id_ = tenant_id; - arg.if_not_exist_ = true; + arg.if_not_exist_ = true; // not used arg.sysvar_.set_tenant_id(tenant_id); - arg.update_sys_var_ = true; + arg.update_sys_var_ = is_update; if (OB_FAIL(sys_param.init(tenant_id, zone, name.ptr(), type, value.ptr(), min.ptr(), max.ptr(), info.ptr(), flag))) { LOG_WARN("sys_param init failed", KR(ret), K(tenant_id), K(name), @@ -448,298 +446,16 @@ int ObUpgradeUtils::update_sys_var( LOG_WARN("sys param is invalid", KR(ret), K(tenant_id), K(sys_param)); } else if (OB_FAIL(ObSchemaUtils::convert_sys_param_to_sysvar_schema(sys_param, arg.sysvar_))) { LOG_WARN("convert sys param to sysvar schema failed", KR(ret)); - } else if (OB_FAIL(rpc_proxy.add_system_variable(arg))) { - LOG_WARN("add system variable failed", KR(ret), K(arg)); - /*} else if (OB_FAIL(execute_update_sys_var_sql(sql_client, tenant_id, sys_param))) { - LOG_WARN("fail to execute update sys var sql", KR(ret), K(tenant_id)); - } else if (OB_FAIL(execute_update_sys_var_history_sql(sql_client, tenant_id, sys_param))) { - LOG_WARN("fail to execute update sys var history sql", KR(ret), K(tenant_id));*/ + } else if (OB_FAIL(rpc_proxy.timeout(timeout).add_system_variable(arg))) { + LOG_WARN("add system variable failed", KR(ret), K(timeout), K(arg)); } + LOG_INFO("[UPGRADE] finish upgrade system variable", + KR(ret), K(tenant_id), K(name), "cost", ObTimeUtility::current_time() - start_ts); } } return ret; } -int ObUpgradeUtils::execute_update_sys_var_sql( - common::ObISQLClient &sql_client, - const uint64_t tenant_id, - const ObSysParam &sys_param) -{ - int ret = OB_SUCCESS; - if (OB_INVALID_TENANT_ID == tenant_id - || OB_INVALID_ID == tenant_id) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); - } else { - int64_t affected_rows = 0; - ObDMLSqlSplicer dml; - ObDMLExecHelper exec(sql_client, tenant_id); - if (OB_FAIL(gen_basic_sys_variable_dml(tenant_id, sys_param, dml))) { - LOG_WARN("fail to gen dml", KR(ret), K(tenant_id), K(sys_param)); - } else if (OB_FAIL(exec.exec_update(OB_ALL_SYS_VARIABLE_TNAME, dml, affected_rows))) { - LOG_WARN("execute insert failed", KR(ret)); - } else if (!is_zero_row(affected_rows) && !is_single_row(affected_rows)) { - LOG_WARN("invalid affected_rows", KR(ret), K(tenant_id), K(affected_rows)); - } else { - LOG_INFO("[UPGRADE] modify sys var", KR(ret), K(tenant_id), K(sys_param)); - } - } - return ret; -} - -int ObUpgradeUtils::execute_update_sys_var_history_sql( - common::ObISQLClient &sql_client, - const uint64_t tenant_id, - const ObSysParam &sys_param) -{ - int ret = OB_SUCCESS; - if (OB_INVALID_TENANT_ID == tenant_id - || OB_INVALID_ID == tenant_id) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); - } else { - int64_t schema_version = OB_INVALID_VERSION; - { - SMART_VAR(ObMySQLProxy::MySQLResult, res) { - ObMySQLResult *result = NULL; - ObSqlString sql; - if (OB_FAIL(sql.append_fmt( - "select schema_version from %s where tenant_id = %lu" - " and zone = '' and name = '%s' order by schema_version desc limit 1", - OB_ALL_SYS_VARIABLE_HISTORY_TNAME, - ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id), - sys_param.name_))) { - LOG_WARN("fail to append sql", KR(ret), K(tenant_id), K(sql)); - } else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) { - LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); - } else if (NULL == (result = res.get_result())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result is not expected to be NULL", KR(ret), K(tenant_id), K(sql)); - } else if (OB_FAIL(result->next())) { - LOG_WARN("fail to get row", KR(ret), K(tenant_id), K(sql)); - } else { - EXTRACT_INT_FIELD_MYSQL(*result, "schema_version", schema_version, int64_t); - } - } - } - if (OB_SUCC(ret)) { - int64_t affected_rows = 0; - ObDMLSqlSplicer dml; - ObDMLExecHelper exec(sql_client, tenant_id); - if (OB_INVALID_VERSION == schema_version) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid schema_version", KR(ret), K(tenant_id), K(sys_param)); - } else if (OB_FAIL(gen_basic_sys_variable_dml(tenant_id, sys_param, dml))) { - LOG_WARN("fail to gen dml", KR(ret), K(tenant_id), K(sys_param)); - } else if (OB_FAIL(dml.add_pk_column("schema_version", schema_version))) { - LOG_WARN("fail to add column", KR(ret), K(tenant_id), K(schema_version)); - } else if (OB_FAIL(exec.exec_update(OB_ALL_SYS_VARIABLE_HISTORY_TNAME, dml, affected_rows))) { - LOG_WARN("execute insert failed", KR(ret)); - } else if (!is_zero_row(affected_rows) && !is_single_row(affected_rows)) { - LOG_WARN("invalid affected_rows", KR(ret), K(tenant_id), K(affected_rows)); - } else { - LOG_INFO("[UPGRADE] modify sys var history", KR(ret), K(tenant_id), K(sys_param)); - } - } - } - return ret; -} - -/* - * This function is used to restore backup data from cluster with lower cluster version. - * For missing system variable schema in physical restore, we compensate by methods below. - * 1. Missing system variable schema will be added according to hardcoded meta schema when refreshing schema. - * 2. (Not necessary) Modify __all_sys_variable/__all_sys_variable_history, so we can construct system variable schema - * from inner table when observer restarts. - */ -int ObUpgradeUtils::add_sys_var( - common::ObISQLClient &sql_client, - const uint64_t tenant_id, - common::ObArray &add_list) -{ - int ret = OB_SUCCESS; - if (OB_INVALID_TENANT_ID == tenant_id - || OB_INVALID_ID == tenant_id) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); - } else { - ObArenaAllocator allocator("AddSysVar"); - for (int64_t i = 0; OB_SUCC(ret) && i < add_list.count(); i++) { - int64_t var_store_idx = add_list.at(i); - const ObString &name = ObSysVariables::get_name(var_store_idx); - const ObObjType &type = ObSysVariables::get_type(var_store_idx); - const ObString &min = ObSysVariables::get_min(var_store_idx); - const ObString &max = ObSysVariables::get_max(var_store_idx); - const ObString &info = ObSysVariables::get_info(var_store_idx); - const int64_t flag = ObSysVariables::get_flags(var_store_idx); - const ObString zone(""); - ObSysParam sys_param; - ObString value; - if (OB_FAIL(convert_sys_variable_value(var_store_idx, allocator, value))) { - LOG_WARN("fail to get sys variable value", KR(ret), K(tenant_id), K(var_store_idx)); - } else if (OB_FAIL(sys_param.init(tenant_id, zone, name.ptr(), type, - value.ptr(), min.ptr(), max.ptr(), info.ptr(), flag))) { - LOG_WARN("sys_param init failed", KR(ret), K(tenant_id), K(name), - K(type), K(value), K(min), K(max), K(info), K(flag)); - } else if (!sys_param.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("sys param is invalid", KR(ret), K(tenant_id), K(sys_param)); - } else if (OB_FAIL(execute_add_sys_var_sql(sql_client, tenant_id, sys_param))) { - LOG_WARN("fail to execute add sys var sql", KR(ret), K(tenant_id)); - } else if (OB_FAIL(execute_add_sys_var_history_sql(sql_client, tenant_id, sys_param))) { - LOG_WARN("fail to execute add sys var history sql", KR(ret), K(tenant_id)); - } - } - } - return ret; -} - -// C++ implement for special_update_sys_vars_for_tenant() in python upgrade script. -int ObUpgradeUtils::convert_sys_variable_value( - const int64_t var_store_idx, - common::ObIAllocator &allocator, - ObString &value) -{ - int ret = OB_SUCCESS; - const ObString &name = ObSysVariables::get_name(var_store_idx); - if (0 == name.compare("nls_date_format")) { - if (OB_FAIL(ob_write_string( - allocator, ObString("YYYY-MM-DD HH24:MI:SS"), value))) { - LOG_WARN("fail to write string", KR(ret), K(name)); - } - } else if (0 == name.compare("nls_timestamp_format")) { - if (OB_FAIL(ob_write_string( - allocator, ObString("YYYY-MM-DD HH24:MI:SS.FF"), value))) { - LOG_WARN("fail to write string", KR(ret), K(name)); - } - } else if (0 == name.compare("nls_timestamp_tz_format")) { - if (OB_FAIL(ob_write_string( - allocator, ObString("YYYY-MM-DD HH24:MI:SS.FF TZR TZD"), value))) { - LOG_WARN("fail to write string", KR(ret), K(name)); - } - } else { - const ObString &ori_value = ObSysVariables::get_value(var_store_idx); - value.assign_ptr(ori_value.ptr(), ori_value.length()); - } - return ret; -} - -int ObUpgradeUtils::execute_add_sys_var_sql( - common::ObISQLClient &sql_client, - const uint64_t tenant_id, - const ObSysParam &sys_param) -{ - int ret = OB_SUCCESS; - if (OB_INVALID_TENANT_ID == tenant_id - || OB_INVALID_ID == tenant_id) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); - } else { - int64_t affected_rows = 0; - ObDMLSqlSplicer dml; - ObDMLExecHelper exec(sql_client, tenant_id); - if (OB_FAIL(gen_basic_sys_variable_dml(tenant_id, sys_param, dml))) { - LOG_WARN("fail to gen dml", KR(ret), K(tenant_id), K(sys_param)); - } else if (OB_FAIL(dml.add_column("value", FORMAT_STR(ObString(sys_param.value_))))) { - LOG_WARN("fail to gen dml", KR(ret), K(tenant_id), K(sys_param)); - } else if (OB_FAIL(exec.exec_replace(OB_ALL_SYS_VARIABLE_TNAME, dml, affected_rows))) { - LOG_WARN("execute insert failed", KR(ret)); - } else if (!is_zero_row(affected_rows) - && !is_single_row(affected_rows) - && !is_double_row(affected_rows)) { - LOG_WARN("invalid affected_rows", KR(ret), K(tenant_id), K(affected_rows)); - } else { - LOG_INFO("[UPGRADE] add sys var", KR(ret), K(tenant_id), K(sys_param)); - } - } - return ret; -} - -int ObUpgradeUtils::execute_add_sys_var_history_sql( - common::ObISQLClient &sql_client, - const uint64_t tenant_id, - const ObSysParam &sys_param) -{ - int ret = OB_SUCCESS; - if (OB_INVALID_TENANT_ID == tenant_id - || OB_INVALID_ID == tenant_id) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); - } else { - int64_t schema_version = OB_INVALID_VERSION; - { - SMART_VAR(ObMySQLProxy::MySQLResult, res) { - ObMySQLResult *result = NULL; - ObSqlString sql; - if (OB_FAIL(sql.append_fmt( - "select schema_version from %s where tenant_id = %lu" - " order by schema_version asc limit 1", - OB_ALL_SYS_VARIABLE_HISTORY_TNAME, - ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)))) { - LOG_WARN("fail to append sql", KR(ret), K(tenant_id), K(sql)); - } else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) { - LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); - } else if (NULL == (result = res.get_result())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result is not expected to be NULL", KR(ret), K(tenant_id), K(sql)); - } else if (OB_FAIL(result->next())) { - LOG_WARN("fail to get row", KR(ret), K(tenant_id), K(sql)); - } else { - EXTRACT_INT_FIELD_MYSQL(*result, "schema_version", schema_version, int64_t); - } - } - } - if (OB_SUCC(ret)) { - int64_t affected_rows = 0; - ObDMLSqlSplicer dml; - ObDMLExecHelper exec(sql_client, tenant_id); - if (OB_INVALID_VERSION == schema_version) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid schema_version", KR(ret), K(tenant_id), K(sys_param)); - } else if (OB_FAIL(gen_basic_sys_variable_dml(tenant_id, sys_param, dml))) { - LOG_WARN("fail to gen dml", KR(ret), K(tenant_id), K(sys_param)); - } else if (OB_FAIL(dml.add_pk_column("schema_version", schema_version)) - || OB_FAIL(dml.add_column("value", FORMAT_STR(ObString(sys_param.value_)))) - || OB_FAIL(dml.add_column("is_deleted", 0))) { - LOG_WARN("fail to add column", KR(ret), K(tenant_id), K(schema_version)); - } else if (OB_FAIL(exec.exec_replace(OB_ALL_SYS_VARIABLE_HISTORY_TNAME, dml, affected_rows))) { - LOG_WARN("execute insert failed", KR(ret)); - } else if (!is_zero_row(affected_rows) - && !is_single_row(affected_rows) - && !is_double_row(affected_rows)) { - LOG_WARN("invalid affected_rows", KR(ret), K(tenant_id), K(affected_rows)); - } else { - LOG_INFO("[UPGRADE] add sys var history", KR(ret), K(tenant_id), K(sys_param)); - } - } - } - return ret; -} - -int ObUpgradeUtils::gen_basic_sys_variable_dml( - const uint64_t tenant_id, - const ObSysParam &sys_param, - ObDMLSqlSplicer &dml) -{ - int ret = OB_SUCCESS; - if (OB_INVALID_TENANT_ID == tenant_id - || OB_INVALID_ID == tenant_id) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); - } else if ( - OB_FAIL(dml.add_pk_column("tenant_id", - ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id))) - || OB_FAIL(dml.add_pk_column("zone", "")) - || OB_FAIL(dml.add_pk_column("name", FORMAT_STR(ObString(sys_param.name_)))) - || OB_FAIL(dml.add_column("data_type", sys_param.data_type_)) - || OB_FAIL(dml.add_column("min_val", FORMAT_STR(ObString(sys_param.min_val_)))) - || OB_FAIL(dml.add_column("max_val", FORMAT_STR(ObString(sys_param.max_val_)))) - || OB_FAIL(dml.add_column("info", FORMAT_STR(ObString(sys_param.info_)))) - || OB_FAIL(dml.add_column("flags", sys_param.flags_))) { - LOG_WARN("fail to add column", KR(ret), K(tenant_id), K(sys_param)); - } - return ret; -} /* =========== upgrade sys variable end =========== */ /* =========== upgrade sys stat =========== */ diff --git a/src/share/ob_upgrade_utils.h b/src/share/ob_upgrade_utils.h index 9854bf7e5e..57f87784d1 100644 --- a/src/share/ob_upgrade_utils.h +++ b/src/share/ob_upgrade_utils.h @@ -38,7 +38,6 @@ public: static int can_run_upgrade_job(rootserver::ObRsJobType job_type, bool &can); static int check_upgrade_job_passed(rootserver::ObRsJobType job_type); static int check_schema_sync(bool &is_sync); - /* physical restore related */ // upgrade_sys_variable()/upgrade_sys_stat() can be called when enable_ddl = false. static int upgrade_sys_variable( obrpc::ObCommonRpcProxy &rpc_proxy, @@ -51,42 +50,16 @@ private: static int check_rs_job_success(rootserver::ObRsJobType job_type, bool &success); /* upgrade sys variable */ - static int calc_diff_sys_var( + static int calc_diff_sys_var_( common::ObISQLClient &sql_client, const uint64_t tenant_id, common::ObArray &update_list, common::ObArray &add_list); - static int update_sys_var( + static int update_sys_var_( obrpc::ObCommonRpcProxy &rpc_proxy, const uint64_t tenant_id, + const bool is_update, common::ObArray &update_list); - static int add_sys_var(common::ObISQLClient &sql_client, - const uint64_t tenant_id, - common::ObArray &add_list); - static int execute_update_sys_var_sql( - common::ObISQLClient &sql_client, - const uint64_t tenant_id, - const share::schema::ObSysParam &sys_param); - static int execute_update_sys_var_history_sql( - common::ObISQLClient &sql_client, - const uint64_t tenant_id, - const share::schema::ObSysParam &sys_param); - static int execute_add_sys_var_sql( - common::ObISQLClient &sql_client, - const uint64_t tenant_id, - const share::schema::ObSysParam &sys_param); - static int execute_add_sys_var_history_sql( - common::ObISQLClient &sql_client, - const uint64_t tenant_id, - const share::schema::ObSysParam &sys_param); - static int convert_sys_variable_value( - const int64_t var_store_idx, - common::ObIAllocator &allocator, - common::ObString &value); - static int gen_basic_sys_variable_dml( - const uint64_t tenant_id, - const share::schema::ObSysParam &sys_param, - share::ObDMLSqlSplicer &dml); /* upgrade sys variable end */ static int filter_sys_stat( common::ObISQLClient &sql_client, diff --git a/src/share/schema/ob_column_schema.cpp b/src/share/schema/ob_column_schema.cpp index 62fe3b8059..af4bd8ef5a 100644 --- a/src/share/schema/ob_column_schema.cpp +++ b/src/share/schema/ob_column_schema.cpp @@ -169,6 +169,12 @@ ObColumnSchemaV2 &ObColumnSchemaV2::operator =(const ObColumnSchemaV2 &src_schem return *this; } +int ObColumnSchemaV2::assign(const ObColumnSchemaV2 &other) +{ + *this = other; + return error_ret_; +} + bool ObColumnSchemaV2::operator==(const ObColumnSchemaV2 &r) const { return (tenant_id_ == r.tenant_id_ && table_id_ == r.table_id_ && column_id_ == r.column_id_ diff --git a/src/share/schema/ob_column_schema.h b/src/share/schema/ob_column_schema.h index 641ce9e538..d62d0d4568 100644 --- a/src/share/schema/ob_column_schema.h +++ b/src/share/schema/ob_column_schema.h @@ -61,6 +61,8 @@ public: bool operator==(const ObColumnSchemaV2 &r) const; bool operator!=(const ObColumnSchemaV2 &r) const; + int assign(const ObColumnSchemaV2 &other); + //set methods inline void set_tenant_id(const uint64_t id) { tenant_id_ = id; } inline void set_table_id(const uint64_t id) { table_id_ = id; } diff --git a/src/share/schema/ob_schema_service_sql_impl.cpp b/src/share/schema/ob_schema_service_sql_impl.cpp index 216c6a2b01..151411bd1b 100644 --- a/src/share/schema/ob_schema_service_sql_impl.cpp +++ b/src/share/schema/ob_schema_service_sql_impl.cpp @@ -1271,7 +1271,7 @@ int ObSchemaServiceSQLImpl::get_sys_variable_schema( } if (OB_SUCC(ret)) { - // mock missed system variable schema by hardcoded schema + // To avoid -5044 error, mock missed system variable schema with default value by hardcoded schema. for (int64_t i = 0; OB_SUCC(ret) && i < ObSysVariables::get_amount(); i++) { ObSysVarClassType sys_var_id = ObSysVariables::get_sys_var_id(i); const ObSysVarSchema *sys_var = NULL; diff --git a/src/share/schema/ob_schema_struct.cpp b/src/share/schema/ob_schema_struct.cpp index 32086b2453..c1bf244c77 100644 --- a/src/share/schema/ob_schema_struct.cpp +++ b/src/share/schema/ob_schema_struct.cpp @@ -2252,6 +2252,17 @@ bool ObSysVarSchema::is_equal_except_value(const ObSysVarSchema &other) const } return bret; } + +bool ObSysVarSchema::is_equal_for_add(const ObSysVarSchema &other) const +{ + bool bret = false; + if (is_equal_except_value(other) + && 0 == value_.compare(other.value_) + && zone_ == other.zone_) { + bret = true; + } + return bret; +} /*------------------------------------------------------------------------------------------------- * ------------------------------ObDatabaseSchema------------------------------------------- ----------------------------------------------------------------------------------------------------*/ diff --git a/src/share/schema/ob_schema_struct.h b/src/share/schema/ob_schema_struct.h index 23f6a05064..eacf2c46fd 100644 --- a/src/share/schema/ob_schema_struct.h +++ b/src/share/schema/ob_schema_struct.h @@ -1271,6 +1271,7 @@ public: void reset(); int64_t get_convert_size() const; bool is_equal_except_value(const ObSysVarSchema &other) const; + bool is_equal_for_add(const ObSysVarSchema &other) const; uint64_t get_tenant_id() const { return tenant_id_; } void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; } const common::ObString &get_name() const { return name_; } diff --git a/src/share/schema/ob_schema_utils.cpp b/src/share/schema/ob_schema_utils.cpp index f0354d04c2..ed6604ddd6 100644 --- a/src/share/schema/ob_schema_utils.cpp +++ b/src/share/schema/ob_schema_utils.cpp @@ -339,23 +339,18 @@ int ObSchemaUtils::construct_tenant_space_simple_table( ObSimpleTableSchemaV2 &table) { int ret = OB_SUCCESS; - if (is_sys_tenant(tenant_id)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); - } else { - table.set_tenant_id(tenant_id); - // for distributed virtual table in tenant space - int64_t part_num = table.get_partition_num(); - for (int64_t i = 0; OB_SUCC(ret) && i < part_num; i++) { - ObPartition *part = table.get_part_array()[i]; - if (OB_ISNULL(part)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("part is null", KR(ret), K(i)); - } else { - part->set_tenant_id(tenant_id); - } - } // end for - } + table.set_tenant_id(tenant_id); + // for distributed virtual table in tenant space + int64_t part_num = table.get_partition_num(); + for (int64_t i = 0; OB_SUCC(ret) && i < part_num; i++) { + ObPartition *part = table.get_part_array()[i]; + if (OB_ISNULL(part)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("part is null", KR(ret), K(i)); + } else { + part->set_tenant_id(tenant_id); + } + } // end for return ret; } diff --git a/src/share/schema/ob_table_schema.cpp b/src/share/schema/ob_table_schema.cpp index 0eb90a830b..7e5dafbb57 100644 --- a/src/share/schema/ob_table_schema.cpp +++ b/src/share/schema/ob_table_schema.cpp @@ -1852,8 +1852,7 @@ int ObTableSchema::alter_column(ObColumnSchemaV2 &column_schema, ObColumnCheckMo } } if (OB_FAIL(ret)) { - } else if (OB_FALSE_IT(*src_schema = column_schema)) { - } else if (OB_FAIL(src_schema->get_err_ret())) { + } else if (OB_FAIL(src_schema->assign(column_schema))) { LOG_WARN("failed to assign src schema", K(ret), K(column_schema)); } } @@ -4271,10 +4270,10 @@ int ObTableSchema::check_column_can_be_altered_offline( int ObTableSchema::check_column_can_be_altered_online( const ObColumnSchemaV2 *src_schema, - ObColumnSchemaV2 *dst_schema) + ObColumnSchemaV2 *dst_schema) const { int ret = OB_SUCCESS; - ObColumnSchemaV2 *tmp_column = NULL; + const ObColumnSchemaV2 *tmp_column = NULL; if (OB_ISNULL(src_schema) || NULL == dst_schema) { ret = OB_INVALID_ARGUMENT; LOG_WARN("The column schema is NULL", K(ret)); @@ -4289,8 +4288,23 @@ int ObTableSchema::check_column_can_be_altered_online( LOG_WARN("Only NORMAL table and INDEX table and SYSTEM table are allowed", K(ret)); } else { LOG_DEBUG("check column schema can be altered", KPC(src_schema), KPC(dst_schema)); + // Additional restriction for system table: + // 1. Can't alter column name + // 2. Can't alter column from "NULL" to "NOT NULL" + if (is_system_table(get_table_id())) { + if (0 != src_schema->get_column_name_str().compare(dst_schema->get_column_name_str())) { + ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "Alter system table's column name is"); + LOG_WARN("Alter system table's column name is not supported", KR(ret), K(src_schema), K(dst_schema)); + } else if (src_schema->is_nullable() && !dst_schema->is_nullable()) { + ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "Alter system table's column from `NULL` to `NOT NULL`"); + LOG_WARN("Alter system table's column from `NULL` to `NOT NULL` is not supported", KR(ret), K(src_schema), K(dst_schema)); + } + } + bool is_oracle_mode = false; - if (OB_FAIL(check_if_oracle_compat_mode(is_oracle_mode))) { + if (FAILEDx(check_if_oracle_compat_mode(is_oracle_mode))) { LOG_WARN("check if oracle compat mode failed", K(ret)); } else if (is_oracle_mode && ob_is_number_tc(src_schema->get_data_type()) diff --git a/src/share/schema/ob_table_schema.h b/src/share/schema/ob_table_schema.h index 30d75feffe..b2fe6198f8 100644 --- a/src/share/schema/ob_table_schema.h +++ b/src/share/schema/ob_table_schema.h @@ -1257,7 +1257,7 @@ public: virtual int init_column_meta_array( common::ObIArray &meta_array) const override; int check_column_can_be_altered_online(const ObColumnSchemaV2 *src_schema, - ObColumnSchemaV2 *dst_schema); + ObColumnSchemaV2 *dst_schema) const; int check_column_can_be_altered_offline(const ObColumnSchemaV2 *src_schema, ObColumnSchemaV2 *dst_schema) const; int check_alter_column_is_offline(const ObColumnSchemaV2 *src_schema, diff --git a/src/share/schema/ob_table_sql_service.cpp b/src/share/schema/ob_table_sql_service.cpp index d862a25450..368ea3d3bf 100644 --- a/src/share/schema/ob_table_sql_service.cpp +++ b/src/share/schema/ob_table_sql_service.cpp @@ -860,9 +860,11 @@ int ObTableSqlService::revise_check_cst_column_info( return ret; } -int ObTableSqlService::insert_single_column(ObISQLClient &sql_client, - const ObTableSchema &new_table_schema, - const ObColumnSchemaV2 &new_column_schema) +int ObTableSqlService::insert_single_column( + ObISQLClient &sql_client, + const ObTableSchema &new_table_schema, + const ObColumnSchemaV2 &new_column_schema, + const bool record_ddl_operation) { int ret = OB_SUCCESS; if (OB_FAIL(check_ddl_allowed(new_table_schema))) { @@ -881,7 +883,7 @@ int ObTableSqlService::insert_single_column(ObISQLClient &sql_client, } } } - if (OB_SUCC(ret)) { + if (OB_SUCC(ret) && record_ddl_operation) { ObSchemaOperation opt; opt.tenant_id_ = new_table_schema.get_tenant_id(); opt.database_id_ = new_table_schema.get_database_id(); @@ -3681,10 +3683,10 @@ int ObTableSqlService::gen_column_dml( orig_default_value.assign_ptr(orig_default_value_buf, static_cast(orig_default_value_len)); cur_default_value.assign_ptr(cur_default_value_buf, static_cast(cur_default_value_len)); } - LOG_DEBUG("begin gen_column_dml", K(ret), K(compat_mode), K(orig_default_value), K(cur_default_value), K(orig_default_value_len), K(cur_default_value_len)); + LOG_TRACE("begin gen_column_dml", K(ret), K(compat_mode), K(orig_default_value), K(cur_default_value), K(orig_default_value_len), K(cur_default_value_len)); } } - LOG_DEBUG("begin gen_column_dml", K(ret), K(orig_default_value), K(cur_default_value), K(column)); + LOG_TRACE("begin gen_column_dml", K(ret), K(orig_default_value), K(cur_default_value), K(column)); if (OB_SUCC(ret)) { ObString cur_default_value_v1; if (column.get_orig_default_value().is_null()) { diff --git a/src/share/schema/ob_table_sql_service.h b/src/share/schema/ob_table_sql_service.h index b18b20eab7..c41f83aa16 100644 --- a/src/share/schema/ob_table_sql_service.h +++ b/src/share/schema/ob_table_sql_service.h @@ -98,7 +98,8 @@ public: //alter table add column int insert_single_column(common::ObISQLClient &sql_client, const ObTableSchema &new_table_schema, - const ObColumnSchemaV2 &column_schema); + const ObColumnSchemaV2 &column_schema, + const bool record_ddl_operation); //alter table add constraint int insert_single_constraint(common::ObISQLClient &sql_client, const ObTableSchema &new_table_schema, diff --git a/src/sql/resolver/cmd/ob_alter_system_resolver.cpp b/src/sql/resolver/cmd/ob_alter_system_resolver.cpp index 9df7523f4e..e93c03d99d 100644 --- a/src/sql/resolver/cmd/ob_alter_system_resolver.cpp +++ b/src/sql/resolver/cmd/ob_alter_system_resolver.cpp @@ -38,6 +38,7 @@ #include "share/backup/ob_backup_io_adapter.h" #include "share/backup/ob_backup_config.h" #include "observer/mysql/ob_query_response_time.h" +#include "rootserver/ob_rs_job_table_operator.h" //ObRsJobType namespace oceanbase { @@ -2596,15 +2597,24 @@ int ObRunUpgradeJobResolver::resolve(const ParseNode &parse_tree) LOG_ERROR("create ObRunUpgradeJobStmt failed", KR(ret)); } else { stmt_ = stmt; - ObString version_str; + ObString str; uint64_t version = OB_INVALID_VERSION; - if (OB_FAIL(Util::resolve_string(parse_tree.children_[0], version_str))) { + if (OB_FAIL(Util::resolve_string(parse_tree.children_[0], str))) { LOG_WARN("resolve string failed", KR(ret)); - } else if (OB_FAIL(ObClusterVersion::get_version(version_str, version))) { - LOG_WARN("fail to get version", KR(ret), K(version_str)); + } else if (0 == str.case_compare(rootserver::ObRsJobTableOperator::get_job_type_str( + rootserver::JOB_TYPE_UPGRADE_SYSTEM_VARIABLE))) { + stmt->get_rpc_arg().action_ = obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE; + } else if (0 == str.case_compare(rootserver::ObRsJobTableOperator::get_job_type_str( + rootserver::JOB_TYPE_UPGRADE_SYSTEM_TABLE))) { + stmt->get_rpc_arg().action_ = obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE; } else { - stmt->get_rpc_arg().action_ = obrpc::ObUpgradeJobArg::RUN_UPGRADE_JOB; - stmt->get_rpc_arg().version_ = static_cast(version); + // UPGRADE_POST_ACTION + if (OB_FAIL(ObClusterVersion::get_version(str, version))) { + LOG_WARN("fail to get version", KR(ret), K(str)); + } else { + stmt->get_rpc_arg().action_ = obrpc::ObUpgradeJobArg::UPGRADE_POST_ACTION; + stmt->get_rpc_arg().version_ = static_cast(version); + } } } } diff --git a/tools/upgrade/special_upgrade_action_post.py b/tools/upgrade/special_upgrade_action_post.py index 55693563b7..bbc66492d1 100755 --- a/tools/upgrade/special_upgrade_action_post.py +++ b/tools/upgrade/special_upgrade_action_post.py @@ -194,7 +194,7 @@ def get_max_used_job_id(cur): def check_can_run_upgrade_job(cur, version): try: sql = """select job_status from oceanbase.__all_rootservice_job - where job_type = 'RUN_UPGRADE_POST_JOB' and extra_info = '{0}' + where job_type = 'UPGRADE_POST_ACTION' and extra_info = '{0}' order by job_id desc limit 1""".format(version) results = query(cur, sql) diff --git a/tools/upgrade/upgrade_post.py b/tools/upgrade/upgrade_post.py index af6d2fcc26..1da21fbc4d 100755 --- a/tools/upgrade/upgrade_post.py +++ b/tools/upgrade/upgrade_post.py @@ -12717,7 +12717,7 @@ #def check_can_run_upgrade_job(cur, version): # try: # sql = """select job_status from oceanbase.__all_rootservice_job -# where job_type = 'RUN_UPGRADE_POST_JOB' and extra_info = '{0}' +# where job_type = 'UPGRADE_POST_ACTION' and extra_info = '{0}' # order by job_id desc limit 1""".format(version) # results = query(cur, sql) # diff --git a/tools/upgrade/upgrade_pre.py b/tools/upgrade/upgrade_pre.py index 07d218f35b..60bbd808ff 100755 --- a/tools/upgrade/upgrade_pre.py +++ b/tools/upgrade/upgrade_pre.py @@ -12717,7 +12717,7 @@ #def check_can_run_upgrade_job(cur, version): # try: # sql = """select job_status from oceanbase.__all_rootservice_job -# where job_type = 'RUN_UPGRADE_POST_JOB' and extra_info = '{0}' +# where job_type = 'UPGRADE_POST_ACTION' and extra_info = '{0}' # order by job_id desc limit 1""".format(version) # results = query(cur, sql) #