Change Cluster Version to 4.1.0.0

This commit is contained in:
tino247 2022-11-13 02:09:47 +08:00 committed by wangzelin.wzl
parent ba634cea50
commit 7f0d609c5f
13 changed files with 129 additions and 1027 deletions

View File

@ -4,7 +4,7 @@ include(cmake/Utils.cmake)
include(cmake/Env.cmake)
project("OceanBase_CE"
VERSION 4.0.0.0
VERSION 4.1.0.0
DESCRIPTION "OceanBase distributed database system"
HOMEPAGE_URL "https://open.oceanbase.com/"
LANGUAGES CXX C ASM)

View File

@ -1,4 +1,4 @@
Name: %NAME
Version:4.0.0.0
Version:4.1.0.0
Release: %RELEASE
BuildRequires: binutils = 2.30

View File

@ -7961,7 +7961,7 @@ int ObRootService::run_upgrade_job(const obrpc::ObUpgradeJobArg &arg)
|| ObUpgradeJobArg::UPGRADE_SYSTEM_VARIABLE == arg.action_
|| ObUpgradeJobArg::UPGRADE_SYSTEM_TABLE == arg.action_) {
if (ObUpgradeJobArg::UPGRADE_POST_ACTION == arg.action_
&& !ObUpgradeChecker::check_cluster_version_exist(version)) {
&& !ObUpgradeChecker::check_data_version_exist(version)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "run upgrade job with such version is");

View File

@ -306,7 +306,7 @@ int ObUpgradeExecutor::execute(
// Python upgrade script may set enable_ddl = false before it run upgrade job.
// TODO:
// 1. support run upgrade post action from `COMPATIBLE` to current cluster version.
// 1. support run upgrade post action from `COMPATIBLE` to current data version.
int ObUpgradeExecutor::run_upgrade_post_job_(const int64_t version)
{
int ret = OB_SUCCESS;
@ -314,7 +314,7 @@ int ObUpgradeExecutor::run_upgrade_post_job_(const int64_t version)
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("executor should stopped", KR(ret));
} else if (!ObUpgradeChecker::check_cluster_version_exist(version)) {
} else if (!ObUpgradeChecker::check_data_version_exist(version)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("unsupported version to run upgrade job", KR(ret), K(version));
} else {

View File

@ -230,6 +230,28 @@ void ObClusterVersion::update_cluster_version(const uint64_t cluster_version)
ATOMIC_SET(&cluster_version_, cluster_version);
}
int ObClusterVersion::get_tenant_data_version(
const uint64_t tenant_id,
uint64_t &data_version)
{
int ret = OB_SUCCESS;
//TODO: mock data version with cluster version for now.
UNUSED(tenant_id);
data_version = ATOMIC_LOAD(&cluster_version_);
return ret;
}
int ObClusterVersion::tenant_need_upgrade(
const uint64_t tenant_id,
bool &need_upgrade)
{
int ret = OB_SUCCESS;
//TODO: mock data version with cluster version for now.
UNUSED(tenant_id);
need_upgrade = get_cluster_version() < CLUSTER_CURRENT_VERSION;
return ret;
}
int ObClusterVersion::is_valid(const char *verstr)
{
int ret = OB_SUCCESS;

View File

@ -23,19 +23,28 @@ namespace common
class ObServerConfig;
class ObString;
//TODO: Will change ObClusterVersion to ObVersion later
class ObClusterVersion
{
public:
ObClusterVersion();
~ObClusterVersion() { destroy(); }
int init(const common::ObServerConfig *config);
int init(const uint64_t cluster_version);
void destroy();
int64_t to_string(char *buf, const int64_t buf_len) const;
/* cluster version related */
int init(const common::ObServerConfig *config);
int init(const uint64_t cluster_version);
int refresh_cluster_version(const char *verstr);
int reload_config();
uint64_t get_cluster_version() { return ATOMIC_LOAD(&cluster_version_); }
void update_cluster_version(const uint64_t cluster_version);
/*------------------------*/
/* data version related */
int get_tenant_data_version(const uint64_t tenant_id, uint64_t &data_version);
int tenant_need_upgrade(const uint64_t tenant_id, bool &need_upgrade);
/*------------------------*/
public:
static ObClusterVersion &get_instance();
static int is_valid(const char *verstr);
@ -70,7 +79,7 @@ private:
#define OB_VSN_MAJOR_PATCH(version) (static_cast<const uint8_t>((version >> OB_VSN_MAJOR_PATCH_SHIFT) & OB_VSN_MAJOR_PATCH_MASK))
#define OB_VSN_MINOR_PATCH(version) (static_cast<const uint8_t>(version & OB_VSN_MINOR_PATCH_MASK))
#define CALC_CLUSTER_VERSION(major, minor, major_patch, minor_patch) \
#define CALC_VERSION(major, minor, major_patch, minor_patch) \
(((major) << OB_VSN_MAJOR_SHIFT) + \
((minor) << OB_VSN_MINOR_SHIFT) + \
((major_patch) << OB_VSN_MAJOR_PATCH_SHIFT) + \
@ -78,7 +87,7 @@ private:
constexpr static inline uint64_t
cal_version(const uint64_t major, const uint64_t minor, const uint64_t major_patch, const uint64_t minor_patch)
{
return CALC_CLUSTER_VERSION(major, minor, major_patch, minor_patch);
return CALC_VERSION(major, minor, major_patch, minor_patch);
}
#define DEF_MAJOR_VERSION 1
@ -146,9 +155,10 @@ cal_version(const uint64_t major, const uint64_t minor, const uint64_t major_pat
// - 4. Print: cluster version str will be printed as 4 parts.
#define CLUSTER_VERSION_3_2_3_0 (oceanbase::common::cal_version(3, 2, 3, 0))
#define CLUSTER_VERSION_4_0_0_0 (oceanbase::common::cal_version(4, 0, 0, 0))
//FIXME If you update the above version, please update me, CLUSTER_CURRENT_VERSION & ObUpgradeChecker!!!!!!
#define CLUSTER_VERSION_4_1_0_0 (oceanbase::common::cal_version(4, 1, 0, 0))
//!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#define CLUSTER_CURRENT_VERSION CLUSTER_VERSION_4_0_0_0
//TODO: If you update the above version, please update CLUSTER_CURRENT_VERSION.
#define CLUSTER_CURRENT_VERSION CLUSTER_VERSION_4_1_0_0
#define GET_MIN_CLUSTER_VERSION() (oceanbase::common::ObClusterVersion::get_instance().get_cluster_version())
#define GET_UNIS_CLUSTER_VERSION() (::oceanbase::lib::get_unis_compat_version() ?: GET_MIN_CLUSTER_VERSION())
@ -160,6 +170,21 @@ cal_version(const uint64_t major, const uint64_t minor, const uint64_t major_pat
#define IS_CLUSTER_VERSION_BEFORE_4_0_0_0 (oceanbase::common::ObClusterVersion::get_instance().get_cluster_version() < CLUSTER_VERSION_4_0_0_0)
#define IS_CLUSTER_VERSION_AFTER_2274 (oceanbase::common::ObClusterVersion::get_instance().get_cluster_version() > CLUSTER_VERSION_2274)
// ATTENSION !!!!!!!!!!!!!!!!!!!!!!!!!!!
// 1. After 4.0, each cluster_version is corresponed to a data version.
// 2. cluster_version and data_version is not compariable.
// 3. TODO: If you update data_version below, please update DATA_CURRENT_VERSION & ObUpgradeChecker too.
// For more detail: https://yuque.antfin-inc.com/ob/rootservice/xywr36
#define DATA_VERSION_4_0_0_0 (oceanbase::common::cal_version(4, 0, 0, 0))
#define DATA_VERSION_4_1_0_0 (oceanbase::common::cal_version(4, 1, 0, 0))
// should check returned ret
#define DATA_CURRENT_VERSION DATA_VERSION_4_1_0_0
#define GET_MIN_DATA_VERSION(tenant_id, data_version) (oceanbase::common::ObClusterVersion::get_instance().get_tenant_data_version((tenant_id), (data_version)))
#define TENANT_NEED_UPGRADE(tenant_id, need) (oceanbase::common::ObClusterVersion::get_instance().tenant_need_upgrade((tenant_id), (need)))
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
} // end of namespace common
} // end of namespace oceanbase

View File

@ -33,54 +33,15 @@ using namespace sql;
namespace share
{
const uint64_t ObUpgradeChecker::UPGRADE_PATH[CLUTER_VERSION_NUM] = {
CALC_CLUSTER_VERSION(1UL, 4UL, 0UL, 3UL), // 1.4.3
CALC_CLUSTER_VERSION(1UL, 4UL, 0UL, 40UL), // 1.4.40
CALC_CLUSTER_VERSION(1UL, 4UL, 0UL, 50UL), // 1.4.50
CALC_CLUSTER_VERSION(1UL, 4UL, 0UL, 51UL), // 1.4.51
CALC_CLUSTER_VERSION(1UL, 4UL, 0UL, 60UL), // 1.4.60
CALC_CLUSTER_VERSION(1UL, 4UL, 0UL, 61UL), // 1.4.61
CALC_CLUSTER_VERSION(1UL, 4UL, 0UL, 70UL), // 1.4.70
CALC_CLUSTER_VERSION(1UL, 4UL, 0UL, 71UL), // 1.4.71
CALC_CLUSTER_VERSION(1UL, 4UL, 0UL, 72UL), // 1.4.72
CALC_CLUSTER_VERSION(1UL, 4UL, 0UL, 76UL), // 1.4.76
CALC_CLUSTER_VERSION(2UL, 0UL, 0UL, 0UL), // 2.0.0
CALC_CLUSTER_VERSION(2UL, 1UL, 0UL, 0UL), // 2.1.0
CALC_CLUSTER_VERSION(2UL, 1UL, 0UL, 1UL), // 2.1.1
CALC_CLUSTER_VERSION(2UL, 1UL, 0UL, 11UL), // 2.1.11
CALC_CLUSTER_VERSION(2UL, 1UL, 0UL, 20UL), // 2.1.20
CALC_CLUSTER_VERSION(2UL, 1UL, 0UL, 30UL), // 2.1.30
CALC_CLUSTER_VERSION(2UL, 1UL, 0UL, 31UL), // 2.1.31
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 0UL), // 2.2.0
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 1UL), // 2.2.1
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 20UL), // 2.2.20
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 30UL), // 2.2.30
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 40UL), // 2.2.40
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 50UL), // 2.2.50
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 60UL), // 2.2.60
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 70UL), // 2.2.70
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 71UL), // 2.2.71
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 72UL), // 2.2.72
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 73UL), // 2.2.73
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 74UL), // 2.2.74
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 75UL), // 2.2.75
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 76UL), // 2.2.76
CALC_CLUSTER_VERSION(2UL, 2UL, 0UL, 77UL), // 2.2.77
CALC_CLUSTER_VERSION(3UL, 1UL, 0UL, 0UL), // 3.1.0
CALC_CLUSTER_VERSION(3UL, 1UL, 0UL, 1UL), // 3.1.1
CALC_CLUSTER_VERSION(3UL, 1UL, 0UL, 2UL), // 3.1.2
CALC_CLUSTER_VERSION(3UL, 2UL, 0UL, 0UL), // 3.2.0
CALC_CLUSTER_VERSION(3UL, 2UL, 0UL, 1UL), // 3.2.1
CALC_CLUSTER_VERSION(3UL, 2UL, 0UL, 2UL), // 3.2.2
CALC_CLUSTER_VERSION(3UL, 2UL, 3UL, 0UL), // 3.2.3.0
CALC_CLUSTER_VERSION(4UL, 0UL, 0UL, 0UL) // 4.0.0.0
const uint64_t ObUpgradeChecker::UPGRADE_PATH[DATA_VERSION_NUM] = {
CALC_VERSION(4UL, 1UL, 0UL, 0UL) // 4.1.0.0
};
bool ObUpgradeChecker::check_cluster_version_exist(
bool ObUpgradeChecker::check_data_version_exist(
const uint64_t version)
{
bool bret = false;
OB_ASSERT(CLUTER_VERSION_NUM == ARRAYSIZEOF(UPGRADE_PATH));
OB_ASSERT(DATA_VERSION_NUM == ARRAYSIZEOF(UPGRADE_PATH));
for (int64_t i = 0; !bret && i < ARRAYSIZEOF(UPGRADE_PATH); i++) {
bret = (version == UPGRADE_PATH[i]);
}
@ -599,24 +560,8 @@ int ObUpgradeProcesserSet::init(
} \
} \
}
// order by cluster version asc
INIT_PROCESSOR_BY_VERSION(2, 2, 0, 60);
INIT_PROCESSOR_BY_VERSION(2, 2, 0, 70);
INIT_PROCESSOR_BY_VERSION(2, 2, 0, 71);
INIT_PROCESSOR_BY_VERSION(2, 2, 0, 72);
INIT_PROCESSOR_BY_VERSION(2, 2, 0, 73);
INIT_PROCESSOR_BY_VERSION(2, 2, 0, 74);
INIT_PROCESSOR_BY_VERSION(2, 2, 0, 75);
INIT_PROCESSOR_BY_VERSION(2, 2, 0, 76);
INIT_PROCESSOR_BY_VERSION(2, 2, 0, 77);
INIT_PROCESSOR_BY_VERSION(3, 1, 0, 0);
INIT_PROCESSOR_BY_VERSION(3, 1, 0, 1);
INIT_PROCESSOR_BY_VERSION(3, 1, 0, 2);
INIT_PROCESSOR_BY_VERSION(3, 2, 0, 0);
INIT_PROCESSOR_BY_VERSION(3, 2, 0, 1);
INIT_PROCESSOR_BY_VERSION(3, 2, 0, 2);
INIT_PROCESSOR_BY_VERSION(3, 2, 3, 0);
INIT_PROCESSOR_BY_VERSION(4, 0, 0, 0);
// order by data version asc
INIT_PROCESSOR_BY_VERSION(4, 1, 0, 0);
#undef INIT_PROCESSOR_BY_VERSION
inited_ = true;
}
@ -737,7 +682,7 @@ int ObUpgradeProcesserSet::get_processor_idx_by_version(
}
ObBaseUpgradeProcessor::ObBaseUpgradeProcessor()
: inited_(false), cluster_version_(OB_INVALID_VERSION),
: inited_(false), data_version_(OB_INVALID_VERSION),
tenant_id_(common::OB_INVALID_ID), mode_(UPGRADE_MODE_INVALID),
sql_proxy_(NULL), rpc_proxy_(NULL), common_proxy_(NULL), schema_service_(NULL),
check_stop_provider_(NULL)
@ -751,12 +696,12 @@ int ObBaseUpgradeProcessor::check_inner_stat() const
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init yet", KR(ret));
} else if (cluster_version_ <= 0
} else if (data_version_ <= 0
|| tenant_id_ == OB_INVALID_ID
|| UPGRADE_MODE_INVALID == mode_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid processor status",
KR(ret), K_(cluster_version), K_(tenant_id), K_(mode));
KR(ret), K_(data_version), K_(tenant_id), K_(mode));
} else if (GCTX.is_standby_cluster() && OB_SYS_TENANT_ID != tenant_id_) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("run upgrade job for non-sys tenant in standby cluster is not supported",
@ -771,7 +716,7 @@ int ObBaseUpgradeProcessor::check_inner_stat() const
}
int ObBaseUpgradeProcessor::init(
int64_t cluster_version,
int64_t data_version,
UpgradeMode mode,
common::ObMySQLProxy &sql_proxy,
obrpc::ObSrvRpcProxy &rpc_proxy,
@ -785,7 +730,7 @@ int ObBaseUpgradeProcessor::init(
LOG_WARN("init twice", KR(ret));
} else {
mode_ = mode;
cluster_version_ = cluster_version;
data_version_ = data_version;
sql_proxy_ = &sql_proxy;
rpc_proxy_ = &rpc_proxy;
common_proxy_ = &common_proxy;
@ -796,797 +741,8 @@ int ObBaseUpgradeProcessor::init(
return ret;
}
/* =========== 22070 upgrade processor start ============= */
int ObUpgradeFor22070Processor::pre_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(modify_trigger_package_source_body())) {
LOG_WARN("fail to modify trigger package source body", KR(ret));
} else {
// TODO:(xiaoyi.xy) rename oracle's database('__public' to 'PUBLIC') by ddl
}
return ret;
}
/* =========== special upgrade processor start ============= */
int ObUpgradeFor22070Processor::post_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(rebuild_subpart_partition_gc_info())) {
LOG_WARN("fail to rebuild subpart partition gc info", KR(ret));
}
return ret;
}
// bugfix: https://aone.alibaba-inc.com/task/30451372
int ObUpgradeFor22070Processor::modify_trigger_package_source_body()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = get_tenant_id();
int64_t start_ts = ObTimeUtility::current_time();
LOG_INFO("start modify_trigger_package_source_body", K(tenant_id));
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else {
const static char* update_all_tenant_trigger_str =
"update %s "
"set "
"package_spec_source = replace("
"package_spec_source,"
"'FUNCTION UPDATING(column VARCHAR2 := NULL) RETURN BOOL;',"
"'FUNCTION UPDATING(column_name VARCHAR2 := NULL) RETURN BOOL;'"
"),"
"package_body_source = replace(replace("
"package_body_source,"
"'\n"
"PROCEDURE init_trigger(update_columns IN STRINGARRAY) IS\n"
"BEGIN\n"
" NULL;\n"
"END;\n"
"',"
"'\n"
"PROCEDURE init_trigger(update_columns IN STRINGARRAY) IS\n"
"BEGIN\n"
" update_columns_ := STRINGARRAY();\n"
" update_columns_.EXTEND(update_columns.COUNT);\n"
" FOR i IN 1 .. update_columns.COUNT LOOP\n"
" update_columns_(i) := update_columns(i);\n"
" END LOOP;\n"
"END;\n"
"'),"
"'\n"
"FUNCTION UPDATING(column VARCHAR2 := NULL) RETURN BOOL IS\n"
"BEGIN\n"
" RETURN (dml_event_ = 4);\n"
"END;\n"
"',"
"'\n"
"FUNCTION UPDATING(column_name VARCHAR2 := NULL) RETURN BOOL IS\n"
" is_updating BOOL;\n"
"BEGIN\n"
" is_updating := (dml_event_ = 4);\n"
" IF (is_updating AND column_name IS NOT NULL) THEN\n"
" is_updating := FALSE;\n"
" FOR i IN 1 .. update_columns_.COUNT LOOP\n"
" IF (UPPER(update_columns_(i)) = UPPER(column_name)) THEN is_updating := TRUE; EXIT; END IF;\n"
" END LOOP;\n"
" END IF;\n"
" RETURN is_updating;\n"
"END;\n"
"');";
const static char* update_all_tenant_trigger_history_str =
"update %s "
"set "
"package_spec_source = replace("
"package_spec_source,"
"'FUNCTION UPDATING(column VARCHAR2 := NULL) RETURN BOOL;',"
"'FUNCTION UPDATING(column_name VARCHAR2 := NULL) RETURN BOOL;'"
"),"
"package_body_source = replace(replace("
"package_body_source,"
"'\n"
"PROCEDURE init_trigger(update_columns IN STRINGARRAY) IS\n"
"BEGIN\n"
" NULL;\n"
"END;\n"
"',"
"'\n"
"PROCEDURE init_trigger(update_columns IN STRINGARRAY) IS\n"
"BEGIN\n"
" update_columns_ := STRINGARRAY();\n"
" update_columns_.EXTEND(update_columns.COUNT);\n"
" FOR i IN 1 .. update_columns.COUNT LOOP\n"
" update_columns_(i) := update_columns(i);\n"
" END LOOP;\n"
"END;\n"
"'),"
"'\n"
"FUNCTION UPDATING(column VARCHAR2 := NULL) RETURN BOOL IS\n"
"BEGIN\n"
" RETURN (dml_event_ = 4);\n"
"END;\n"
"',"
"'\n"
"FUNCTION UPDATING(column_name VARCHAR2 := NULL) RETURN BOOL IS\n"
" is_updating BOOL;\n"
"BEGIN\n"
" is_updating := (dml_event_ = 4);\n"
" IF (is_updating AND column_name IS NOT NULL) THEN\n"
" is_updating := FALSE;\n"
" FOR i IN 1 .. update_columns_.COUNT LOOP\n"
" IF (UPPER(update_columns_(i)) = UPPER(column_name)) THEN is_updating := TRUE; EXIT; END IF;\n"
" END LOOP;\n"
" END IF;\n"
" RETURN is_updating;\n"
"END;\n"
"') "
"where is_deleted = 0;";
ObSqlString sql;
int64_t affected_rows = 0;
// update __all_tenant_tigger
if (OB_FAIL(sql.assign_fmt(update_all_tenant_trigger_str, OB_ALL_TENANT_TRIGGER_TNAME))) {
LOG_WARN("fail to assign sql", KR(ret));
} else if (OB_FAIL(sql_proxy_->write(tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("failed to execute", KR(ret), K(affected_rows), K(sql));
} else {
LOG_INFO("update __all_tenant_trigger",
KR(ret), K(tenant_id), K(affected_rows), K(sql));
}
// update __all_tenant_tigger_history
if (OB_SUCC(ret)) {
sql.reset();
affected_rows = 0;
if (OB_FAIL(sql.assign_fmt(update_all_tenant_trigger_history_str,
OB_ALL_TENANT_TRIGGER_HISTORY_TNAME))) {
LOG_WARN("fail to assign sql", KR(ret));
} else if (OB_FAIL(sql_proxy_->write(tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("failed to execute", KR(ret), K(affected_rows), K(sql));
} else {
LOG_INFO("update __all_tenant_trigger_history",
KR(ret), K(tenant_id), K(affected_rows), K(sql));
}
}
}
LOG_INFO("finish modify_trigger_package_source_body",
KR(ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
return ret;
}
int ObUpgradeFor22070Processor::modify_oracle_public_database_name()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = get_tenant_id();
int64_t start_ts = ObTimeUtility::current_time();
LOG_INFO("[UPGRADE] start modify_oracle_public_database_name", K(tenant_id));
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else {
ObSqlString sql;
int64_t affected_rows = 0;
uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
uint64_t tenant_id_in_sql = ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id);
uint64_t db_id_in_sql = ObSchemaUtils::get_extract_schema_id(exec_tenant_id, OB_PUBLIC_SCHEMA_ID);
const static char* upd_all_database_sql =
"update %s set database_name = 'PUBLIC' where tenant_id = %ld and database_id = %ld;";
const static char* upd_all_database_history_sql =
"update %s set database_name = 'PUBLIC' where tenant_id = %ld and \
database_id = %ld and database_name = '__public';";
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(exec_tenant_id, compat_mode))) {
LOG_WARN("fail to get tenant mode", K(ret), K(exec_tenant_id));
}
if (OB_SUCC(ret) && lib::Worker::CompatMode::ORACLE == compat_mode) {
// update __all_database
if (OB_FAIL(sql.assign_fmt(upd_all_database_sql,
OB_ALL_DATABASE_TNAME,
tenant_id_in_sql,
db_id_in_sql))) {
LOG_WARN("fail to assign sql", K(ret));
} else if (OB_FAIL(sql_proxy_->write(exec_tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("failed to execute", K(ret), K(affected_rows), K(sql));
} else {
LOG_INFO("[UPGRADE] update __all_database",
KR(ret), K(exec_tenant_id), K(affected_rows), K(sql));
}
// update __all_database_history
if (OB_SUCC(ret)) {
sql.reset();
affected_rows = 0;
if (OB_FAIL(sql.assign_fmt(upd_all_database_history_sql,
OB_ALL_DATABASE_HISTORY_TNAME,
tenant_id_in_sql,
db_id_in_sql))) {
LOG_WARN("fail to assign sql", K(ret));
} else if (OB_FAIL(sql_proxy_->write(exec_tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("failed to execute", K(ret), K(affected_rows), K(sql));
} else {
LOG_INFO("[UPGRADE] update __all_database_history",
KR(ret), K(exec_tenant_id), K(affected_rows), K(sql));
}
}
}
}
LOG_INFO("[UPGRADE] finish modify_oracle_public_database_name",
KR(ret), K(tenant_id), "cost", ObTimeUtility::current_time() - start_ts);
return ret;
}
// bugfix: https://aone.alibaba-inc.com/risk/29845662
int ObUpgradeFor22070Processor::rebuild_subpart_partition_gc_info()
{
return OB_NOT_SUPPORTED;
}
/* =========== 22070 upgrade processor end ============= */
int ObUpgradeFor22075Processor::post_upgrade()
{
ObSchemaGetterGuard guard;
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
int ret = OB_SUCCESS;
const uint64_t tenant_id = get_tenant_id();
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_full_schema_guard(tenant_id, guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(tenant_id, compat_mode))) {
LOG_WARN("fail to get tenant mode", K(ret), K(tenant_id));
} else if (compat_mode == lib::Worker::CompatMode::ORACLE) {
ObSqlString sql;
int64_t affected_rows = 0;
OZ (sql.assign_fmt("grant create table, create type, create trigger, "
"create procedure, create sequence to resource"));
CK (sql_proxy_ != NULL);
OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows, ORACLE_MODE));
}
return ret;
}
int ObUpgradeFor22077Processor::post_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(create_inner_keystore_for_sys_tenant())) {
LOG_WARN("fail to create inner keystore", KR(ret));
} else if (OB_FAIL(alter_default_profile())) {
LOG_WARN("fail to alter default profile", K(ret));
}
return ret;
}
// TDE is forbidden in sys tenant, so there isn't an inner keystore. The master key of sys tenant
// is used in encrypted zone since 22077. TDE remains forbidden by add check in create tablespace
int ObUpgradeFor22077Processor::create_inner_keystore_for_sys_tenant()
{
int ret = OB_SUCCESS;
uint64_t tenant_id = get_tenant_id();
ObSchemaGetterGuard guard;
const ObKeystoreSchema *ks_schema = NULL;
LOG_INFO("create inner keystore for sys tenant start", K(tenant_id), K(ret));
if (OB_SYS_TENANT_ID != tenant_id) {
// do nothing
} else if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_full_schema_guard(tenant_id, guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(guard.get_keystore_schema(tenant_id, ks_schema))) {
LOG_WARN("fail to get keystore schema", KR(ret));
} else if (OB_NOT_NULL(ks_schema)) {
/*do nothing*/
} else {
obrpc::ObKeystoreDDLArg arg;
ObKeystoreSchema &keystore_schema = arg.schema_;
arg.exec_tenant_id_ = tenant_id;
arg.type_ = obrpc::ObKeystoreDDLArg::DDLType::CREATE_KEYSTORE;
int64_t keystore_id = OB_MYSQL_TENANT_INNER_KEYSTORE_ID;
keystore_schema.set_keystore_id(keystore_id);
keystore_schema.set_tenant_id(tenant_id);
keystore_schema.set_status(2);
keystore_schema.set_keystore_name("mysql_keystore");
if (OB_FAIL(common_proxy_->do_keystore_ddl(arg))) {
LOG_WARN("create keystore error", K(ret));
}
}
LOG_INFO("create inner keystore for sys tenant end", K(tenant_id), K(ret));
return ret;
}
int ObUpgradeFor22077Processor::alter_default_profile()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = get_tenant_id();
const char* ddl_string = "alter profile default limit FAILED_LOGIN_ATTEMPTS unlimited"
" PASSWORD_LOCK_TIME 1";
ObSchemaGetterGuard guard;
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_full_schema_guard(tenant_id, guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObCompatModeGetter::get_tenant_mode(tenant_id, compat_mode))) {
LOG_WARN("fail to get tenant mode", K(ret), K(tenant_id));
} else if (OB_ISNULL(sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql proxy is null", K(ret));
} else if (compat_mode == lib::Worker::CompatMode::ORACLE) {
int64_t affected_rows = 0;
LOG_INFO("alter_default_profile start", K(tenant_id), K(ret));
OZ (sql_proxy_->write(tenant_id, ddl_string, affected_rows, ObCompatibilityMode::ORACLE_MODE));
LOG_INFO("alter_default_profile end", K(tenant_id), K(ret));
}
return ret;
}
/* =========== 3100 upgrade processor start ============= */
int ObUpgradeFor3100Processor::pre_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
}
return ret;
}
int ObUpgradeFor3100Processor::post_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(revise_check_cst_schema())) {
LOG_WARN("fail to rebuild subpart partition gc info", KR(ret));
}
return ret;
}
// fill the info of check constraint columns into __all_tenant_constraint_column
// which is a new inner table from 3.1.0
int ObUpgradeFor3100Processor::revise_check_cst_schema()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = get_tenant_id();
ObSchemaGetterGuard schema_guard;
ObSArray<uint64_t> table_ids;
ObSArray<int64_t> check_cst_counts_in_each_tbl;
if (OB_FAIL(schema_service_->get_tenant_full_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(compute_check_cst_counts_in_each_table(
tenant_id, table_ids, check_cst_counts_in_each_tbl))) {
LOG_WARN("compute check cst count in each table failed",
K(ret), K(tenant_id), K(table_ids), K(check_cst_counts_in_each_tbl));
} else if (OB_FAIL(update_check_csts(
tenant_id, table_ids, check_cst_counts_in_each_tbl, schema_guard))) {
LOG_WARN("update check csts failed", K(ret), K(check_cst_counts_in_each_tbl));
}
return ret;
}
int ObUpgradeFor3100Processor::compute_check_cst_counts_in_each_table(
const uint64_t tenant_id,
ObSArray<uint64_t> &table_ids,
ObSArray<int64_t> &check_cst_counts_in_each_table)
{
int ret = OB_SUCCESS;
table_ids.reset();
check_cst_counts_in_each_table.reset();
uint64_t table_id = OB_INVALID_ID;
int64_t check_cst_count = 0;
ObSqlString sql;
const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
common::sqlclient::ObMySQLResult *result = NULL;
/*
sql is :
SELECT table_id, count(1) AS check_csts_count
FROM __all_constraint
WHERE tenant_id = XXX and constraint_type = 3
GROUP BY(table_id);
*/
if (OB_FAIL(sql.append_fmt("SELECT table_id, count(1) as check_csts_count FROM"))) {
LOG_WARN("failed to append sql", K(ret));
} else if (OB_FAIL(sql.append_fmt(
" %s WHERE tenant_id = %lu and CONSTRAINT_TYPE = %d GROUP BY(table_id)",
OB_ALL_CONSTRAINT_TNAME,
ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id),
CONSTRAINT_TYPE_CHECK))) {
// Why need to use tenant_id to filter while revising cst schemas ?
// Because old datas in inner table of sys tenant had't been removed after schema-split.
// https://work.aone.alibaba-inc.com/issue/35188973
LOG_WARN("failed to append sql for sys tenant",
K(ret), K(tenant_id), K(OB_ALL_CONSTRAINT_TNAME));
} else if (OB_FAIL(sql_proxy_->read(res, tenant_id, sql.ptr()))) {
LOG_WARN("failed to execute sql", K(ret), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get result", K(ret));
} else {
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
EXTRACT_INT_FIELD_MYSQL_WITH_TENANT_ID(*result, "table_id", table_id, tenant_id);
EXTRACT_INT_FIELD_MYSQL(*result, "check_csts_count", check_cst_count, int64_t);
if (OB_SUCC(ret)) {
if (OB_FAIL(table_ids.push_back(table_id))) {
LOG_WARN("push_back table_id into array failed", K(ret), K(table_id));
} else if (OB_FAIL(check_cst_counts_in_each_table.push_back(check_cst_count))) {
LOG_WARN("push_back check_cst_count into array failed", K(ret), K(check_cst_count));
}
}
}
if (OB_LIKELY(OB_ITER_END == ret)) {
ret = OB_SUCCESS;
} else if (OB_FAIL(ret)) {
LOG_WARN("fail to get sql result, iter quit", K(ret), K(sql));
} else if (OB_SUCC(ret)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ret should be OB_ITER_END", K(ret), K(sql));
}
}
}
return ret;
}
int ObUpgradeFor3100Processor::generate_constraint_schema(
obrpc::ObSchemaReviseArg &arg,
share::schema::ObSchemaGetterGuard &schema_guard,
ObIAllocator &allocator,
bool &is_need_to_revise)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = arg.tenant_id_;
uint64_t table_id = arg.table_id_;
ObRawExprFactory expr_factory(allocator);
ObRawExpr *expr = NULL;
ObSqlString sql;
is_need_to_revise = true; // for reentrant
SMART_VAR(sql::ObSQLSessionInfo, session) {
if (OB_FAIL(session.init(0 /*default session id*/,
0 /*default proxy id*/,
&allocator))) {
LOG_WARN("init session failed", K(ret));
} else if (OB_FAIL(session.load_default_sys_variable(false, false))) {
LOG_WARN("session load default system variable failed", K(ret));
} else {
ObResolverParams params;
params.expr_factory_ = &expr_factory;
params.allocator_ = &allocator;
params.session_info_ = &session;
const ObTableSchema *table_schema = NULL;
bool is_oracle_mode = false;
if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("get table schema failed", K(ret), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table_schema is null", K(ret));
} else if (OB_FAIL(table_schema->check_if_oracle_compat_mode(is_oracle_mode))) {
LOG_WARN("fail to check oracle comapt mode", KR(ret), KPC(table_schema));
} else {
lib::CompatModeGuard g(is_oracle_mode ?
lib::Worker::CompatMode::ORACLE :
lib::Worker::CompatMode::MYSQL);
for (ObTableSchema::const_constraint_iterator iter = table_schema->constraint_begin();
OB_SUCC(ret) && (iter != table_schema->constraint_end());
++iter) {
if (CONSTRAINT_TYPE_CHECK == (*iter)->get_constraint_type()) {
if ((*iter)->get_column_cnt() > 0) {
is_need_to_revise = false; // The csts in this table have been revised already.
break;
} else {
ObConstraint cst;
const ParseNode *node = NULL;
ObRawExpr *check_constraint_expr = NULL;
if (OB_FAIL(cst.assign(**iter))) {
LOG_WARN("fail to assign ObConstraint", K(ret));
} else if (OB_FAIL(ObRawExprUtils::parse_bool_expr_node_from_str(
cst.get_check_expr_str(), *params.allocator_, node))) {
LOG_WARN("parse expr node from string failed", K(ret), K(cst));
} else if (OB_FAIL(ObResolverUtils::resolve_check_constraint_expr(
params, node, *table_schema, cst, check_constraint_expr))) {
LOG_WARN("resolve check constraint expr", K(ret), K(cst));
} else if (0 == cst.get_column_cnt()) {
// no need to revise the cst if the const check expr likes '123 > 123'
continue;
} else if (OB_FAIL(arg.csts_array_.push_back(cst))) {
LOG_WARN("push back cst to csts failed", K(ret), K(cst));
}
}
}
}
if (OB_SUCC(ret) && 0 == arg.csts_array_.count()) {
// no need to revise if all the csts in this table are const check exprs
is_need_to_revise = false;
}
}
}
}
return ret;
}
// 对拥有 check 约束的表补充 check 约束的列信息
int ObUpgradeFor3100Processor::update_check_csts(
const uint64_t tenant_id,
ObSArray<uint64_t> &table_ids,
ObSArray<int64_t> &check_cst_counts_in_each_tbl,
share::schema::ObSchemaGetterGuard &schema_guard)
{
int ret = OB_SUCCESS;
ObArenaAllocator allocator("UpdateCheckCst");
const int64_t DEFAULT_TIMEOUT = 10 * 1000 * 1000L; // 10s
const int64_t TIMEOUT_PER_RPC = GCONF.rpc_timeout; // default rpc timeout is 2s
if (table_ids.count() != check_cst_counts_in_each_tbl.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the count of table_ids is not equal to the count of check_cst_counts_in_each_tbl",
K(ret), K(tenant_id), K(table_ids.count()), K(check_cst_counts_in_each_tbl.count()));
} else {
bool need_to_revise = true; // to guarantee reentrant
for (int64_t i = 0; OB_SUCC(ret) && i < table_ids.count(); ++i) {
obrpc::ObSchemaReviseArg arg;
need_to_revise = true;
arg.exec_tenant_id_ = tenant_id;
arg.tenant_id_ = tenant_id;
arg.table_id_ = table_ids.at(i);
arg.type_ = obrpc::ObSchemaReviseArg::SchemaReviseType::REVISE_CONSTRAINT_COLUMN_INFO;
arg.csts_array_.reset();
if (OB_FAIL(arg.csts_array_.reserve(check_cst_counts_in_each_tbl.at(i)))) {
LOG_WARN("reserve space for csts failed",
K(ret), K(table_ids.at(i)), K(check_cst_counts_in_each_tbl.at(i)));
} else if (OB_FAIL(generate_constraint_schema(arg, schema_guard, allocator, need_to_revise))) {
LOG_WARN("generate constraint schemas failed", K(ret), K(arg));
} else if (need_to_revise) {
if (arg.csts_array_.count() != check_cst_counts_in_each_tbl.at(i)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the count of csts in arg is not equal to check_cst_count_in_each_tbl",
K(ret), K(tenant_id), K(table_ids.at(i)),
K(arg.csts_array_.count()), K(check_cst_counts_in_each_tbl.at(i)));
} else if (OB_ISNULL(rpc_proxy_)) {
ret = OB_NOT_INIT;
LOG_WARN("rpc proxy is not inited");
} else if (OB_FAIL(rpc_proxy_->timeout(max(DEFAULT_TIMEOUT, TIMEOUT_PER_RPC)).schema_revise(arg))) {
LOG_WARN("schema revise failed", K(ret), K(arg));
}
}
}
}
return ret;
}
/* =========== 3100 upgrade processor end ============= */
/* =========== 3102 upgrade processor start ============= */
int ObUpgradeFor3102Processor::pre_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
}
return ret;
}
int ObUpgradeFor3102Processor::post_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(revise_not_null_cst_schema())) {
LOG_ERROR("fail to revise not null cst schema", KR(ret), K(get_tenant_id()));
}
return ret;
}
int ObUpgradeFor3102Processor::revise_not_null_cst_schema()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = get_tenant_id();
ObSchemaGetterGuard schema_guard;
ObSArray<uint64_t> table_ids;
oceanbase::lib::Worker::CompatMode compat_mode;
if (OB_FAIL(schema_service_->get_tenant_full_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_tenant_compat_mode(tenant_id, compat_mode))) {
LOG_WARN("get tenant compat mode failed", K(ret), K(tenant_id));
} else if (lib::Worker::CompatMode::MYSQL == compat_mode) {
// do nothing for mysql tenant
} else if (OB_FAIL(get_all_table_with_not_null_column(tenant_id, schema_guard, table_ids))) {
LOG_WARN("get not null column ids of each table failed",
K(ret), K(tenant_id), K(table_ids));
} else {
const int64_t DEFAULT_TIMEOUT = 10 * 1000 * 1000L; // 10s
const int64_t TIMEOUT_PER_RPC = GCONF.rpc_timeout; // default rpc timeout is 2s
for (int64_t i = 0; OB_SUCC(ret) && i < table_ids.count(); ++i) {
obrpc::ObSchemaReviseArg arg;
arg.exec_tenant_id_ = tenant_id;
arg.tenant_id_ = tenant_id;
arg.table_id_ = table_ids.at(i);
arg.type_ = obrpc::ObSchemaReviseArg::SchemaReviseType::REVISE_NOT_NULL_CONSTRAINT;
if (OB_FAIL(rpc_proxy_->timeout(max(DEFAULT_TIMEOUT, TIMEOUT_PER_RPC)).schema_revise(arg))) {
LOG_WARN("schema revise failed", K(ret), K(arg));
}
}
}
return ret;
}
int ObUpgradeFor3102Processor::get_all_table_with_not_null_column(
const uint64_t tenant_id,
ObSchemaGetterGuard &schema_guard,
ObSArray<uint64_t> &table_ids)
{
return OB_NOT_SUPPORTED;
}
/* =========== 3102 upgrade processor end ============= */
/* =========== 3200 upgrade processor start ============= */
int ObUpgradeFor3200Processor::pre_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
}
return ret;
}
int ObUpgradeFor3200Processor::post_upgrade()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = get_tenant_id();
oceanbase::lib::Worker::CompatMode compat_mode;
ObSchemaGetterGuard schema_guard;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_full_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_tenant_compat_mode(tenant_id, compat_mode))) {
LOG_WARN("get tenant compat mode failed", K(ret), K(tenant_id));
} else if (OB_FAIL(grant_directory_privilege_for_dba_role(compat_mode, tenant_id))) {
LOG_WARN("fail to grant directory privilege for dba role", K(ret), K(compat_mode), K(tenant_id));
}
return ret;
}
int ObUpgradeFor3200Processor::grant_directory_privilege_for_dba_role(
const lib::Worker::CompatMode compat_mode,
const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (lib::Worker::CompatMode::ORACLE == compat_mode) {
// only grant privilege under oracle mode
ObSqlString sql;
int64_t affected_rows = 0;
OZ (sql.assign_fmt("grant create any directory, drop any directory to dba"));
CK (sql_proxy_ != NULL);
OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows, ORACLE_MODE));
}
return ret;
}
/* =========== 3200 upgrade processor end ============= */
/* =========== 3201 upgrade processor start ============= */
int ObUpgradeFor3201Processor::pre_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
}
return ret;
}
int ObUpgradeFor3201Processor::post_upgrade()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = get_tenant_id();
oceanbase::lib::Worker::CompatMode compat_mode;
ObSchemaGetterGuard schema_guard;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_full_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_tenant_compat_mode(tenant_id, compat_mode))) {
LOG_WARN("get tenant compat mode failed", K(ret), K(tenant_id));
} else if (OB_FAIL(init_tenant_optstat_global_prefs(compat_mode, tenant_id))) {
LOG_WARN("failed to init tenant optstat global prefs", K(ret), K(compat_mode), K(tenant_id));
}
return ret;
}
int ObUpgradeFor3201Processor::init_tenant_optstat_global_prefs(
const lib::Worker::CompatMode compat_mode,
const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (lib::Worker::CompatMode::ORACLE == compat_mode) {
ObSqlString raw_sql;
int64_t affected_rows = 0;
if (OB_FAIL(ObDbmsStatsPreferences::gen_init_global_prefs_sql(raw_sql))) {
LOG_WARN("failed gen init global prefs sql", K(ret), K(raw_sql));
} else if (OB_UNLIKELY(raw_sql.empty()) || OB_ISNULL(sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(ret), K(raw_sql), K(sql_proxy_));
} else if (OB_FAIL(sql_proxy_->write(tenant_id, raw_sql.ptr(), affected_rows))) {
LOG_WARN("failed to exec sql", K(ret), K(raw_sql));
}
}
return ret;
}
/* =========== 3201 upgrade processor end ============= */
/* =========== 4000 upgrade processor start ============= */
int ObUpgradeFor4000Processor::pre_upgrade()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
}
return ret;
}
int ObUpgradeFor4000Processor::post_upgrade()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = get_tenant_id();
oceanbase::lib::Worker::CompatMode compat_mode;
ObSchemaGetterGuard schema_guard;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_full_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_tenant_compat_mode(tenant_id, compat_mode))) {
LOG_WARN("get tenant compat mode failed", K(ret), K(tenant_id));
} else if (OB_FAIL(grant_debug_privilege_for_dba_role(compat_mode, tenant_id))) {
LOG_WARN("fail to grant directory privilege for dba role", K(ret), K(compat_mode), K(tenant_id));
} else if (OB_FAIL(grant_context_privilege_for_dba_role(compat_mode, tenant_id))) {
LOG_WARN("fail to grant context privilege for dba role", K(ret), K(compat_mode), K(tenant_id));
}
return ret;
}
int ObUpgradeFor4000Processor::grant_debug_privilege_for_dba_role(
const lib::Worker::CompatMode compat_mode,
const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (lib::Worker::CompatMode::ORACLE == compat_mode) {
// only grant privilege under oracle mode
ObSqlString sql;
int64_t affected_rows = 0;
OZ (sql.assign_fmt("grant debug connect session, debug any procedure to dba"));
CK (sql_proxy_ != NULL);
OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows, ORACLE_MODE));
}
return ret;
}
int ObUpgradeFor4000Processor::grant_context_privilege_for_dba_role(
const lib::Worker::CompatMode compat_mode,
const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (lib::Worker::CompatMode::ORACLE == compat_mode) {
// only grant privilege under oracle mode
ObSqlString sql;
int64_t affected_rows = 0;
OZ (sql.assign_fmt("grant create any context, drop any context to dba"));
CK (sql_proxy_ != NULL);
OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows, ORACLE_MODE));
}
return ret;
}
/* =========== 4000 upgrade processor end ============= */
/* =========== upgrade processor end ============= */
/* =========== special upgrade processor end ============= */
} // end share
} // end oceanbase

