[UPGRADE] UPGRADE_SYSTEM_VARIABLE/UPGRADE_SYSTEM_TABLE CMD

This commit is contained in:
obdev 2022-11-04 03:06:24 +00:00 committed by wangzelin.wzl
parent 8d51281eb2
commit acc97de57f
34 changed files with 1007 additions and 613 deletions

View File

@ -209,6 +209,7 @@ void oceanbase::observer::init_srv_xlator_for_rootserver(ObSrvRpcXlator *xlator)
RPC_PROCESSOR(rootserver::ObRpcRunJobP, *gctx_.root_service_);
RPC_PROCESSOR(rootserver::ObRpcAdminRefreshIOCalibrationP, *gctx_.root_service_);
RPC_PROCESSOR(rootserver::ObRpcRunUpgradeJobP, *gctx_.root_service_);
RPC_PROCESSOR(rootserver::ObRpcUpgradeTableSchemaP, *gctx_.root_service_);
RPC_PROCESSOR(rootserver::ObRpcAdminFlushCacheP, *gctx_.root_service_);
RPC_PROCESSOR(rootserver::ObRpcAdminUpgradeCmdP, *gctx_.root_service_);
RPC_PROCESSOR(rootserver::ObRpcAdminRollingUpgradeCmdP, *gctx_.root_service_);

View File