View File

@ -71,7 +71,7 @@ private:
/* =========== upgrade processor ============= */
// Special upgrade actions for specific cluster version,
// Special upgrade actions for specific data version,
// which should be stateless and reentrant.
class ObBaseUpgradeProcessor
{
@ -85,24 +85,24 @@ public:
ObBaseUpgradeProcessor();
virtual ~ObBaseUpgradeProcessor() {};
public:
int init(int64_t cluster_version,
int init(int64_t data_version,
UpgradeMode mode,
common::ObMySQLProxy &sql_proxy,
obrpc::ObSrvRpcProxy &rpc_proxy,
obrpc::ObCommonRpcProxy &common_proxy,
share::schema::ObMultiVersionSchemaService &schema_service,
share::ObCheckStopProvider &check_server_provider);
int64_t get_version() const { return cluster_version_; }
int64_t get_version() const { return data_version_; }
void set_tenant_id(const uint64_t tenant_id) { tenant_id_ = tenant_id; }
int64_t get_tenant_id() const { return tenant_id_; }
virtual int pre_upgrade() = 0;
virtual int post_upgrade() = 0;
TO_STRING_KV(K_(inited), K_(cluster_version), K_(tenant_id), K_(mode));
TO_STRING_KV(K_(inited), K_(data_version), K_(tenant_id), K_(mode));
protected:
virtual int check_inner_stat() const;
protected:
bool inited_;
int64_t cluster_version_;
int64_t data_version_;
uint64_t tenant_id_;
UpgradeMode mode_;
common::ObMySQLProxy *sql_proxy_;
@ -156,157 +156,23 @@ public: \
};
/*
* NOTE: The Following code should be modified when CLUSTER_CURRENT_VERSION changed.
* 1. ObUpgradeChecker: CLUTER_VERSION_NUM, UPGRADE_PATH
* 2. Implement new ObUpgradeProcessor by cluster version.
* NOTE: The Following code should be modified when DATA_CURRENT_VERSION changed.
* 1. ObUpgradeChecker: DATA_VERSION_NUM, UPGRADE_PATH
* 2. Implement new ObUpgradeProcessor by data_version.
* 3. Modify int ObUpgradeProcesserSet::init().
*/
class ObUpgradeChecker
{
public:
static bool check_cluster_version_exist(const uint64_t version);
static bool check_data_version_exist(const uint64_t version);
public:
static const int64_t CLUTER_VERSION_NUM = 40;
static const uint64_t UPGRADE_PATH[CLUTER_VERSION_NUM];
static const int64_t DATA_VERSION_NUM = 1;
static const uint64_t UPGRADE_PATH[DATA_VERSION_NUM];
};
// 2.2.60
DEF_SIMPLE_UPGRARD_PROCESSER(2, 2, 0, 60);
// 2.2.70
class ObUpgradeFor22070Processor : public ObBaseUpgradeProcessor
{
public:
ObUpgradeFor22070Processor() : ObBaseUpgradeProcessor() {}
virtual ~ObUpgradeFor22070Processor() {}
virtual int pre_upgrade() override;
virtual int post_upgrade() override;
private:
int modify_trigger_package_source_body();
int modify_oracle_public_database_name();
int rebuild_subpart_partition_gc_info();
};
// 2.2.71
DEF_SIMPLE_UPGRARD_PROCESSER(2, 2, 0, 71);
// 2.2.72
DEF_SIMPLE_UPGRARD_PROCESSER(2, 2, 0, 72);
// 2.2.73
DEF_SIMPLE_UPGRARD_PROCESSER(2, 2, 0, 73);
// 2.2.74
DEF_SIMPLE_UPGRARD_PROCESSER(2, 2, 0, 74);
class ObUpgradeFor22075Processor : public ObBaseUpgradeProcessor
{
public:
ObUpgradeFor22075Processor() : ObBaseUpgradeProcessor() {}
virtual ~ObUpgradeFor22075Processor() {}
virtual int pre_upgrade() override { return common::OB_SUCCESS; }
virtual int post_upgrade() override;
};
// 2.2.76
DEF_SIMPLE_UPGRARD_PROCESSER(2, 2, 0, 76);
// 2.2.77
class ObUpgradeFor22077Processor : public ObBaseUpgradeProcessor
{
public:
ObUpgradeFor22077Processor() : ObBaseUpgradeProcessor() {}
virtual ~ObUpgradeFor22077Processor() {}
virtual int pre_upgrade() override { return common::OB_SUCCESS; };
virtual int post_upgrade() override;
private:
int create_inner_keystore_for_sys_tenant();
int alter_default_profile();
};
// 3.1.0
class ObUpgradeFor3100Processor : public ObBaseUpgradeProcessor
{
public:
ObUpgradeFor3100Processor() : ObBaseUpgradeProcessor() {}
virtual ~ObUpgradeFor3100Processor() {}
virtual int pre_upgrade() override;
virtual int post_upgrade() override;
private:
int compute_check_cst_counts_in_each_table(
const uint64_t tenant_id,
ObSArray<uint64_t> &table_ids,
ObSArray<int64_t> &check_cst_counts_in_each_table);
int generate_constraint_schema(
obrpc::ObSchemaReviseArg &arg,
share::schema::ObSchemaGetterGuard &schema_guard,
ObIAllocator &allocator,
bool &is_need_to_revise);
int update_check_csts(
const uint64_t tenant_id,
ObSArray<uint64_t> &table_ids,
ObSArray<int64_t> &check_cst_counts_in_each_tbl,
share::schema::ObSchemaGetterGuard &schema_guard);
int revise_check_cst_schema();
};
DEF_SIMPLE_UPGRARD_PROCESSER(3, 1, 0, 1);
// 3.1.2
class ObUpgradeFor3102Processor : public ObBaseUpgradeProcessor
{
public:
ObUpgradeFor3102Processor() : ObBaseUpgradeProcessor() {}
virtual ~ObUpgradeFor3102Processor() {}
virtual int pre_upgrade() override;
virtual int post_upgrade() override;
private:
int revise_not_null_cst_schema();
int get_all_table_with_not_null_column(
const uint64_t tenant_id,
schema::ObSchemaGetterGuard &schema_guard,
ObSArray<uint64_t> &table_ids);
};
// 3.2.0
class ObUpgradeFor3200Processor : public ObBaseUpgradeProcessor
{
public:
ObUpgradeFor3200Processor() : ObBaseUpgradeProcessor() {}
virtual ~ObUpgradeFor3200Processor() {}
virtual int pre_upgrade() override;
virtual int post_upgrade() override;
private:
int grant_directory_privilege_for_dba_role(
const lib::Worker::CompatMode compat_mode,
const uint64_t tenant_id);
};
// 3.2.1
class ObUpgradeFor3201Processor : public ObBaseUpgradeProcessor
{
public:
ObUpgradeFor3201Processor() : ObBaseUpgradeProcessor() {}
virtual ~ObUpgradeFor3201Processor() {}
virtual int pre_upgrade() override;
virtual int post_upgrade() override;
private:
int init_tenant_optstat_global_prefs(
const lib::Worker::CompatMode compat_mode,
const uint64_t tenant_id);
};
DEF_SIMPLE_UPGRARD_PROCESSER(3, 2, 0, 2);
DEF_SIMPLE_UPGRARD_PROCESSER(3, 2, 3, 0);
// 4.0.0.0
class ObUpgradeFor4000Processor : public ObBaseUpgradeProcessor
{
public:
ObUpgradeFor4000Processor() : ObBaseUpgradeProcessor() {}
virtual ~ObUpgradeFor4000Processor() {}
virtual int pre_upgrade() override;
virtual int post_upgrade() override;
private:
int grant_debug_privilege_for_dba_role(
const lib::Worker::CompatMode compat_mode,
const uint64_t tenant_id);
int grant_context_privilege_for_dba_role(
const lib::Worker::CompatMode compat_mode,
const uint64_t tenant_id);
};
/* =========== special upgrade processor start ============= */
DEF_SIMPLE_UPGRARD_PROCESSER(4, 1, 0, 0)
/* =========== special upgrade processor end ============= */
/* =========== upgrade processor end ============= */

View File

@ -457,7 +457,9 @@ DEF_TIME(tablet_meta_table_check_interval, OB_CLUSTER_PARAMETER, "30m", "[1m,)",
"the time interval that observer compares tablet meta table with local ls replica info "
"and make adjustments to ensure the correctness of tablet meta table. Range: [1m,+∞)",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_STR(min_observer_version, OB_CLUSTER_PARAMETER, "4.0.0.0", "the min observer version",
DEF_STR(min_observer_version, OB_CLUSTER_PARAMETER, "4.1.0.0", "the min observer version",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_STR(compatible, OB_TENANT_PARAMETER, "4.1.0.0", "compatible version for persisted data",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(enable_ddl, OB_CLUSTER_PARAMETER, "True", "specifies whether DDL operation is turned on. "
"Value: True:turned on; False: turned off",

View File

@ -16,9 +16,9 @@
#include <string.h>
#include "lib/oblog/ob_log.h"
#include "objit/common/ob_item_type.h"
#include "sql/session/ob_sql_session_info.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/session/ob_sql_session_info.h"
#include "sql/engine/ob_exec_context.h"
using namespace oceanbase::common;
namespace oceanbase
@ -117,9 +117,9 @@ int ObExprToOutfileRow::calc_outfile_info(const ObExpr &expr,
{
int ret = OB_SUCCESS;
ObObj objs_array[PARAM_SELECT_ITEM];
ObSQLSessionInfo *session = ctx.exec_ctx_.get_my_session();
if (OB_ISNULL(session)) {
ret = OB_ERR_UNEXPECTED;
ObSQLSessionInfo *session = ctx.exec_ctx_.get_my_session();
if (OB_ISNULL(session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", K(ret));
} else if (OB_ISNULL(out_info.print_params_.tz_info_ = session->get_timezone_info())) {
ret = OB_ERR_UNEXPECTED;
@ -133,9 +133,9 @@ int ObExprToOutfileRow::calc_outfile_info(const ObExpr &expr,
for (int i = 0; OB_SUCC(ret) && i < PARAM_SELECT_ITEM; ++i) {
OZ(expr.locate_param_datum(ctx, i).to_obj(objs_array[i], expr.args_[i]->obj_meta_,
expr.args_[i]->obj_datum_map_));
}
if (OB_SUCC(ret)) {
out_info.field_ = objs_array[PARAM_FIELD];
}
if (OB_SUCC(ret)) {
out_info.field_ = objs_array[PARAM_FIELD];
out_info.line_ = objs_array[PARAM_LINE];
out_info.enclose_ = objs_array[PARAM_ENCLOSED];
out_info.escape_ = objs_array[PARAM_ESCAPED];

View File

@ -40,6 +40,7 @@ cluster_id
compaction_high_thread_score
compaction_low_thread_score
compaction_mid_thread_score
compatible
config_additional_dir
connection_control_failed_connections_threshold
connection_control_max_connection_delay

View File

@ -0,0 +1,30 @@
# 描述oceanbase各个版本升级依赖关系
# 参考https://yuque.antfin-inc.com/ob/release/ptw5y7
# 对于每一个正式发布的ob版本,在下面的yaml文档中添加一项,包括如下属性:
# * version: 待升级的版本,或者升级过程中经过的版本;一般用A.B.C的版本号,如果是A.B.C-D的形式,表示build号必须大于等于D
# * can_be_upgraded_to: 表示该版本经过OB QA兼容性测试,可以*直接*升级到的ob版本号,是一个列表可以包括多个版本
# * deprecated: 缺省为False。如果为True,表示这个版本已经废除。可以作
# 为升级的起点,可以作为升级过度版本,但是不能作为升级目标版本。
# * require_from_binary: 缺省为False。如果为True,表示升级过程中如果作为中间版本,除了运行upgrade脚本,还需要把observer也升级到该版本
#
# OCP的OB升级模块会读取本文件,给定升级的起始版本和目标版本,自动在满
# 足上述约束的情况下寻找一个升级的最短路径。基本算法是:
# 基于如下描述构建一个图,每个版本一个结点,can_be_upgraded_to关系定义
# 一条边,寻找起始版本和升级目标两个版本之间的最短路径。除了起始点,其
# 他节点不能是deprecated。如果有A.B.C和A.B.C-D两种节点匹配,优先选择后
# 者。
#
- version: 4.0.0.0
can_be_upgraded_to:
- 4.1.0.0
require_from_binary:
value: True
when_come_from: [4.0.0.0]
- version: 4.1.0.0
require_from_binary:
value: True
when_come_from: [4.0.0.0, 4.1.0.0]

View File

@ -12,7 +12,7 @@ import time
class UpgradeParams:
log_filename = 'upgrade_post_checker.log'
new_version = '4.0.0.0'
new_version = '4.1.0.0'
#### --------------start : my_error.py --------------
class MyError(Exception):
def __init__(self, value):