@ -2685,7 +2685,8 @@ int ObDDLOperator::insert_single_column(ObMySQLTransaction &trans,
LOG_WARN("fail to gen new schema_version", K(ret), K(tenant_id));
} else if (FALSE_IT(new_column.set_schema_version(new_schema_version))) {
//do nothing
} else if (OB_FAIL(schema_service->get_table_sql_service().insert_single_column(trans, new_table_schema, new_column))) {
} else if (OB_FAIL(schema_service->get_table_sql_service().insert_single_column(
trans, new_table_schema, new_column, true))) {
LOG_WARN("insert single column failed", K(ret));
}
return ret;
@ -3853,6 +3854,59 @@ int ObDDLOperator::update_single_column(common::ObMySQLTransaction &trans,
return ret;
}
int ObDDLOperator::batch_update_system_table_columns(
common::ObMySQLTransaction &trans,
const share::schema::ObTableSchema &orig_table_schema,
share::schema::ObTableSchema &new_table_schema,
const common::ObIArray<uint64_t> &add_column_ids,
const common::ObIArray<uint64_t> &alter_column_ids,
const common::ObString *ddl_stmt_str/*=NULL*/)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = new_table_schema.get_tenant_id();
const uint64_t table_id = new_table_schema.get_table_id();
int64_t new_schema_version = OB_INVALID_VERSION;
ObSchemaService *schema_service_impl = schema_service_.get_schema_service();
if (OB_ISNULL(schema_service_impl)) {
ret = OB_ERR_SYS;
LOG_WARN("schema_service_impl must not null", KR(ret));
} else if (OB_FAIL(schema_service_.gen_new_schema_version(tenant_id, new_schema_version))) {
LOG_WARN("fail to gen new schema_version", KR(ret), K(tenant_id));
} else {
(void) new_table_schema.set_schema_version(new_schema_version);
ObColumnSchemaV2 *new_column = NULL;
for (int64_t i = 0; OB_SUCC(ret) && i < add_column_ids.count(); i++) {
const uint64_t column_id = add_column_ids.at(i);
if (OB_ISNULL(new_column = new_table_schema.get_column_schema(column_id))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get column", KR(ret), K(tenant_id), K(table_id), K(column_id));
} else if (FALSE_IT(new_column->set_schema_version(new_schema_version))) {
} else if (OB_FAIL(schema_service_impl->get_table_sql_service().insert_single_column(
trans, new_table_schema, *new_column, false))) {
LOG_WARN("fail to insert column", KR(ret), K(tenant_id), K(table_id), K(column_id));
}
} // end for
for (int64_t i = 0; OB_SUCC(ret) && i < alter_column_ids.count(); i++) {
const uint64_t column_id = alter_column_ids.at(i);
if (OB_ISNULL(new_column = new_table_schema.get_column_schema(column_id))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get column", KR(ret), K(tenant_id), K(table_id), K(column_id));
} else if (FALSE_IT(new_column->set_schema_version(new_schema_version))) {
} else if (OB_FAIL(schema_service_impl->get_table_sql_service().update_single_column(
trans, orig_table_schema, new_table_schema, *new_column, false))) {
LOG_WARN("fail to insert column", KR(ret), K(tenant_id), K(table_id), K(column_id));
}
} // end for
if (FAILEDx(schema_service_impl->get_table_sql_service().update_table_attribute(
trans, new_table_schema, OB_DDL_ALTER_TABLE, ddl_stmt_str))) {
LOG_WARN("failed to update table attribute", KR(ret), K(tenant_id), K(table_id));
}
}
return ret;
}
int ObDDLOperator::update_partition_option(common::ObMySQLTransaction &trans,
ObTableSchema &table_schema)
{

View File

@ -292,6 +292,13 @@ public:
int delete_single_column(common::ObMySQLTransaction &trans,
share::schema::ObTableSchema &new_table_schema,
const common::ObString &column_name);
int batch_update_system_table_columns(
common::ObMySQLTransaction &trans,
const share::schema::ObTableSchema &orig_table_schema,
share::schema::ObTableSchema &new_table_schema,
const common::ObIArray<uint64_t> &add_column_ids,
const common::ObIArray<uint64_t> &alter_column_ids,
const common::ObString *ddl_stmt_str = NULL);
int create_sequence_in_create_table(share::schema::ObTableSchema &table_schema,
common::ObMySQLTransaction &trans,
share::schema::ObSchemaGetterGuard &schema_guard,

View File

@ -18513,6 +18513,204 @@ int ObDDLService::update_index_status(const obrpc::ObUpdateIndexStatusArg &arg)
return ret;
}
int ObDDLService::upgrade_table_schema(const obrpc::ObUpgradeTableSchemaArg &arg)
{
int ret = OB_SUCCESS;
FLOG_INFO("[UPGRADE] begin upgrade system table", K(arg));
const uint64_t tenant_id = arg.get_tenant_id();
const uint64_t table_id = arg.get_table_id();
int64_t start_time = ObTimeUtility::current_time();
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (!arg.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", KR(ret), K(arg));
} else if (!GCONF.enable_sys_table_ddl) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("upgrade table schema when enable_sys_table_ddl is off is not allowed",
KR(ret), K(arg));
} else {
HEAP_VAR(ObTableSchema, hard_code_schema) {
ObSchemaGetterGuard schema_guard;
bool exist = false;
if (OB_FAIL(get_hard_code_system_table_schema_(
tenant_id, table_id, hard_code_schema))) {
LOG_WARN("fail to get hard code table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(get_tenant_schema_guard_with_version_in_inner_table(tenant_id, schema_guard))) {
LOG_WARN("get_schema_guard with version in inner table failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.check_table_exist(tenant_id, table_id, exist))) {
LOG_WARN("fail to check table exist", KR(ret), K(tenant_id), K(table_id));
} else if (!exist) {
if (OB_FAIL(create_system_table_(schema_guard, hard_code_schema))) {
LOG_WARN("fail to create system table", KR(ret), K(tenant_id), K(table_id));
}
} else if (OB_FAIL(alter_system_table_column_(schema_guard, hard_code_schema))) {
LOG_WARN("fail to alter system table's column", KR(ret), K(tenant_id), K(table_id));
}
}
}
FLOG_INFO("[UPGRADE] end upgrade system table",
KR(ret), K(tenant_id), K(table_id),
"cost", ObTimeUtility::current_time() - start_time);
return ret;
}
int ObDDLService::get_hard_code_system_table_schema_(
const uint64_t tenant_id,
const uint64_t table_id,
ObTableSchema &hard_code_schema)
{
int ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID == tenant_id
&& !is_system_table(table_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id or table_id", KR(ret), K(tenant_id), K(table_id));
} else {
bool finded = false;
const schema_create_func *creator_ptr_array[] = {
share::core_table_schema_creators,
share::sys_table_schema_creators, NULL };
for (const schema_create_func **creator_ptr_ptr = creator_ptr_array;
OB_SUCC(ret) && !finded && OB_NOT_NULL(*creator_ptr_ptr); ++creator_ptr_ptr) {
for (const schema_create_func *creator_ptr = *creator_ptr_ptr;
OB_SUCC(ret) && !finded && OB_NOT_NULL(*creator_ptr); ++creator_ptr) {
hard_code_schema.reset();
bool exist = false;
if (OB_FAIL((*creator_ptr)(hard_code_schema))) {
LOG_WARN("create table schema failed", KR(ret));
} else if (!is_sys_tenant(tenant_id)
&& OB_FAIL(ObSchemaUtils::construct_tenant_space_full_table(
tenant_id, hard_code_schema))) {
LOG_WARN("fail to construct tenant space table", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObSysTableChecker::is_inner_table_exist(
tenant_id, hard_code_schema, exist))) {
LOG_WARN("fail to check inner table exist",
KR(ret), K(tenant_id), K(hard_code_schema));
} else if (!exist) {
// skip
} else if (hard_code_schema.get_table_id() == table_id) {
finded = true;
}
} // end for
} // end for
if (OB_SUCC(ret) && !finded) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("hard code table schema not exist", KR(ret), K(tenant_id), K(table_id));
}
}
return ret;
}
int ObDDLService::create_system_table_(
ObSchemaGetterGuard &schema_guard,
const ObTableSchema &hard_code_schema)
{
int ret = OB_SUCCESS;
bool if_not_exist = true;
int64_t frozen_scn = 0;
ObArray<ObTableSchema> table_schemas;
// the following variable is not used
ObString ddl_stmt_str;
ObErrorInfo error_info;
obrpc::ObSequenceDDLArg sequence_ddl_arg;
uint64_t last_replay_log_id = 0;
ObArray<ObDependencyInfo> dep_infos;
ObArray<ObMockFKParentTableSchema> mock_fk_parent_table_schema_array;
// sys index、sys lob table will be added in create_user_tables()
if (OB_FAIL(table_schemas.push_back(hard_code_schema))) {
LOG_WARN("fail to push back new table schema", KR(ret));
} else if (OB_FAIL(ObMajorFreezeHelper::get_frozen_scn(
hard_code_schema.get_tenant_id(), frozen_scn))) {
LOG_WARN("get_frozen_scn failed", KR(ret), "tenant_id", hard_code_schema.get_tenant_id());
} else if (OB_FAIL(create_user_tables(if_not_exist, ddl_stmt_str,
error_info, table_schemas, frozen_scn, schema_guard, sequence_ddl_arg,
last_replay_log_id, &dep_infos, mock_fk_parent_table_schema_array))) {
LOG_WARN("fail to create system table", KR(ret), K(hard_code_schema));
}
return ret;
}
int ObDDLService::alter_system_table_column_(
ObSchemaGetterGuard &schema_guard,
const ObTableSchema &hard_code_schema)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = hard_code_schema.get_tenant_id();
const uint64_t table_id = hard_code_schema.get_table_id();
const ObTableSchema *orig_table_schema = NULL;
ObArray<uint64_t> add_column_ids;
ObArray<uint64_t> alter_column_ids;
if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, orig_table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(orig_table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(ObRootInspection::check_and_get_system_table_column_diff(
*orig_table_schema, hard_code_schema, add_column_ids, alter_column_ids))) {
LOG_WARN("fail to check system table's column schemas", KR(ret), K(tenant_id), K(table_id));
} else if (0 == add_column_ids.count() && 0 == alter_column_ids.count()) {
LOG_INFO("system table's column schemas not changed, just skip", KR(ret), K(tenant_id), K(table_id));
} else {
ObDDLSQLTransaction trans(schema_service_);
ObDDLOperator ddl_operator(*schema_service_, *sql_proxy_);
int64_t refreshed_schema_version = 0;
HEAP_VAR(ObTableSchema, new_table_schema) {
if (OB_FAIL(schema_guard.get_schema_version(tenant_id, refreshed_schema_version))) {
LOG_WARN("failed to get tenant schema version", KR(ret), K(tenant_id));
} else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, refreshed_schema_version))) {
LOG_WARN("failed to start trans", KR(ret), K(tenant_id), K(refreshed_schema_version));
} else if (OB_FAIL(new_table_schema.assign(*orig_table_schema))) {
LOG_WARN("fail to assign table schema", KR(ret), K(tenant_id), K(table_id));
} else {
const ObColumnSchemaV2 *hard_code_column = NULL;
for (int64_t i = 0; OB_SUCC(ret) && i < add_column_ids.count(); i++) {
const uint64_t column_id = add_column_ids.at(i);
if (OB_ISNULL(hard_code_column = hard_code_schema.get_column_schema(column_id))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get column schema", KR(ret), K(tenant_id), K(table_id), K(column_id));
} else if (OB_FAIL(new_table_schema.add_column(*hard_code_column))) {
LOG_WARN("fail to add column", KR(ret), KPC(hard_code_column));
}
} // end for
ObColumnSchemaV2 new_column;
for (int64_t i = 0; OB_SUCC(ret) && i < alter_column_ids.count(); i++) {
const uint64_t column_id = alter_column_ids.at(i);
if (OB_ISNULL(hard_code_column = hard_code_schema.get_column_schema(column_id))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get column schema", KR(ret), K(tenant_id), K(table_id), K(column_id));
} else if (OB_FAIL(new_column.assign(*hard_code_column))) {
LOG_WARN("fail to assign column", KR(ret), KPC(hard_code_column));
} else if (OB_FAIL(new_table_schema.alter_column(new_column, share::schema::ObTableSchema::CHECK_MODE_ONLINE))) {
LOG_WARN("fail to alter column", KR(ret), K(new_column));
}
} // end for
if (FAILEDx(ddl_operator.batch_update_system_table_columns(trans,
*orig_table_schema, new_table_schema, add_column_ids, alter_column_ids, NULL))) {
LOG_WARN("fail to batch update columns", KR(ret), K(new_table_schema));
}
}
if (trans.is_started()) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
LOG_WARN("trans end failed", "is_commit", OB_SUCC(ret), K(tmp_ret));
ret = (OB_SUCC(ret)) ? tmp_ret : ret;
}
}
if (FAILEDx(publish_schema(tenant_id))) {
LOG_WARN("fail to publish schema", KR(ret), K(tenant_id));
}
} // end HEAP_VAR
}
return ret;
}
int ObDDLService::add_table_schema(
ObTableSchema &table_schema,
@ -18533,18 +18731,30 @@ int ObDDLService::drop_inner_table(const share::schema::ObTableSchema &table_sch
ObString *stmt = NULL;
ObSchemaGetterGuard schema_guard;
const uint64_t tenant_id = table_schema.get_tenant_id();
const uint64_t table_id = table_schema.get_table_id();
const ObSimpleTableSchemaV2 * table = NULL;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("variable is not init");
} else if (!is_inner_table(table_schema.get_table_id())) {
LOG_WARN("variable is not init", KR(ret), K(tenant_id), K(table_id));
} else if (!is_inner_table(table_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("table not inner table", "table_id", table_schema.get_table_id(), K(ret));
LOG_WARN("table not inner table", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(get_tenant_schema_guard_with_version_in_inner_table(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard with version in inner table", K(ret), K(tenant_id));
} else if (OB_FAIL(drop_table_in_trans(schema_guard, table_schema, false, table_schema.is_index_table(),
LOG_WARN("fail to get schema guard with version in inner table", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_simple_table_schema(tenant_id, table_id, table))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_ISNULL(table)) {
// bugfix: https://work.aone.alibaba-inc.com/issue/45050614
// virtual table index may be dropped with virtual table, so here we ignore OB_TABLE_NOT_EXIST failure.
LOG_INFO("table has already been dropped, just ignore",
K(tenant_id), K(table_id), "table_name", table_schema.get_table_name());
} else if (OB_FAIL(drop_table_in_trans(schema_guard,
table_schema,
false,
table_schema.is_index_table(),
false, /* to recyclebin*/
stmt,
NULL, NULL, NULL))) {
LOG_WARN("drop table in transaction failed", K(ret), K(table_schema));
LOG_WARN("drop table in transaction failed", KR(ret), K(tenant_id), K(table_id));
}
return ret;
}
@ -21685,7 +21895,8 @@ int ObDDLService::lock_tenant(const ObString &tenant_name, const bool is_lock)
int ObDDLService::add_system_variable(const ObAddSysVarArg &arg)
{
LOG_INFO("receive add system variable request", K(arg));
FLOG_INFO("[UPGRADE] begin upgrade system variable", K(arg));
int64_t start_time = ObTimeUtility::current_time();
DEBUG_SYNC(BEFORE_UPRADE_SYSTEM_VARIABLE);
int ret = OB_SUCCESS;
ObDDLSQLTransaction trans(schema_service_);
@ -21725,14 +21936,15 @@ int ObDDLService::add_system_variable(const ObAddSysVarArg &arg)
LOG_WARN("sys var schema is null", KR(ret), K(arg));
} else if (!arg.update_sys_var_) {
// case 1. add sys var, and sys var exist
if (arg.if_not_exist_) {
execute = false;
if (new_sys_var.is_equal_for_add(*old_schema)) {
// new sys var will be mocked by schema when upgrade,
// only persist new sys var when sys var is equal with old sys var.
} else {
ret = OB_ERR_PARAM_DUPLICATE;
ret = OB_SCHEMA_ERROR;
LOG_WARN("system variable duplicated", KR(ret), K(var_name));
}
} else {
// upate sys var
// update sys var
if (new_sys_var.is_equal_except_value(*old_schema)) {
// case 2. new sys var is same with existed schema(except value), do nothing
execute = false;
@ -21784,6 +21996,8 @@ int ObDDLService::add_system_variable(const ObAddSysVarArg &arg)
LOG_WARN("publish schema failed", KR(ret));
}
}
FLOG_INFO("[UPGRADE] end upgrade system variable",
KR(ret), K(arg), "cost", ObTimeUtility::current_time() - start_time);
return ret;
}

View File

@ -223,6 +223,7 @@ public:
const share::schema::ObTableSchema **table_schema);
virtual int update_index_status(const obrpc::ObUpdateIndexStatusArg &arg);
int upgrade_table_schema(const obrpc::ObUpgradeTableSchemaArg &arg);
virtual int add_table_schema(share::schema::ObTableSchema &table_schema,
share::schema::ObSchemaGetterGuard &schema_guard);
virtual int drop_inner_table(const share::schema::ObTableSchema &table_schema);
@ -2155,6 +2156,17 @@ private:
int check_table_pk(const share::schema::ObTableSchema &orig_table_schema);
int check_can_convert_to_character(const share::schema::ObColumnSchemaV2 &col_schema, bool &can_convert);
int clean_global_context(const ObContextSchema &context_schema);
int get_hard_code_system_table_schema_(
const uint64_t tenant_id,
const uint64_t table_id,
share::schema::ObTableSchema &hard_code_schema);
int create_system_table_(
share::schema::ObSchemaGetterGuard &schema_guard,
const share::schema::ObTableSchema &hard_code_schema);
int alter_system_table_column_(
share::schema::ObSchemaGetterGuard &schema_guard,
const share::schema::ObTableSchema &hard_code_schema);
private:
bool inited_;
volatile bool stopped_;

View File

@ -726,7 +726,7 @@ int ObRootInspection::check_all()
// check sys schema
tmp = OB_SUCCESS;
if (OB_SUCCESS != (tmp = check_sys_table_schemas())) {
if (OB_SUCCESS != (tmp = check_sys_table_schemas_())) {
LOG_WARN("check_sys_table_schemas failed", K(tmp));
ret = (OB_SUCCESS == ret) ? tmp : ret;
}
@ -976,51 +976,6 @@ int ObRootInspection::check_and_insert_sys_params(uint64_t tenant_id,
LOG_WARN("some sys var exist in hard code, but does not exist in inner table, "
"they will be inserted into table", K(table_name), K(miss_names));
}
ObSqlString sql;
ObSysParam sys_param;
obrpc::ObAddSysVarArg arg;
arg.exec_tenant_id_ = tenant_id;
const ObZone global_zone = "";
for (int64_t i = 0; OB_SUCC(ret) && i < miss_names.count(); ++i) {
sql.reset();
sys_param.reset();
arg.sysvar_.reset();
arg.if_not_exist_ = true;
arg.sysvar_.set_tenant_id(tenant_id);
int64_t var_store_idx = OB_INVALID_INDEX;
if (OB_ISNULL(rpc_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy is null");
} else if (OB_FAIL(check_cancel())) {
LOG_WARN("check_cancel failed", K(ret));
} else if (OB_FAIL(ObSysVarFactory::calc_sys_var_store_idx_by_name(
ObString(miss_names.at(i).ptr()), var_store_idx))) {
LOG_WARN("fail to calc sys var store idx by name", K(ret), K(miss_names.at(i).ptr()));
} else if (false == ObSysVarFactory::is_valid_sys_var_store_idx(var_store_idx)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("calc sys var store idx success but store_idx is invalid", K(ret), K(var_store_idx));
} else {
const ObString &name = ObSysVariables::get_name(var_store_idx);
const ObObjType &type = ObSysVariables::get_type(var_store_idx);
const ObString &value = ObSysVariables::get_value(var_store_idx);
const ObString &min = ObSysVariables::get_min(var_store_idx);
const ObString &max = ObSysVariables::get_max(var_store_idx);
const ObString &info = ObSysVariables::get_info(var_store_idx);
const int64_t flag = ObSysVariables::get_flags(var_store_idx);
if (OB_FAIL(sys_param.init(tenant_id, global_zone, name.ptr(), type,
value.ptr(), min.ptr(), max.ptr(), info.ptr(), flag))) {
LOG_WARN("sys_param init failed", K(tenant_id), K(name), K(type), K(value),
K(min), K(max), K(info), K(flag), K(ret));
} else if (!sys_param.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("sys param is invalid", K(sys_param), K(ret));
} else if (OB_FAIL(ObSchemaUtils::convert_sys_param_to_sysvar_schema(sys_param, arg.sysvar_))) {
LOG_WARN("convert sys param to sysvar schema failed", K(ret));
} else if (OB_FAIL(rpc_proxy_->add_system_variable(arg))) {
LOG_WARN("add system variable failed", K(ret));
}
}
}
}
return ret;
}
@ -1190,7 +1145,7 @@ int ObRootInspection::calc_diff_names(const uint64_t tenant_id,
return ret;
}
int ObRootInspection::check_sys_table_schemas()
int ObRootInspection::check_sys_table_schemas_()
{
int ret = OB_SUCCESS;
ObArray<uint64_t> tenant_ids;
@ -1337,7 +1292,7 @@ int ObRootInspection::check_table_schema(const ObTableSchema &hard_code_table,
|| !inner_table.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table_schema", K(hard_code_table), K(inner_table), K(ret));
} else if (OB_FAIL(check_table_options(inner_table, hard_code_table))) {
} else if (OB_FAIL(check_table_options_(inner_table, hard_code_table))) {
LOG_WARN("check_table_options failed", "table_id", hard_code_table.get_table_id(), K(ret));
} else {
if (hard_code_table.get_column_count() != inner_table.get_column_count()) {
@ -1361,7 +1316,7 @@ int ObRootInspection::check_table_schema(const ObTableSchema &hard_code_table,
hard_code_column->get_column_name(), K(ret));
} else {
const bool ignore_column_id = is_virtual_table(hard_code_table.get_table_id());
if (OB_FAIL(check_column_schema(hard_code_table.get_table_name(),
if (OB_FAIL(check_column_schema_(hard_code_table.get_table_name(),
*column, *hard_code_column, ignore_column_id))) {
LOG_WARN("column schema mismatch with hard code column schema",
"table_name",inner_table.get_table_name(), "column", *column,
@ -1377,6 +1332,110 @@ int ObRootInspection::check_table_schema(const ObTableSchema &hard_code_table,
return ret;
}
int ObRootInspection::check_and_get_system_table_column_diff(
const share::schema::ObTableSchema &table_schema,
const share::schema::ObTableSchema &hard_code_schema,
common::ObIArray<uint64_t> &add_column_ids,
common::ObIArray<uint64_t> &alter_column_ids)
{
int ret = OB_SUCCESS;
add_column_ids.reset();
alter_column_ids.reset();
if (!table_schema.is_valid() || !hard_code_schema.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table_schema", KR(ret), K(table_schema), K(hard_code_schema));
} else if (table_schema.get_tenant_id() != hard_code_schema.get_tenant_id()
|| table_schema.get_table_id() != hard_code_schema.get_table_id()
|| 0 != table_schema.get_table_name_str().compare(hard_code_schema.get_table_name_str())
|| !is_system_table(table_schema.get_table_id())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table_schema", KR(ret),
"tenant_id", table_schema.get_tenant_id(),
"table_id", table_schema.get_table_id(),
"table_name", table_schema.get_table_name(),
"hard_code_tenant_id", hard_code_schema.get_tenant_id(),
"hard_code_table_id", hard_code_schema.get_table_id(),
"hard_code_table_name", hard_code_schema.get_table_name());
} else {
const uint64_t tenant_id = table_schema.get_tenant_id();
const uint64_t table_id = table_schema.get_table_id();
const ObColumnSchemaV2 *column = NULL;
const ObColumnSchemaV2 *hard_code_column = NULL;
ObColumnSchemaV2 tmp_column; // check_column_can_be_altered_online() may change dst_column, is ugly.
bool ignore_column_id = false;
// case 1. check if columns should be dropped.
// case 2. check if column can be altered online.
for (int64_t i = 0; OB_SUCC(ret) && i < table_schema.get_column_count(); i++) {
column = table_schema.get_column_schema_by_idx(i);
if (OB_ISNULL(column)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("column schema is null", KR(ret), K(tenant_id), K(table_id), K(i));
} else if (OB_ISNULL(hard_code_column = hard_code_schema.get_column_schema(column->get_column_id()))) {
ret = OB_NOT_SUPPORTED; // case 1
LOG_WARN("can't drop system table's column", KR(ret),
K(tenant_id), K(table_id),
"table_name", table_schema.get_table_name(),
"column_id", column->get_column_id(),
"column_name", column->get_column_name());
} else {
// case 2
int tmp_ret = check_column_schema_(table_schema.get_table_name_str(),
*column,
*hard_code_column,
ignore_column_id);
if (OB_SUCCESS == tmp_ret) {
// not changed
} else if (OB_SCHEMA_ERROR != tmp_ret) {
ret = tmp_ret;
LOG_WARN("fail to check column schema", KR(ret),
K(tenant_id), K(table_id), KPC(column), KPC(hard_code_column));
} else if (OB_FAIL(tmp_column.assign(*hard_code_column))) {
LOG_WARN("fail to assign hard code column schema", KR(ret),
K(tenant_id), K(table_id), "column_id", hard_code_column->get_column_id());
} else if (OB_FAIL(table_schema.check_column_can_be_altered_online(column, &tmp_column))) {
LOG_WARN("fail to check alter column online", KR(ret),
K(tenant_id), K(table_id),
"table_name", table_schema.get_table_name(),
"column_id", column->get_column_id(),
"column_name", column->get_column_name());
} else if (OB_FAIL(alter_column_ids.push_back(column->get_column_id()))) {
LOG_WARN("fail to push back column_id", KR(ret), K(tenant_id), K(table_id),
"column_id", column->get_column_id());
}
}
} // end for
// case 3: check if columns should be added.
for (int64_t i = 0; OB_SUCC(ret) && i < hard_code_schema.get_column_count(); i++) {
hard_code_column = hard_code_schema.get_column_schema_by_idx(i);
if (OB_ISNULL(hard_code_column)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("column schema is null", KR(ret), K(tenant_id), K(table_id), K(i));
} else if (OB_NOT_NULL(column = table_schema.get_column_schema(hard_code_column->get_column_id()))) {
// column exist, just skip
} else {
const uint64_t hard_code_column_id = hard_code_column->get_column_id();
const ObColumnSchemaV2 *last_column = NULL;
if (table_schema.get_column_count() <= 0
|| OB_ISNULL(last_column = table_schema.get_column_schema_by_idx(
table_schema.get_column_count() - 1))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid column count or column", KR(ret), K(table_schema));
} else if (table_schema.get_max_used_column_id() >= hard_code_column_id
|| last_column->get_column_id() >= hard_code_column_id) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("column should be added at last", KR(ret), KPC(hard_code_column), K(table_schema));
} else if (OB_FAIL(add_column_ids.push_back(hard_code_column_id))) {
LOG_WARN("fail to push back column_id", KR(ret), K(tenant_id), K(table_id),
"column_id", hard_code_column_id);
}
}
} // end for
}
return ret;
}
bool ObRootInspection::check_str_with_lower_case_(const ObString &str)
{
bool bret = false;
@ -1462,8 +1521,8 @@ int ObRootInspection::check_sys_view_(
return ret;
}
int ObRootInspection::check_table_options(const ObTableSchema &table,
const ObTableSchema &hard_code_table)
int ObRootInspection::check_table_options_(const ObTableSchema &table,
const ObTableSchema &hard_code_table)
{
int ret = OB_SUCCESS;
if (!table.is_valid() || !hard_code_table.is_valid()) {
@ -1615,23 +1674,23 @@ int ObRootInspection::check_table_options(const ObTableSchema &table,
return ret;
}
int ObRootInspection::check_column_schema(const ObString &table_name,
const ObColumnSchemaV2 &column,
const ObColumnSchemaV2 &hard_code_column,
const bool ignore_column_id)
int ObRootInspection::check_column_schema_(const ObString &table_name,
const ObColumnSchemaV2 &column,
const ObColumnSchemaV2 &hard_code_column,
const bool ignore_column_id)
{
int ret = OB_SUCCESS;
if (table_name.empty() || !column.is_valid() || !hard_code_column.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("table_name is empty or invalid column or invalid hard_code_column",
K(table_name), K(column), K(hard_code_column), K(ret));
KR(ret), K(table_name), K(column), K(hard_code_column));
} else {
#define CMP_COLUMN_ATTR(attr) \
if (OB_SUCC(ret)) { \
if (column.get_##attr() != hard_code_column.get_##attr()) { \
ret = OB_SCHEMA_ERROR; \
LOG_WARN(#attr " mismatch", K(table_name), "column_name", column.get_column_name(), \
"in_memory", column.get_##attr(), "hard_code", hard_code_column.get_##attr(), K(ret)); \
LOG_WARN(#attr " mismatch", KR(ret), K(table_name), "column_name", column.get_column_name(), \
"in_memory", column.get_##attr(), "hard_code", hard_code_column.get_##attr()); \
} \
}
@ -1639,10 +1698,18 @@ int ObRootInspection::check_column_schema(const ObString &table_name,
if (OB_SUCC(ret)) { \
if (column.is_##attr() != hard_code_column.is_##attr()) { \
ret = OB_SCHEMA_ERROR; \
LOG_WARN(#attr " mismatch", K(table_name), "column_name", column.get_column_name(), \
"in_memory", column.is_##attr(), "hard_code", hard_code_column.is_##attr(), K(ret)); \
LOG_WARN(#attr " mismatch", KR(ret), K(table_name), "column_name", column.get_column_name(), \
"in_memory", column.is_##attr(), "hard_code", hard_code_column.is_##attr()); \
} \
}
if (OB_SUCC(ret)) {
if (column.get_column_name_str() != hard_code_column.get_column_name_str()) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("column_name mismatch", KR(ret), K(table_name),
"in_memory", column.get_column_name(),
"hard_code", hard_code_column.get_column_name());
}
}
if (!ignore_column_id) {
CMP_COLUMN_ATTR(column_id);

View File

@ -168,18 +168,26 @@ public:
// return OB_SCHEMA_ERROR for table schema mismatch
virtual int check_table_schema(const uint64_t tenant_id,
const share::schema::ObTableSchema &hard_code_table);
// TODO:(yanmu.ztl) standby cluster won't sync sys table schema any more. To be removed.
static int check_table_schema(const share::schema::ObTableSchema &hard_code_table,
const share::schema::ObTableSchema &inner_table);
// TODO:(yanmu.ztl) standby cluster wont't sync sys table schema any more. Should be private.
int check_sys_table_schemas();
// For system tables, check and get column schemas' difference
// between table schema in memory and hard code table schema.
// 1. Drop column: Not supported.
// 2. Add column: Can only add columns at last.
// 3. Alter column: Can only alter columns online.
static int check_and_get_system_table_column_diff(
const share::schema::ObTableSchema &table_schema,
const share::schema::ObTableSchema &hard_code_schema,
common::ObIArray<uint64_t> &add_column_ids,
common::ObIArray<uint64_t> &alter_column_ids);
private:
static const int64_t NAME_BUF_LEN = 64;
typedef common::ObFixedLengthString<NAME_BUF_LEN> Name;
int check_zone();
int check_sys_stat();
int check_sys_param();
int check_sys_table_schemas_(const uint64_t tenant_id);
template<typename Item>
int get_names(const common::ObDList<Item> &list, common::ObIArray<const char*> &names);
@ -199,12 +207,15 @@ private:
common::ObIArray<Name> &fetch_names, /* data from inner table*/
common::ObIArray<Name> &extra_names, /* inner table more than hard code*/
common::ObIArray<Name> &miss_names /* inner table less than hard code*/);
static int check_table_options(const share::schema::ObTableSchema &table,
const share::schema::ObTableSchema &hard_code_table);
static int check_column_schema(const common::ObString &table_name,
const share::schema::ObColumnSchemaV2 &column,
const share::schema::ObColumnSchemaV2 &hard_code_column,
const bool ignore_column_id);
int check_sys_table_schemas_();
int check_sys_table_schemas_(const uint64_t tenant_id);
static int check_table_options_(const share::schema::ObTableSchema &table,
const share::schema::ObTableSchema &hard_code_table);
static int check_column_schema_(const common::ObString &table_name,
const share::schema::ObColumnSchemaV2 &column,
const share::schema::ObColumnSchemaV2 &hard_code_column,
const bool ignore_column_id);
bool check_str_with_lower_case_(const ObString &str);
int check_sys_view_(const uint64_t tenant_id,

View File

@ -1431,20 +1431,22 @@ int ObRootService::submit_offline_server_task(const common::ObAddr &server)
return ret;
}
int ObRootService::submit_upgrade_task(const int64_t version)
int ObRootService::submit_upgrade_task(
const obrpc::ObUpgradeJobArg::Action action,
const int64_t version)
{
int ret = OB_SUCCESS;
ObUpgradeTask task(upgrade_executor_, version);
ObUpgradeTask task(upgrade_executor_, action, version);
task.set_retry_times(0); //not repeat
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_FAIL(upgrade_executor_.can_execute())) {
LOG_WARN("can't run task now", KR(ret), K(version));
LOG_WARN("can't run task now", KR(ret), K(action), K(version));
} else if (OB_FAIL(task_queue_.add_async_task(task))) {
LOG_WARN("submit upgrade task fail", KR(ret), K(version));
LOG_WARN("submit upgrade task fail", KR(ret), K(action), K(version));
} else {
LOG_INFO("submit upgrade task success", KR(ret), K(version));
LOG_INFO("submit upgrade task success", KR(ret), K(action), K(version));
}
return ret;
}
@ -7955,13 +7957,15 @@ int ObRootService::run_upgrade_job(const obrpc::ObUpgradeJobArg &arg)
} else if (!arg.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(arg), KR(ret));
} else if (version < CLUSTER_VERSION_2270
|| !ObUpgradeChecker::check_cluster_version_exist(version)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "run upgrade job with such version is");
} else if (ObUpgradeJobArg::RUN_UPGRADE_JOB == arg.action_) {
if (OB_FAIL(submit_upgrade_task(arg.version_))) {
} else if (ObUpgradeJobArg::UPGRADE_POST_ACTION == arg.action_
|| ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE == arg.action_
|| ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE == arg.action_) {
if (ObUpgradeJobArg::UPGRADE_POST_ACTION == arg.action_
&& !ObUpgradeChecker::check_cluster_version_exist(version)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "run upgrade job with such version is");
} else if (OB_FAIL(submit_upgrade_task(arg.action_, version))) {
LOG_WARN("fail to submit upgrade task", KR(ret), K(arg));
}
} else if (ObUpgradeJobArg::STOP_UPGRADE_JOB == arg.action_) {
@ -7978,6 +7982,18 @@ int ObRootService::run_upgrade_job(const obrpc::ObUpgradeJobArg &arg)
return ret;
}
int ObRootService::upgrade_table_schema(const obrpc::ObUpgradeTableSchemaArg &arg)
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_FAIL(ddl_service_.upgrade_table_schema(arg))) {
LOG_WARN("fail to upgrade table schema", KR(ret), K(arg));
}
return ret;
}
int ObRootService::merge_finish(const obrpc::ObMergeFinishArg &arg)
{
int ret = OB_SUCCESS;

View File

@ -671,6 +671,7 @@ public:
int admin_upgrade_virtual_schema();
int run_job(const obrpc::ObRunJobArg &arg);
int run_upgrade_job(const obrpc::ObUpgradeJobArg &arg);
int upgrade_table_schema(const obrpc::ObUpgradeTableSchemaArg &arg);
int admin_flush_cache(const obrpc::ObAdminFlushCacheArg &arg);
int admin_upgrade_cmd(const obrpc::Bool &arg);
int admin_rolling_upgrade_cmd(const obrpc::ObAdminRollingUpgradeArg &arg);
@ -696,7 +697,7 @@ public:
int report_single_replica(const int64_t tenant_id, const share::ObLSID &ls_id);
// @see RsListChangeCb
int submit_update_rslist_task(const bool force_update = false);
int submit_upgrade_task(const int64_t version);
int submit_upgrade_task(const obrpc::ObUpgradeJobArg::Action action, const int64_t version);
int submit_upgrade_storage_format_version_task();
int submit_create_inner_schema_task();
int submit_async_minor_freeze_task(const obrpc::ObRootMinorFreezeArg &arg);

View File

@ -63,7 +63,9 @@ static const char* job_type_str_array[JOB_TYPE_MAX] = {
"UPGRADE_STORAGE_FORMAT_VERSION",
"STOP_UPGRADE_STORAGE_FORMAT_VERSION",
"CREATE_INNER_SCHEMA",
"RUN_UPGRADE_POST_JOB"
"UPGRADE_POST_ACTION",
"UPGRADE_SYSTEM_VARIABLE",
"UPGRADE_SYSTEM_TABLE"
};
const char* ObRsJobTableOperator::get_job_type_str(ObRsJobType job_type)

View File

@ -82,7 +82,9 @@ enum ObRsJobType
JOB_TYPE_UPGRADE_STORAGE_FORMAT_VERSION,
JOB_TYPE_STOP_UPGRADE_STORAGE_FORMAT_VERSION,
JOB_TYPE_CREATE_INNER_SCHEMA,
JOB_TYPE_RUN_UPGRADE_POST_JOB,
JOB_TYPE_UPGRADE_POST_ACTION,
JOB_TYPE_UPGRADE_SYSTEM_VARIABLE,
JOB_TYPE_UPGRADE_SYSTEM_TABLE,
JOB_TYPE_MAX
};

View File

@ -35,6 +35,7 @@ bool is_allow_when_disable_ddl(const obrpc::ObRpcPacketCode pcode, const obrpc::
if (OB_ISNULL(ddl_arg)) {
} else if (obrpc::OB_COMMIT_ALTER_TENANT_LOCALITY == pcode
|| obrpc::OB_SCHEMA_REVISE == pcode // for upgrade
|| obrpc::OB_UPGRADE_TABLE_SCHEMA == pcode
|| ((obrpc::OB_MODIFY_TENANT == pcode
|| obrpc::OB_MODIFY_SYSVAR == pcode
|| obrpc::OB_DO_KEYSTORE_DDL == pcode
@ -46,21 +47,6 @@ bool is_allow_when_disable_ddl(const obrpc::ObRpcPacketCode pcode, const obrpc::
return bret;
}
// precondition: enable_ddl = false
bool is_allow_when_upgrade(const obrpc::ObRpcPacketCode pcode, const obrpc::ObDDLArg *ddl_arg)
{
bool bret = false;
UNUSED(pcode);
if (obrpc::OB_UPGRADE_STAGE_DBUPGRADE != GCTX.get_upgrade_stage()) {
bret = true;
} else if (OB_ISNULL(ddl_arg)) {
bret = false;
} else {
bret = ddl_arg->is_allow_when_upgrade();
}
return bret;
}
bool is_allow_when_create_tenant(const obrpc::ObRpcPacketCode pcode)
{
bool bret = false;
@ -126,8 +112,7 @@ protected:
RS_LOG(WARN, "RS major freeze not finished, can not process ddl request",
K(ret), K(pcode));
} else if (is_ddl_like_
&& ((!GCONF.enable_ddl && !is_allow_when_disable_ddl(pcode, ddl_arg_))
|| (GCONF.enable_ddl && !is_allow_when_upgrade(pcode, ddl_arg_)))) {
&& (!GCONF.enable_ddl && !is_allow_when_disable_ddl(pcode, ddl_arg_))) {
ret = OB_OP_NOT_ALLOW;
RS_LOG(WARN, "ddl operation not allow, can not process this request", K(ret), K(pcode));
} else {
@ -476,6 +461,7 @@ DEFINE_RS_RPC_PROCESSOR(obrpc::OB_BROADCAST_SCHEMA, ObBroadcastSchemaP, broadcas
// only for upgrade
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_CHECK_MERGE_FINISH, ObCheckMergeFinishP, check_merge_finish(arg_));
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_GET_RECYCLE_SCHEMA_VERSIONS, ObGetRecycleSchemaVersionsP, get_recycle_schema_versions(arg_, result_));
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_UPGRADE_TABLE_SCHEMA, ObRpcUpgradeTableSchemaP, upgrade_table_schema(arg_));
//label security ddl
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_HANDLE_LABEL_SE_POLICY_DDL, ObRpcHandleLabelSePolicyDDLP, handle_label_se_policy_ddl(arg_));
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_HANDLE_LABEL_SE_COMPONENT_DDL, ObRpcHandleLabelSeComponentDDLP, handle_label_se_component_ddl(arg_));

View File

@ -1307,34 +1307,49 @@ int ObAdminUpgradeVirtualSchema::upgrade_(
} else if (OB_ISNULL(ctx_.ddl_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddl service is null", KR(ret));
} else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
tenant_id, schema_guard))) {
}
// 1. check table name duplicated
if (FAILEDx(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
tenant_id, schema_guard))) {
LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(table.get_tenant_id(),
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id,
table.get_database_id(),
table.get_table_name(),
table.is_index_table(),
exist_schema))) {
// check if the table_name is occupied by others
LOG_WARN("get table schema failed", KR(ret), K(tenant_id), "table", table.get_table_name());
if (OB_TABLE_NOT_EXIST == ret) {
ret = OB_SUCCESS;
}
} else if (OB_NOT_NULL(exist_schema)) {
// name modification except virtual table should first delete the old table,
// then create the new one to make virtual table upgrade valid
if (OB_FAIL(ctx_.ddl_service_->drop_inner_table(*exist_schema))) {
LOG_WARN("get table schema failed", KR(ret), K(tenant_id),
"table", table.get_table_name(), "table_id", table.get_table_id());
} else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
tenant_id, schema_guard))) {
LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id));
}
} else if (OB_ISNULL(exist_schema)) {
// no duplicate table name
} else if (OB_FAIL(ctx_.ddl_service_->drop_inner_table(*exist_schema))) {
LOG_WARN("get table schema failed", KR(ret), K(tenant_id),
"table", table.get_table_name(), "table_id", table.get_table_id());
} else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
tenant_id, schema_guard))) {
LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id));
}
// rebuild the inner table
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ctx_.ddl_service_->add_table_schema(table, schema_guard))) {
// 2. try drop table first
exist_schema = NULL;
if (FAILEDx(schema_guard.get_table_schema(tenant_id,
table.get_table_id(),
exist_schema))) {
LOG_WARN("get table schema failed", KR(ret), "table", table.get_table_name(),
"table_id", table.get_table_id());
if (OB_TABLE_NOT_EXIST == ret) {
ret = OB_SUCCESS;
}
} else if (OB_ISNULL(exist_schema)) {
// missed table
} else if (OB_FAIL(ctx_.ddl_service_->drop_inner_table(*exist_schema))) {
LOG_WARN("drop table schema failed", KR(ret), "table_schema", *exist_schema);
} else if (OB_FAIL(ctx_.ddl_service_->get_tenant_schema_guard_with_version_in_inner_table(
tenant_id, schema_guard))) {
LOG_WARN("get schema guard in inner table failed", KR(ret), K(tenant_id));
}
// 3. create table
if (FAILEDx(ctx_.ddl_service_->add_table_schema(table, schema_guard))) {
LOG_WARN("add table schema failed", KR(ret), K(tenant_id), K(table));
} else if (OB_FAIL(ctx_.ddl_service_->refresh_schema(tenant_id))) {
LOG_WARN("refresh schema failed", KR(ret), K(tenant_id));

View File

@ -15,6 +15,7 @@
#include "rootserver/ob_upgrade_executor.h"
#include "rootserver/ob_rs_job_table_operator.h"
#include "observer/ob_server_struct.h"
#include "rootserver/ob_root_inspection.h"
namespace oceanbase
{
@ -43,7 +44,7 @@ ObAsyncTask *ObUpgradeTask::deep_copy(char *buf, const int64_t buf_size) const
ret = OB_INVALID_ARGUMENT;
LOG_WARN("buf is not long enough", K(need_size), K(buf_size), KR(ret));
} else {
task = new(buf) ObUpgradeTask(*upgrade_executor_, version_);
task = new(buf) ObUpgradeTask(*upgrade_executor_, action_, version_);
}
return task;
}
@ -51,22 +52,24 @@ ObAsyncTask *ObUpgradeTask::deep_copy(char *buf, const int64_t buf_size) const
int ObUpgradeTask::process()
{
const int64_t start = ObTimeUtility::current_time();
LOG_INFO("[UPGRADE] start to do execute upgrade task", K(start), K_(version));
FLOG_INFO("[UPGRADE] start to do execute upgrade task",
K(start), K_(action), K_(version));
int ret = OB_SUCCESS;
if (OB_ISNULL(upgrade_executor_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("upgrade_executor_ is null", KR(ret));
} else if (OB_FAIL(upgrade_executor_->execute(version_))) {
LOG_WARN("fail to execute upgrade task", KR(ret));
} else if (OB_FAIL(upgrade_executor_->execute(action_, version_))) {
LOG_WARN("fail to execute upgrade task", KR(ret), K_(action), K_(version));
}
LOG_INFO("[UPGRADE] finish execute upgrade task",
KR(ret), K_(version), "cost_us", ObTimeUtility::current_time() - start);
FLOG_INFO("[UPGRADE] finish execute upgrade task",
KR(ret), K_(action), K_(version),
"cost_us", ObTimeUtility::current_time() - start);
return ret;
}
ObUpgradeExecutor::ObUpgradeExecutor()
: inited_(false), stopped_(false), execute_(false), rwlock_(),
sql_proxy_(NULL), rpc_proxy_(NULL), schema_service_(NULL),
sql_proxy_(NULL), rpc_proxy_(NULL), common_rpc_proxy_(NULL), schema_service_(NULL),
upgrade_processors_()
{}
@ -88,6 +91,7 @@ int ObUpgradeExecutor::init(
schema_service_ = &schema_service;
sql_proxy_ = &sql_proxy;
rpc_proxy_ = &rpc_proxy;
common_rpc_proxy_ = &common_proxy;
stopped_ = false;
execute_ = false;
inited_ = true;
@ -128,9 +132,8 @@ int ObUpgradeExecutor::check_stop() const
{
int ret = OB_SUCCESS;
SpinRLockGuard guard(rwlock_);
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (stopped_) {
ret = OB_CANCELED;
LOG_WARN("executor should stopped", KR(ret));
@ -145,13 +148,12 @@ bool ObUpgradeExecutor::check_execute() const
return bret;
}
int ObUpgradeExecutor::set_execute_mark()
int ObUpgradeExecutor::set_execute_mark_()
{
int ret = OB_SUCCESS;
SpinWLockGuard guard(rwlock_);
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (stopped_ || execute_) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("can't run job at the same time", KR(ret));
@ -165,9 +167,8 @@ int ObUpgradeExecutor::can_execute()
{
int ret = OB_SUCCESS;
SpinWLockGuard guard(rwlock_);
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (stopped_ || execute_) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("status not matched", KR(ret),
@ -177,17 +178,33 @@ int ObUpgradeExecutor::can_execute()
return ret;
}
int ObUpgradeExecutor::check_inner_stat_() const
{
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", KR(ret));
} else if (OB_ISNULL(schema_service_)
|| OB_ISNULL(sql_proxy_)
|| OB_ISNULL(rpc_proxy_)
|| OB_ISNULL(common_rpc_proxy_)){
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ptr is null", KR(ret), KP_(schema_service),
KP_(sql_proxy), KP_(rpc_proxy), KP_(common_rpc_proxy));
}
return ret;
}
// wait schema sync in cluster
int ObUpgradeExecutor::check_schema_sync()
int ObUpgradeExecutor::check_schema_sync_()
{
const int64_t start = ObTimeUtility::current_time();
LOG_INFO("[UPGRADE] start to check schema sync", K(start));
int ret = OB_SUCCESS;
bool enable_ddl = GCONF.enable_ddl;
bool enable_sys_table_ddl = GCONF.enable_sys_table_ddl;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", KR(ret));
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (enable_ddl || enable_sys_table_ddl) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("ddl should disable now", KR(ret), K(enable_ddl), K(enable_sys_table_ddl));
@ -213,7 +230,7 @@ int ObUpgradeExecutor::check_schema_sync()
}
// Ensure primary cluster's schema_version is not greator than standby clusters'.
int ObUpgradeExecutor::check_schema_sync(
int ObUpgradeExecutor::check_schema_sync_(
obrpc::ObTenantSchemaVersions &primary_schema_versions,
obrpc::ObTenantSchemaVersions &standby_schema_versions,
bool &schema_sync)
@ -255,61 +272,31 @@ int ObUpgradeExecutor::check_schema_sync(
return ret;
}
int ObUpgradeExecutor::get_tenant_ids(common::ObIArray<uint64_t> &tenant_ids)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard guard;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else if (OB_ISNULL(schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema_service is null", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(
OB_SYS_TENANT_ID, guard))) {
LOG_WARN("fail to get schema guard", KR(ret));
} else if (OB_FAIL(guard.get_tenant_ids(tenant_ids))) {
LOG_WARN("fail to get tenant_ids", KR(ret));
}
return ret;
}
int ObUpgradeExecutor::execute(const int64_t version)
//TODO:
//1. Run upgrade job by tenant.
//2. Check tenant role/tenant status before run upgrade job.
int ObUpgradeExecutor::execute(
const obrpc::ObUpgradeJobArg::Action action,
const int64_t version)
{
ObCurTraceId::init(GCONF.self_addr_);
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_FAIL(set_execute_mark())) {
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(set_execute_mark_())) {
LOG_WARN("fail to set execute mark", KR(ret));
} else {
int64_t job_id = OB_INVALID_ID;
ObRsJobType job_type = ObRsJobType::JOB_TYPE_RUN_UPGRADE_POST_JOB;
char version_str[common::OB_CLUSTER_VERSION_LENGTH] = {0};
int64_t len = ObClusterVersion::print_version_str(
version_str, common::OB_CLUSTER_VERSION_LENGTH, version);
if (OB_ISNULL(sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql_proxy_ is null", KR(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else if (OB_FAIL(RS_JOB_CREATE_WITH_RET(job_id, job_type, *sql_proxy_,
"extra_info", ObString(len, version_str)))) {
LOG_WARN("fail to create rs job", KR(ret));
} else if (job_id <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("job_id is invalid", KR(ret), K(job_id));
} else if (OB_FAIL(run_upgrade_job(version))) {
LOG_WARN("fail to run upgrade job", KR(ret), K(job_id), K(version));
}
int tmp_ret = OB_SUCCESS;
if (job_id > 0) {
if (OB_SUCCESS != (tmp_ret = RS_JOB_COMPLETE(job_id, ret, *sql_proxy_))) {
LOG_ERROR("fail to complete job", K(tmp_ret), KR(ret), K(job_id));
ret = OB_FAIL(ret) ? ret : tmp_ret;
if (obrpc::ObUpgradeJobArg::UPGRADE_POST_ACTION == action) {
if (OB_FAIL(run_upgrade_post_job_(version))) {
LOG_WARN("fail to run upgrade post job", KR(ret), K(version));
}
} else if (obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE == action) {
if (OB_FAIL(run_upgrade_system_variable_job_())) {
LOG_WARN("fail to run upgrade system variable job", KR(ret));
}
} else if (obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE == action) {
if (OB_FAIL(run_upgrade_system_table_job_())) {
LOG_WARN("fail to run upgrade system table job", KR(ret));
}
}
execute_ = false;
@ -317,49 +304,267 @@ int ObUpgradeExecutor::execute(const int64_t version)
return ret;
}
// Python upgrade script will set enable_ddl = false before it run upgrade job.
int ObUpgradeExecutor::run_upgrade_job(const int64_t version)
// Python upgrade script may set enable_ddl = false before it run upgrade job.
// TODO:
// 1. support run upgrade post action from `COMPATIBLE` to current cluster version.
int ObUpgradeExecutor::run_upgrade_post_job_(const int64_t version)
{
int ret = OB_SUCCESS;
ObArray<uint64_t> tenant_ids;
ObBaseUpgradeProcessor *processor = NULL;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else if (version < CLUSTER_VERSION_2270
|| !ObUpgradeChecker::check_cluster_version_exist(version)) {
} else if (!ObUpgradeChecker::check_cluster_version_exist(version)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version));
} else if (OB_FAIL(get_tenant_ids(tenant_ids))) {
LOG_WARN("fail to get tenant_ids", KR(ret), K(version));
} else if (OB_FAIL(upgrade_processors_.get_processor_by_version(
version, processor))) {
LOG_WARN("fail to get processor by version", KR(ret), K(version));
} else {
// 1. Run upgrade jobs(by tenant_id desc) in primary cluster.
// 2. Only run sys tenant's upgrade job in standby clusters.
for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
const uint64_t tenant_id = tenant_ids.at(i);
if (OB_SYS_TENANT_ID == tenant_id || !GCTX.is_standby_cluster()) {
ObArray<uint64_t> tenant_ids;
ObBaseUpgradeProcessor *processor = NULL;
int64_t job_id = OB_INVALID_ID;
ObRsJobType job_type = ObRsJobType::JOB_TYPE_UPGRADE_POST_ACTION;
char version_str[common::OB_CLUSTER_VERSION_LENGTH] = {0};
int64_t len = ObClusterVersion::print_version_str(
version_str, common::OB_CLUSTER_VERSION_LENGTH, version);
int tmp_ret = OB_SUCCESS;
int64_t backup_ret = OB_SUCCESS;
if (OB_FAIL(RS_JOB_CREATE_WITH_RET(job_id, job_type, *sql_proxy_,
"tenant_id", 0,
"extra_info", ObString(len, version_str)))) {
LOG_WARN("fail to create rs job", KR(ret));
} else if (job_id <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("job_id is invalid", KR(ret), K(job_id));
} else if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) {
LOG_WARN("fail to get tenant_ids", KR(ret), K(version));
} else if (OB_FAIL(upgrade_processors_.get_processor_by_version(
version, processor))) {
LOG_WARN("fail to get processor by version", KR(ret), K(version));
} else {
for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
const uint64_t tenant_id = tenant_ids.at(i);
int64_t start_ts = ObTimeUtility::current_time();
int64_t current_version = processor->get_version();
processor->set_tenant_id(tenant_id);
LOG_INFO("[UPGRADE] start to run post upgrade job by version",
K(tenant_id), K(current_version));
if (OB_FAIL(processor->post_upgrade())) {
FLOG_INFO("[UPGRADE] start to run post upgrade job by version",
K(tenant_id), K(current_version));
if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else if (OB_TMP_FAIL(processor->post_upgrade())) {
LOG_WARN("run post upgrade by version failed",
KR(ret), K(tenant_id), K(current_version));
KR(tmp_ret), K(tenant_id), K(current_version));
backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
}
LOG_INFO("[UPGRADE] finish post upgrade job by version",
KR(ret), K(tenant_id), K(current_version),
"cost", ObTimeUtility::current_time() - start_ts);
FLOG_INFO("[UPGRADE] finish post upgrade job by version",
KR(tmp_ret), K(tenant_id), K(current_version),
"cost", ObTimeUtility::current_time() - start_ts);
} // end for
}
ret = OB_SUCC(ret) ? backup_ret : ret;
if (job_id > 0) {
if (OB_SUCCESS != (tmp_ret = RS_JOB_COMPLETE(job_id, ret, *sql_proxy_))) {
LOG_ERROR("fail to complete job", K(tmp_ret), KR(ret), K(job_id));
ret = OB_FAIL(ret) ? ret : tmp_ret;
}
}
}
return ret;
}
int ObUpgradeExecutor::run_upgrade_system_variable_job_()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else {
ObArray<uint64_t> tenant_ids;
int64_t job_id = OB_INVALID_ID;
ObRsJobType job_type = ObRsJobType::JOB_TYPE_UPGRADE_SYSTEM_VARIABLE;
int tmp_ret = OB_SUCCESS;
int backup_ret = OB_SUCCESS;
if (OB_FAIL(RS_JOB_CREATE_WITH_RET(job_id, job_type, *sql_proxy_, "tenant_id", 0))) {
LOG_WARN("fail to create rs job", KR(ret));
} else if (job_id <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("job_id is invalid", KR(ret), K(job_id));
} else if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) {
LOG_WARN("fail to get tenant_ids", KR(ret));
} else {
for (int64_t i = tenant_ids.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
const uint64_t tenant_id = tenant_ids.at(i);
int64_t start_ts = ObTimeUtility::current_time();
FLOG_INFO("[UPGRADE] start to run upgrade system variable job", K(tenant_id));
if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else if (OB_TMP_FAIL(ObUpgradeUtils::upgrade_sys_variable(*common_rpc_proxy_, *sql_proxy_, tenant_id))) {
LOG_WARN("fail to upgrade sys variable", KR(tmp_ret), K(tenant_id));
backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
}
FLOG_INFO("[UPGRADE] finish run upgrade system variable job",
KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
} // end for
ret = OB_SUCC(ret) ? backup_ret : ret;
}
if (job_id > 0) {
if (OB_SUCCESS != (tmp_ret = RS_JOB_COMPLETE(job_id, ret, *sql_proxy_))) {
LOG_ERROR("fail to complete job", K(tmp_ret), KR(ret), K(job_id));
ret = OB_FAIL(ret) ? ret : tmp_ret;
}
}
}
return ret;
}
// NOTICE: enable_sys_table_ddl should be true before run this job.
int ObUpgradeExecutor::run_upgrade_system_table_job_()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else {
ObArray<uint64_t> tenant_ids;
int64_t job_id = OB_INVALID_ID;
ObRsJobType job_type = ObRsJobType::JOB_TYPE_UPGRADE_SYSTEM_TABLE;
int tmp_ret = OB_SUCCESS;
int backup_ret = OB_SUCCESS;
if (OB_FAIL(RS_JOB_CREATE_WITH_RET(job_id, job_type, *sql_proxy_, "tenant_id", 0))) {
LOG_WARN("fail to create rs job", KR(ret));
} else if (job_id <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("job_id is invalid", KR(ret), K(job_id));
} else if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) {
LOG_WARN("fail to get tenant_ids", KR(ret));
} else {
for (int64_t i = tenant_ids.count() - 1; i >= 0; i--) {
const uint64_t tenant_id = tenant_ids.at(i);
int64_t start_ts = ObTimeUtility::current_time();
FLOG_INFO("[UPGRADE] start to run upgrade system table job", K(tenant_id));
if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else if (OB_TMP_FAIL(upgrade_system_table_(tenant_id))) {
LOG_WARN("fail to upgrade system table", KR(tmp_ret), K(tenant_id));
backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
}
FLOG_INFO("[UPGRADE] finish run upgrade system table job",
KR(tmp_ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
} // end for
ret = OB_SUCC(ret) ? backup_ret : ret;
}
if (job_id > 0) {
if (OB_SUCCESS != (tmp_ret = RS_JOB_COMPLETE(job_id, ret, *sql_proxy_))) {
LOG_ERROR("fail to complete job", K(tmp_ret), KR(ret), K(job_id));
ret = OB_FAIL(ret) ? ret : tmp_ret;
}
}
}
return ret;
}
int ObUpgradeExecutor::upgrade_system_table_(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else {
ObArray<uint64_t> upgrade_table_ids; // miss or mismatch
// Only core/system tables can be upgraded here.
// 1. __all_core_table can't be altered.
// 2. sys index table and sys lob table will be added with sys data table, and can't be altered.
const schema_create_func *creator_ptr_array[] = {
share::core_table_schema_creators,
share::sys_table_schema_creators, NULL };
// check system table
ObTableSchema table_schema;
bool exist = false;
for (const schema_create_func **creator_ptr_ptr = creator_ptr_array;
OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr_ptr); ++creator_ptr_ptr) {
for (const schema_create_func *creator_ptr = *creator_ptr_ptr;
OB_SUCC(ret) && OB_NOT_NULL(*creator_ptr); ++creator_ptr) {
table_schema.reset();
if (OB_FAIL(check_stop())) {
LOG_WARN("check_cancel failed", KR(ret));
} else if (OB_FAIL((*creator_ptr)(table_schema))) {
LOG_WARN("create table schema failed", KR(ret));
} else if (!is_sys_tenant(tenant_id)
&& OB_FAIL(ObSchemaUtils::construct_tenant_space_full_table(
tenant_id, table_schema))) {
LOG_WARN("fail to construct tenant space table", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObSysTableChecker::is_inner_table_exist(
tenant_id, table_schema, exist))) {
LOG_WARN("fail to check inner table exist",
KR(ret), K(tenant_id), K(table_schema));
} else if (!exist) {
// skip
} else if (OB_FAIL(check_table_schema_(tenant_id, table_schema))) {
const uint64_t table_id = table_schema.get_table_id();
if (OB_SCHEMA_ERROR != ret) {
LOG_WARN("check_table_schema failed", KR(ret), K(tenant_id), K(table_id));
} else {
FLOG_INFO("[UPGRADE] table need upgrade", K(tenant_id), K(table_id),
"table_name", table_schema.get_table_name());
if (OB_FAIL(upgrade_table_ids.push_back(table_id))) { // overwrite ret
LOG_WARN("fail to push back upgrade table ids", KR(ret), K(tenant_id), K(table_id));
}
}
}
} // end for
} // end for
int tmp_ret = OB_SUCCESS;
int backup_ret = OB_SUCCESS;
// upgrade system table(create or alter)
obrpc::ObUpgradeTableSchemaArg arg;
const int64_t timeout = GCONF.internal_sql_execute_timeout;
for (int64_t i = 0; OB_SUCC(ret) && i < upgrade_table_ids.count(); i++) {
const uint64_t table_id = upgrade_table_ids.at(i);
int64_t start_ts = ObTimeUtility::current_time();
FLOG_INFO("[UPGRADE] start upgrade system table", K(tenant_id), K(table_id));
if (OB_FAIL(check_stop())) {
LOG_WARN("check_cancel failed", KR(ret));
} else if (OB_FAIL(arg.init(tenant_id, table_id))) {
LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(table_id));
} else if (OB_TMP_FAIL(common_rpc_proxy_->timeout(timeout).upgrade_table_schema(arg))) {
LOG_WARN("fail to uggrade table schema", KR(tmp_ret), K(timeout), K(arg));
backup_ret = OB_SUCCESS == backup_ret ? tmp_ret : backup_ret;
}
FLOG_INFO("[UPGRADE] finish upgrade system table",
KR(tmp_ret), K(tenant_id), K(table_id), "cost", ObTimeUtility::current_time() - start_ts);
} // end for
ret = OB_SUCC(ret) ? backup_ret : ret;
}
return ret;
}
int ObUpgradeExecutor::check_table_schema_(const uint64_t tenant_id, const ObTableSchema &hard_code_table)
{
int ret = OB_SUCCESS;
const ObTableSchema *table = NULL;
ObSchemaGetterGuard schema_guard;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("failed to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(
tenant_id, hard_code_table.get_table_id(), table))) {
LOG_WARN("get_table_schema failed", KR(ret), K(tenant_id),
"table_id", hard_code_table.get_table_id(),
"table_name", hard_code_table.get_table_name());
} else if (OB_ISNULL(table)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("table should not be null", KR(ret), K(tenant_id),
"table_id", hard_code_table.get_table_id(),
"table_name", hard_code_table.get_table_name());
} else if (OB_FAIL(ObRootInspection::check_table_schema(hard_code_table, *table))) {
LOG_WARN("fail to check table schema", KR(ret), K(tenant_id), K(hard_code_table), KPC(table));
}
return ret;
}
}//end rootserver
}//end oceanbase

View File

@ -18,6 +18,7 @@
#include "share/ob_check_stop_provider.h"
#include "share/schema/ob_multi_version_schema_service.h"
#include "share/schema/ob_schema_getter_guard.h"
#include "share/ob_rpc_struct.h"
namespace oceanbase
{
@ -29,8 +30,9 @@ class ObUpgradeTask: public share::ObAsyncTask
{
public:
explicit ObUpgradeTask(ObUpgradeExecutor &upgrade_executor,
const obrpc::ObUpgradeJobArg::Action action,
const int64_t version)
: upgrade_executor_(&upgrade_executor), version_(version)
: upgrade_executor_(&upgrade_executor), action_(action), version_(version)
{}
virtual ~ObUpgradeTask() {}
virtual int64_t get_deep_copy_size() const;
@ -38,6 +40,7 @@ public:
virtual int process();
private:
ObUpgradeExecutor *upgrade_executor_;
obrpc::ObUpgradeJobArg::Action action_;
int64_t version_;
};
@ -51,7 +54,8 @@ public:
obrpc::ObSrvRpcProxy &rpc_proxy,
obrpc::ObCommonRpcProxy &common_proxy);
int execute(const int64_t version);
int execute(const obrpc::ObUpgradeJobArg::Action action,
const int64_t version);
int can_execute();
int check_stop() const;
bool check_execute() const;
@ -59,16 +63,22 @@ public:
void start();
int stop();
private:
int set_execute_mark();
int check_inner_stat_() const;
int set_execute_mark_();
int check_schema_sync();
int check_schema_sync(
int run_upgrade_post_job_(const int64_t version);
int run_upgrade_system_variable_job_();
int run_upgrade_system_table_job_();
int upgrade_system_table_(const uint64_t tenant_id);
int check_table_schema_(const uint64_t tenant_id,
const share::schema::ObTableSchema &hard_code_table);
int check_schema_sync_();
int check_schema_sync_(
obrpc::ObTenantSchemaVersions &primary_schema_versions,
obrpc::ObTenantSchemaVersions &standby_schema_versions,
bool &schema_sync);
int get_tenant_ids(common::ObIArray<uint64_t> &tenant_ids);
int run_upgrade_job(const int64_t version);
private:
bool inited_;
bool stopped_;
@ -76,6 +86,7 @@ private:
common::SpinRWLock rwlock_;
common::ObMySQLProxy *sql_proxy_;
obrpc::ObSrvRpcProxy *rpc_proxy_;
obrpc::ObCommonRpcProxy *common_rpc_proxy_;
share::schema::ObMultiVersionSchemaService *schema_service_;
share::ObUpgradeProcesserSet upgrade_processors_;
DISALLOW_COPY_AND_ASSIGN(ObUpgradeExecutor);

View File

@ -214,6 +214,7 @@ public:
RPC_S(PRD admin_upgrade_virtual_schema, obrpc::OB_ADMIN_UPGRADE_VIRTUAL_SCHEMA);
RPC_S(PRD run_job, obrpc::OB_RUN_JOB, (ObRunJobArg));
RPC_S(PRD run_upgrade_job, obrpc::OB_RUN_UPGRADE_JOB, (ObUpgradeJobArg));
RPC_S(PRD upgrade_table_schema, obrpc::OB_UPGRADE_TABLE_SCHEMA, (ObUpgradeTableSchemaArg));
RPC_S(PR5 admin_flush_cache, obrpc::OB_ADMIN_FLUSH_CACHE, (ObAdminFlushCacheArg));
RPC_S(PR5 admin_upgrade_cmd, obrpc::OB_ADMIN_UPGRADE_CMD, (Bool));
RPC_S(PR5 admin_rolling_upgrade_cmd, obrpc::OB_ADMIN_ROLLING_UPGRADE_CMD, (ObAdminRollingUpgradeArg));

View File

@ -3819,7 +3819,8 @@ ObUpgradeJobArg::ObUpgradeJobArg()
{}
bool ObUpgradeJobArg::is_valid() const
{
return INVALID_ACTION != action_ && version_ > 0;
return INVALID_ACTION != action_
&& (UPGRADE_POST_ACTION != action_ || version_ > 0);
}
int ObUpgradeJobArg::assign(const ObUpgradeJobArg &other)
{
@ -3830,6 +3831,41 @@ int ObUpgradeJobArg::assign(const ObUpgradeJobArg &other)
}
OB_SERIALIZE_MEMBER(ObUpgradeJobArg, action_, version_);
int ObUpgradeTableSchemaArg::init(
const uint64_t tenant_id,
const uint64_t table_id)
{
int ret = OB_SUCCESS;
ObDDLArg::reset();
exec_tenant_id_ = tenant_id;
tenant_id_ = tenant_id;
table_id_ = table_id;
return ret;
}
bool ObUpgradeTableSchemaArg::is_valid() const
{
return common::OB_INVALID_TENANT_ID != exec_tenant_id_
&& common::OB_INVALID_TENANT_ID != tenant_id_
/*index、lob table will be created with related system table*/
&& is_system_table(table_id_);
}
int ObUpgradeTableSchemaArg::assign(const ObUpgradeTableSchemaArg &other)
{
int ret = OB_SUCCESS;
if (this == &other) {
} else if (OB_FAIL(ObDDLArg::assign(other))) {
LOG_WARN("fail to assign ObDDLArg", KR(ret));
} else {
tenant_id_ = other.tenant_id_;
table_id_ = other.table_id_;
}
return ret;
}
OB_SERIALIZE_MEMBER((ObUpgradeTableSchemaArg, ObDDLArg), tenant_id_, table_id_);
int ObAdminFlushCacheArg::assign(const ObAdminFlushCacheArg &other)
{
int ret = OB_SUCCESS;

View File

@ -4451,8 +4451,10 @@ struct ObUpgradeJobArg
public:
enum Action {
INVALID_ACTION,
RUN_UPGRADE_JOB,
STOP_UPGRADE_JOB
UPGRADE_POST_ACTION,
STOP_UPGRADE_JOB,
UPGRADE_SYSTEM_VARIABLE,
UPGRADE_SYSTEM_TABLE,
};
public:
ObUpgradeJobArg();
@ -4464,6 +4466,26 @@ public:
int64_t version_;
};
struct ObUpgradeTableSchemaArg : public ObDDLArg
{
OB_UNIS_VERSION(1);
public:
ObUpgradeTableSchemaArg()
: ObDDLArg(),
tenant_id_(common::OB_INVALID_TENANT_ID),
table_id_(common::OB_INVALID_ID) {}
~ObUpgradeTableSchemaArg() {}
int init(const uint64_t tenant_id, const uint64_t table_id);
bool is_valid() const;
int assign(const ObUpgradeTableSchemaArg &other);
uint64_t get_tenant_id() const { return tenant_id_; }
uint64_t get_table_id() const { return table_id_; }
TO_STRING_KV(K_(tenant_id), K_(table_id));
private:
uint64_t tenant_id_;
uint64_t table_id_;
};
struct ObAdminMergeArg
{
OB_UNIS_VERSION(1);

View File

@ -10,7 +10,7 @@
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SHARE
#define USING_LOG_PREFIX RS
#include "lib/string/ob_sql_string.h"
#include "share/ob_rpc_struct.h"
@ -280,17 +280,17 @@ int ObUpgradeUtils::upgrade_sys_variable(
|| OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else if (OB_FAIL(calc_diff_sys_var(sql_client, tenant_id, update_list, add_list))) {
} else if (OB_FAIL(calc_diff_sys_var_(sql_client, tenant_id, update_list, add_list))) {
LOG_WARN("fail to calc diff sys var", KR(ret), K(tenant_id));
} else if (OB_FAIL(update_sys_var(rpc_proxy, tenant_id, update_list))) {
} else if (OB_FAIL(update_sys_var_(rpc_proxy, tenant_id, true, update_list))) {
LOG_WARN("fail to update sys var", KR(ret), K(tenant_id));
} else if (OB_FAIL(add_sys_var(sql_client, tenant_id, add_list))) {
} else if (OB_FAIL(update_sys_var_(rpc_proxy, tenant_id, false, add_list))) {
LOG_WARN("fail to add sys var", KR(ret), K(tenant_id));
}
return ret;
}
int ObUpgradeUtils::calc_diff_sys_var(
int ObUpgradeUtils::calc_diff_sys_var_(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
common::ObArray<int64_t> &update_list,
@ -361,7 +361,7 @@ int ObUpgradeUtils::calc_diff_sys_var(
|| 0 != hard_code_min_val.compare(min_val)
|| 0 != hard_code_max_val.compare(max_val)) {
// sys var to modify
LOG_INFO("sys var diff, need modify", K(tenant_id), K(name),
LOG_INFO("[UPGRADE] sys var diff, need modify", K(tenant_id), K(name),
K(data_type), K(flags), K(min_val), K(max_val), K(info),
K(hard_code_type), K(hard_code_flag), K(hard_code_min_val),
K(hard_code_max_val), K(hard_code_info));
@ -398,7 +398,7 @@ int ObUpgradeUtils::calc_diff_sys_var(
} else if (OB_FAIL(add_list.push_back(i))) {
LOG_WARN("fail to push back var_store_idx", KR(ret), K(tenant_id), K(name));
} else {
LOG_INFO("sys var miss, need add", K(tenant_id), K(name), K(i));
LOG_INFO("[UPGRADE] sys var miss, need add", K(tenant_id), K(name), K(i));
}
}
}
@ -406,15 +406,11 @@ int ObUpgradeUtils::calc_diff_sys_var(
return ret;
}
/*
* This function is used to restore backup data from cluster with lower cluster version.
* For modified system variable schema in physical restore, we compensate by methods below.
* 1. Modify system variable(except value) by DDL in physical restore.
* 2. Observer should run with system variable schema which is not modified yet.
*/
int ObUpgradeUtils::update_sys_var(
// modify & add sys var according by hard code schema
int ObUpgradeUtils::update_sys_var_(
obrpc::ObCommonRpcProxy &rpc_proxy,
const uint64_t tenant_id,
const bool is_update,
common::ObArray<int64_t> &update_list)
{
int ret = OB_SUCCESS;
@ -423,7 +419,9 @@ int ObUpgradeUtils::update_sys_var(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else {
const int64_t timeout = GCONF.internal_sql_execute_timeout;
for (int64_t i = 0; OB_SUCC(ret) && i < update_list.count(); i++) {
int64_t start_ts = ObTimeUtility::current_time();
int64_t var_store_idx = update_list.at(i);
const ObString &name = ObSysVariables::get_name(var_store_idx);
const ObObjType &type = ObSysVariables::get_type(var_store_idx);
@ -436,9 +434,9 @@ int ObUpgradeUtils::update_sys_var(
ObSysParam sys_param;
obrpc::ObAddSysVarArg arg;
arg.exec_tenant_id_ = tenant_id;
arg.if_not_exist_ = true;
arg.if_not_exist_ = true; // not used
arg.sysvar_.set_tenant_id(tenant_id);
arg.update_sys_var_ = true;
arg.update_sys_var_ = is_update;
if (OB_FAIL(sys_param.init(tenant_id, zone, name.ptr(), type,
value.ptr(), min.ptr(), max.ptr(), info.ptr(), flag))) {
LOG_WARN("sys_param init failed", KR(ret), K(tenant_id), K(name),
@ -448,298 +446,16 @@ int ObUpgradeUtils::update_sys_var(
LOG_WARN("sys param is invalid", KR(ret), K(tenant_id), K(sys_param));
} else if (OB_FAIL(ObSchemaUtils::convert_sys_param_to_sysvar_schema(sys_param, arg.sysvar_))) {
LOG_WARN("convert sys param to sysvar schema failed", KR(ret));
} else if (OB_FAIL(rpc_proxy.add_system_variable(arg))) {
LOG_WARN("add system variable failed", KR(ret), K(arg));
/*} else if (OB_FAIL(execute_update_sys_var_sql(sql_client, tenant_id, sys_param))) {
LOG_WARN("fail to execute update sys var sql", KR(ret), K(tenant_id));
} else if (OB_FAIL(execute_update_sys_var_history_sql(sql_client, tenant_id, sys_param))) {
LOG_WARN("fail to execute update sys var history sql", KR(ret), K(tenant_id));*/
} else if (OB_FAIL(rpc_proxy.timeout(timeout).add_system_variable(arg))) {
LOG_WARN("add system variable failed", KR(ret), K(timeout), K(arg));
}
LOG_INFO("[UPGRADE] finish upgrade system variable",
KR(ret), K(tenant_id), K(name), "cost", ObTimeUtility::current_time() - start_ts);
}
}
return ret;
}
int ObUpgradeUtils::execute_update_sys_var_sql(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObSysParam &sys_param)
{
int ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID == tenant_id
|| OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else {
int64_t affected_rows = 0;
ObDMLSqlSplicer dml;
ObDMLExecHelper exec(sql_client, tenant_id);
if (OB_FAIL(gen_basic_sys_variable_dml(tenant_id, sys_param, dml))) {
LOG_WARN("fail to gen dml", KR(ret), K(tenant_id), K(sys_param));
} else if (OB_FAIL(exec.exec_update(OB_ALL_SYS_VARIABLE_TNAME, dml, affected_rows))) {
LOG_WARN("execute insert failed", KR(ret));
} else if (!is_zero_row(affected_rows) && !is_single_row(affected_rows)) {
LOG_WARN("invalid affected_rows", KR(ret), K(tenant_id), K(affected_rows));
} else {
LOG_INFO("[UPGRADE] modify sys var", KR(ret), K(tenant_id), K(sys_param));
}
}
return ret;
}
int ObUpgradeUtils::execute_update_sys_var_history_sql(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObSysParam &sys_param)
{
int ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID == tenant_id
|| OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else {
int64_t schema_version = OB_INVALID_VERSION;
{
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
ObSqlString sql;
if (OB_FAIL(sql.append_fmt(
"select schema_version from %s where tenant_id = %lu"
" and zone = '' and name = '%s' order by schema_version desc limit 1",
OB_ALL_SYS_VARIABLE_HISTORY_TNAME,
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
sys_param.name_))) {
LOG_WARN("fail to append sql", KR(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql));
} else if (NULL == (result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result is not expected to be NULL", KR(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(result->next())) {
LOG_WARN("fail to get row", KR(ret), K(tenant_id), K(sql));
} else {
EXTRACT_INT_FIELD_MYSQL(*result, "schema_version", schema_version, int64_t);
}
}
}
if (OB_SUCC(ret)) {
int64_t affected_rows = 0;
ObDMLSqlSplicer dml;
ObDMLExecHelper exec(sql_client, tenant_id);
if (OB_INVALID_VERSION == schema_version) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid schema_version", KR(ret), K(tenant_id), K(sys_param));
} else if (OB_FAIL(gen_basic_sys_variable_dml(tenant_id, sys_param, dml))) {
LOG_WARN("fail to gen dml", KR(ret), K(tenant_id), K(sys_param));
} else if (OB_FAIL(dml.add_pk_column("schema_version", schema_version))) {
LOG_WARN("fail to add column", KR(ret), K(tenant_id), K(schema_version));
} else if (OB_FAIL(exec.exec_update(OB_ALL_SYS_VARIABLE_HISTORY_TNAME, dml, affected_rows))) {
LOG_WARN("execute insert failed", KR(ret));
} else if (!is_zero_row(affected_rows) && !is_single_row(affected_rows)) {
LOG_WARN("invalid affected_rows", KR(ret), K(tenant_id), K(affected_rows));
} else {
LOG_INFO("[UPGRADE] modify sys var history", KR(ret), K(tenant_id), K(sys_param));
}
}
}
return ret;
}
/*
* This function is used to restore backup data from cluster with lower cluster version.
* For missing system variable schema in physical restore, we compensate by methods below.
* 1. Missing system variable schema will be added according to hardcoded meta schema when refreshing schema.
* 2. (Not necessary) Modify __all_sys_variable/__all_sys_variable_history, so we can construct system variable schema
* from inner table when observer restarts.
*/
int ObUpgradeUtils::add_sys_var(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
common::ObArray<int64_t> &add_list)
{
int ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID == tenant_id
|| OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else {
ObArenaAllocator allocator("AddSysVar");
for (int64_t i = 0; OB_SUCC(ret) && i < add_list.count(); i++) {
int64_t var_store_idx = add_list.at(i);
const ObString &name = ObSysVariables::get_name(var_store_idx);
const ObObjType &type = ObSysVariables::get_type(var_store_idx);
const ObString &min = ObSysVariables::get_min(var_store_idx);
const ObString &max = ObSysVariables::get_max(var_store_idx);
const ObString &info = ObSysVariables::get_info(var_store_idx);
const int64_t flag = ObSysVariables::get_flags(var_store_idx);
const ObString zone("");
ObSysParam sys_param;
ObString value;
if (OB_FAIL(convert_sys_variable_value(var_store_idx, allocator, value))) {
LOG_WARN("fail to get sys variable value", KR(ret), K(tenant_id), K(var_store_idx));
} else if (OB_FAIL(sys_param.init(tenant_id, zone, name.ptr(), type,
value.ptr(), min.ptr(), max.ptr(), info.ptr(), flag))) {
LOG_WARN("sys_param init failed", KR(ret), K(tenant_id), K(name),
K(type), K(value), K(min), K(max), K(info), K(flag));
} else if (!sys_param.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("sys param is invalid", KR(ret), K(tenant_id), K(sys_param));
} else if (OB_FAIL(execute_add_sys_var_sql(sql_client, tenant_id, sys_param))) {
LOG_WARN("fail to execute add sys var sql", KR(ret), K(tenant_id));
} else if (OB_FAIL(execute_add_sys_var_history_sql(sql_client, tenant_id, sys_param))) {
LOG_WARN("fail to execute add sys var history sql", KR(ret), K(tenant_id));
}
}
}
return ret;
}
// C++ implement for special_update_sys_vars_for_tenant() in python upgrade script.
int ObUpgradeUtils::convert_sys_variable_value(
const int64_t var_store_idx,
common::ObIAllocator &allocator,
ObString &value)
{
int ret = OB_SUCCESS;
const ObString &name = ObSysVariables::get_name(var_store_idx);
if (0 == name.compare("nls_date_format")) {
if (OB_FAIL(ob_write_string(
allocator, ObString("YYYY-MM-DD HH24:MI:SS"), value))) {
LOG_WARN("fail to write string", KR(ret), K(name));
}
} else if (0 == name.compare("nls_timestamp_format")) {
if (OB_FAIL(ob_write_string(
allocator, ObString("YYYY-MM-DD HH24:MI:SS.FF"), value))) {
LOG_WARN("fail to write string", KR(ret), K(name));
}
} else if (0 == name.compare("nls_timestamp_tz_format")) {
if (OB_FAIL(ob_write_string(
allocator, ObString("YYYY-MM-DD HH24:MI:SS.FF TZR TZD"), value))) {
LOG_WARN("fail to write string", KR(ret), K(name));
}
} else {
const ObString &ori_value = ObSysVariables::get_value(var_store_idx);
value.assign_ptr(ori_value.ptr(), ori_value.length());
}
return ret;
}
int ObUpgradeUtils::execute_add_sys_var_sql(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObSysParam &sys_param)
{
int ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID == tenant_id
|| OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else {
int64_t affected_rows = 0;
ObDMLSqlSplicer dml;
ObDMLExecHelper exec(sql_client, tenant_id);
if (OB_FAIL(gen_basic_sys_variable_dml(tenant_id, sys_param, dml))) {
LOG_WARN("fail to gen dml", KR(ret), K(tenant_id), K(sys_param));
} else if (OB_FAIL(dml.add_column("value", FORMAT_STR(ObString(sys_param.value_))))) {
LOG_WARN("fail to gen dml", KR(ret), K(tenant_id), K(sys_param));
} else if (OB_FAIL(exec.exec_replace(OB_ALL_SYS_VARIABLE_TNAME, dml, affected_rows))) {
LOG_WARN("execute insert failed", KR(ret));
} else if (!is_zero_row(affected_rows)
&& !is_single_row(affected_rows)
&& !is_double_row(affected_rows)) {
LOG_WARN("invalid affected_rows", KR(ret), K(tenant_id), K(affected_rows));
} else {
LOG_INFO("[UPGRADE] add sys var", KR(ret), K(tenant_id), K(sys_param));
}
}
return ret;
}
int ObUpgradeUtils::execute_add_sys_var_history_sql(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObSysParam &sys_param)
{
int ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID == tenant_id
|| OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else {
int64_t schema_version = OB_INVALID_VERSION;
{
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
ObSqlString sql;
if (OB_FAIL(sql.append_fmt(
"select schema_version from %s where tenant_id = %lu"
" order by schema_version asc limit 1",
OB_ALL_SYS_VARIABLE_HISTORY_TNAME,
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)))) {
LOG_WARN("fail to append sql", KR(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql));
} else if (NULL == (result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result is not expected to be NULL", KR(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(result->next())) {
LOG_WARN("fail to get row", KR(ret), K(tenant_id), K(sql));
} else {
EXTRACT_INT_FIELD_MYSQL(*result, "schema_version", schema_version, int64_t);
}
}
}
if (OB_SUCC(ret)) {
int64_t affected_rows = 0;
ObDMLSqlSplicer dml;
ObDMLExecHelper exec(sql_client, tenant_id);
if (OB_INVALID_VERSION == schema_version) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid schema_version", KR(ret), K(tenant_id), K(sys_param));
} else if (OB_FAIL(gen_basic_sys_variable_dml(tenant_id, sys_param, dml))) {
LOG_WARN("fail to gen dml", KR(ret), K(tenant_id), K(sys_param));
} else if (OB_FAIL(dml.add_pk_column("schema_version", schema_version))
|| OB_FAIL(dml.add_column("value", FORMAT_STR(ObString(sys_param.value_))))
|| OB_FAIL(dml.add_column("is_deleted", 0))) {
LOG_WARN("fail to add column", KR(ret), K(tenant_id), K(schema_version));
} else if (OB_FAIL(exec.exec_replace(OB_ALL_SYS_VARIABLE_HISTORY_TNAME, dml, affected_rows))) {
LOG_WARN("execute insert failed", KR(ret));
} else if (!is_zero_row(affected_rows)
&& !is_single_row(affected_rows)
&& !is_double_row(affected_rows)) {
LOG_WARN("invalid affected_rows", KR(ret), K(tenant_id), K(affected_rows));
} else {
LOG_INFO("[UPGRADE] add sys var history", KR(ret), K(tenant_id), K(sys_param));
}
}
}
return ret;
}
int ObUpgradeUtils::gen_basic_sys_variable_dml(
const uint64_t tenant_id,
const ObSysParam &sys_param,
ObDMLSqlSplicer &dml)
{
int ret = OB_SUCCESS;
if (OB_INVALID_TENANT_ID == tenant_id
|| OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else if (
OB_FAIL(dml.add_pk_column("tenant_id",
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)))
|| OB_FAIL(dml.add_pk_column("zone", ""))
|| OB_FAIL(dml.add_pk_column("name", FORMAT_STR(ObString(sys_param.name_))))
|| OB_FAIL(dml.add_column("data_type", sys_param.data_type_))
|| OB_FAIL(dml.add_column("min_val", FORMAT_STR(ObString(sys_param.min_val_))))
|| OB_FAIL(dml.add_column("max_val", FORMAT_STR(ObString(sys_param.max_val_))))
|| OB_FAIL(dml.add_column("info", FORMAT_STR(ObString(sys_param.info_))))
|| OB_FAIL(dml.add_column("flags", sys_param.flags_))) {
LOG_WARN("fail to add column", KR(ret), K(tenant_id), K(sys_param));
}
return ret;
}
/* =========== upgrade sys variable end =========== */
/* =========== upgrade sys stat =========== */

View File

@ -38,7 +38,6 @@ public:
static int can_run_upgrade_job(rootserver::ObRsJobType job_type, bool &can);
static int check_upgrade_job_passed(rootserver::ObRsJobType job_type);
static int check_schema_sync(bool &is_sync);
/* physical restore related */
// upgrade_sys_variable()/upgrade_sys_stat() can be called when enable_ddl = false.
static int upgrade_sys_variable(
obrpc::ObCommonRpcProxy &rpc_proxy,
@ -51,42 +50,16 @@ private:
static int check_rs_job_success(rootserver::ObRsJobType job_type, bool &success);
/* upgrade sys variable */
static int calc_diff_sys_var(
static int calc_diff_sys_var_(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
common::ObArray<int64_t> &update_list,
common::ObArray<int64_t> &add_list);
static int update_sys_var(
static int update_sys_var_(
obrpc::ObCommonRpcProxy &rpc_proxy,
const uint64_t tenant_id,
const bool is_update,
common::ObArray<int64_t> &update_list);
static int add_sys_var(common::ObISQLClient &sql_client,
const uint64_t tenant_id,
common::ObArray<int64_t> &add_list);
static int execute_update_sys_var_sql(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const share::schema::ObSysParam &sys_param);
static int execute_update_sys_var_history_sql(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const share::schema::ObSysParam &sys_param);
static int execute_add_sys_var_sql(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const share::schema::ObSysParam &sys_param);
static int execute_add_sys_var_history_sql(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const share::schema::ObSysParam &sys_param);
static int convert_sys_variable_value(
const int64_t var_store_idx,
common::ObIAllocator &allocator,
common::ObString &value);
static int gen_basic_sys_variable_dml(
const uint64_t tenant_id,
const share::schema::ObSysParam &sys_param,
share::ObDMLSqlSplicer &dml);
/* upgrade sys variable end */
static int filter_sys_stat(
common::ObISQLClient &sql_client,

View File

@ -169,6 +169,12 @@ ObColumnSchemaV2 &ObColumnSchemaV2::operator =(const ObColumnSchemaV2 &src_schem
return *this;
}
int ObColumnSchemaV2::assign(const ObColumnSchemaV2 &other)
{
*this = other;
return error_ret_;
}
bool ObColumnSchemaV2::operator==(const ObColumnSchemaV2 &r) const
{
return (tenant_id_ == r.tenant_id_ && table_id_ == r.table_id_ && column_id_ == r.column_id_

View File

@ -61,6 +61,8 @@ public:
bool operator==(const ObColumnSchemaV2 &r) const;
bool operator!=(const ObColumnSchemaV2 &r) const;
int assign(const ObColumnSchemaV2 &other);
//set methods
inline void set_tenant_id(const uint64_t id) { tenant_id_ = id; }
inline void set_table_id(const uint64_t id) { table_id_ = id; }

View File

@ -1271,7 +1271,7 @@ int ObSchemaServiceSQLImpl::get_sys_variable_schema(
}
if (OB_SUCC(ret)) {
// mock missed system variable schema by hardcoded schema
// To avoid -5044 error, mock missed system variable schema with default value by hardcoded schema.
for (int64_t i = 0; OB_SUCC(ret) && i < ObSysVariables::get_amount(); i++) {
ObSysVarClassType sys_var_id = ObSysVariables::get_sys_var_id(i);
const ObSysVarSchema *sys_var = NULL;

View File

@ -2252,6 +2252,17 @@ bool ObSysVarSchema::is_equal_except_value(const ObSysVarSchema &other) const
}
return bret;
}
bool ObSysVarSchema::is_equal_for_add(const ObSysVarSchema &other) const
{
bool bret = false;
if (is_equal_except_value(other)
&& 0 == value_.compare(other.value_)
&& zone_ == other.zone_) {
bret = true;
}
return bret;
}
/*-------------------------------------------------------------------------------------------------
* ------------------------------ObDatabaseSchema-------------------------------------------
----------------------------------------------------------------------------------------------------*/

View File

@ -1271,6 +1271,7 @@ public:
void reset();
int64_t get_convert_size() const;
bool is_equal_except_value(const ObSysVarSchema &other) const;
bool is_equal_for_add(const ObSysVarSchema &other) const;
uint64_t get_tenant_id() const { return tenant_id_; }
void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; }
const common::ObString &get_name() const { return name_; }

View File

@ -339,23 +339,18 @@ int ObSchemaUtils::construct_tenant_space_simple_table(
ObSimpleTableSchemaV2 &table)
{
int ret = OB_SUCCESS;
if (is_sys_tenant(tenant_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
} else {
table.set_tenant_id(tenant_id);
// for distributed virtual table in tenant space
int64_t part_num = table.get_partition_num();
for (int64_t i = 0; OB_SUCC(ret) && i < part_num; i++) {
ObPartition *part = table.get_part_array()[i];
if (OB_ISNULL(part)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("part is null", KR(ret), K(i));
} else {
part->set_tenant_id(tenant_id);
}
} // end for
}
table.set_tenant_id(tenant_id);
// for distributed virtual table in tenant space
int64_t part_num = table.get_partition_num();
for (int64_t i = 0; OB_SUCC(ret) && i < part_num; i++) {
ObPartition *part = table.get_part_array()[i];
if (OB_ISNULL(part)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("part is null", KR(ret), K(i));
} else {
part->set_tenant_id(tenant_id);
}
} // end for
return ret;
}

View File

@ -1852,8 +1852,7 @@ int ObTableSchema::alter_column(ObColumnSchemaV2 &column_schema, ObColumnCheckMo
}
}
if (OB_FAIL(ret)) {
} else if (OB_FALSE_IT(*src_schema = column_schema)) {
} else if (OB_FAIL(src_schema->get_err_ret())) {
} else if (OB_FAIL(src_schema->assign(column_schema))) {
LOG_WARN("failed to assign src schema", K(ret), K(column_schema));
}
}
@ -4271,10 +4270,10 @@ int ObTableSchema::check_column_can_be_altered_offline(
int ObTableSchema::check_column_can_be_altered_online(
const ObColumnSchemaV2 *src_schema,
ObColumnSchemaV2 *dst_schema)
ObColumnSchemaV2 *dst_schema) const
{
int ret = OB_SUCCESS;
ObColumnSchemaV2 *tmp_column = NULL;
const ObColumnSchemaV2 *tmp_column = NULL;
if (OB_ISNULL(src_schema) || NULL == dst_schema) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("The column schema is NULL", K(ret));
@ -4289,8 +4288,23 @@ int ObTableSchema::check_column_can_be_altered_online(
LOG_WARN("Only NORMAL table and INDEX table and SYSTEM table are allowed", K(ret));
} else {
LOG_DEBUG("check column schema can be altered", KPC(src_schema), KPC(dst_schema));
// Additional restriction for system table:
// 1. Can't alter column name
// 2. Can't alter column from "NULL" to "NOT NULL"
if (is_system_table(get_table_id())) {
if (0 != src_schema->get_column_name_str().compare(dst_schema->get_column_name_str())) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "Alter system table's column name is");
LOG_WARN("Alter system table's column name is not supported", KR(ret), K(src_schema), K(dst_schema));
} else if (src_schema->is_nullable() && !dst_schema->is_nullable()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "Alter system table's column from `NULL` to `NOT NULL`");
LOG_WARN("Alter system table's column from `NULL` to `NOT NULL` is not supported", KR(ret), K(src_schema), K(dst_schema));
}
}
bool is_oracle_mode = false;
if (OB_FAIL(check_if_oracle_compat_mode(is_oracle_mode))) {
if (FAILEDx(check_if_oracle_compat_mode(is_oracle_mode))) {
LOG_WARN("check if oracle compat mode failed", K(ret));
} else if (is_oracle_mode
&& ob_is_number_tc(src_schema->get_data_type())

View File

@ -1257,7 +1257,7 @@ public:
virtual int init_column_meta_array(
common::ObIArray<blocksstable::ObSSTableColumnMeta> &meta_array) const override;
int check_column_can_be_altered_online(const ObColumnSchemaV2 *src_schema,
ObColumnSchemaV2 *dst_schema);
ObColumnSchemaV2 *dst_schema) const;
int check_column_can_be_altered_offline(const ObColumnSchemaV2 *src_schema,
ObColumnSchemaV2 *dst_schema) const;
int check_alter_column_is_offline(const ObColumnSchemaV2 *src_schema,

View File

@ -860,9 +860,11 @@ int ObTableSqlService::revise_check_cst_column_info(
return ret;
}
int ObTableSqlService::insert_single_column(ObISQLClient &sql_client,
const ObTableSchema &new_table_schema,
const ObColumnSchemaV2 &new_column_schema)
int ObTableSqlService::insert_single_column(
ObISQLClient &sql_client,
const ObTableSchema &new_table_schema,
const ObColumnSchemaV2 &new_column_schema,
const bool record_ddl_operation)
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_ddl_allowed(new_table_schema))) {
@ -881,7 +883,7 @@ int ObTableSqlService::insert_single_column(ObISQLClient &sql_client,
}
}
}
if (OB_SUCC(ret)) {
if (OB_SUCC(ret) && record_ddl_operation) {
ObSchemaOperation opt;
opt.tenant_id_ = new_table_schema.get_tenant_id();
opt.database_id_ = new_table_schema.get_database_id();
@ -3681,10 +3683,10 @@ int ObTableSqlService::gen_column_dml(
orig_default_value.assign_ptr(orig_default_value_buf, static_cast<int32_t>(orig_default_value_len));
cur_default_value.assign_ptr(cur_default_value_buf, static_cast<int32_t>(cur_default_value_len));
}
LOG_DEBUG("begin gen_column_dml", K(ret), K(compat_mode), K(orig_default_value), K(cur_default_value), K(orig_default_value_len), K(cur_default_value_len));
LOG_TRACE("begin gen_column_dml", K(ret), K(compat_mode), K(orig_default_value), K(cur_default_value), K(orig_default_value_len), K(cur_default_value_len));
}
}
LOG_DEBUG("begin gen_column_dml", K(ret), K(orig_default_value), K(cur_default_value), K(column));
LOG_TRACE("begin gen_column_dml", K(ret), K(orig_default_value), K(cur_default_value), K(column));
if (OB_SUCC(ret)) {
ObString cur_default_value_v1;
if (column.get_orig_default_value().is_null()) {

View File

@ -98,7 +98,8 @@ public:
//alter table add column
int insert_single_column(common::ObISQLClient &sql_client,
const ObTableSchema &new_table_schema,
const ObColumnSchemaV2 &column_schema);
const ObColumnSchemaV2 &column_schema,
const bool record_ddl_operation);
//alter table add constraint
int insert_single_constraint(common::ObISQLClient &sql_client,
const ObTableSchema &new_table_schema,

View File

@ -38,6 +38,7 @@
#include "share/backup/ob_backup_io_adapter.h"
#include "share/backup/ob_backup_config.h"
#include "observer/mysql/ob_query_response_time.h"
#include "rootserver/ob_rs_job_table_operator.h" //ObRsJobType
namespace oceanbase
{
@ -2596,15 +2597,24 @@ int ObRunUpgradeJobResolver::resolve(const ParseNode &parse_tree)
LOG_ERROR("create ObRunUpgradeJobStmt failed", KR(ret));
} else {
stmt_ = stmt;
ObString version_str;
ObString str;
uint64_t version = OB_INVALID_VERSION;
if (OB_FAIL(Util::resolve_string(parse_tree.children_[0], version_str))) {
if (OB_FAIL(Util::resolve_string(parse_tree.children_[0], str))) {
LOG_WARN("resolve string failed", KR(ret));
} else if (OB_FAIL(ObClusterVersion::get_version(version_str, version))) {
LOG_WARN("fail to get version", KR(ret), K(version_str));
} else if (0 == str.case_compare(rootserver::ObRsJobTableOperator::get_job_type_str(
rootserver::JOB_TYPE_UPGRADE_SYSTEM_VARIABLE))) {
stmt->get_rpc_arg().action_ = obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE;
} else if (0 == str.case_compare(rootserver::ObRsJobTableOperator::get_job_type_str(
rootserver::JOB_TYPE_UPGRADE_SYSTEM_TABLE))) {
stmt->get_rpc_arg().action_ = obrpc::ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE;
} else {
stmt->get_rpc_arg().action_ = obrpc::ObUpgradeJobArg::RUN_UPGRADE_JOB;
stmt->get_rpc_arg().version_ = static_cast<int64_t>(version);
// UPGRADE_POST_ACTION
if (OB_FAIL(ObClusterVersion::get_version(str, version))) {
LOG_WARN("fail to get version", KR(ret), K(str));
} else {
stmt->get_rpc_arg().action_ = obrpc::ObUpgradeJobArg::UPGRADE_POST_ACTION;
stmt->get_rpc_arg().version_ = static_cast<int64_t>(version);
}
}
}
}

View File

@ -194,7 +194,7 @@ def get_max_used_job_id(cur):
def check_can_run_upgrade_job(cur, version):
try:
sql = """select job_status from oceanbase.__all_rootservice_job
where job_type = 'RUN_UPGRADE_POST_JOB' and extra_info = '{0}'
where job_type = 'UPGRADE_POST_ACTION' and extra_info = '{0}'
order by job_id desc limit 1""".format(version)
results = query(cur, sql)

View File

@ -12717,7 +12717,7 @@
#def check_can_run_upgrade_job(cur, version):
# try:
# sql = """select job_status from oceanbase.__all_rootservice_job
# where job_type = 'RUN_UPGRADE_POST_JOB' and extra_info = '{0}'
# where job_type = 'UPGRADE_POST_ACTION' and extra_info = '{0}'
# order by job_id desc limit 1""".format(version)
# results = query(cur, sql)
#

View File

@ -12717,7 +12717,7 @@
#def check_can_run_upgrade_job(cur, version):
# try:
# sql = """select job_status from oceanbase.__all_rootservice_job
# where job_type = 'RUN_UPGRADE_POST_JOB' and extra_info = '{0}'
# where job_type = 'UPGRADE_POST_ACTION' and extra_info = '{0}'
# order by job_id desc limit 1""".format(version)
# results = query(cur, sql)
#