[CP] [RECYCLE_SCHEMA_HISTORY] Support recycling object privilege history

This commit is contained in:
obdev 2022-09-19 08:02:30 +00:00 committed by wangzelin.wzl
parent 46a5748aba
commit fbabe9d0f1
2 changed files with 491 additions and 98 deletions

View File

@ -802,118 +802,184 @@ int ObSchemaHistoryRecycler::try_recycle_schema_history(const uint64_t tenant_id
int ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
LOG_WARN("not inited", KR(ret));
} else if (OB_INVALID_TENANT_ID == tenant_id || OB_SYS_TENANT_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", K(ret), K(tenant_id));
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else if (!is_valid_recycle_schema_version(recycle_schema_version)) {
ret = OB_EAGAIN;
LOG_WARN("fail to get recycle schema version", K(ret), K(tenant_id), K(recycle_schema_version));
LOG_WARN("fail to get recycle schema version", KR(ret), K(tenant_id), K(recycle_schema_version));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("schema history recycler is stopped", K(ret));
LOG_WARN("schema history recycler is stopped", KR(ret));
} else {
int64_t start_ts = ObTimeUtility::current_time();
LOG_INFO(
"[SCHEMA_RECYCLE] recycle schema history by tenant start", K(ret), K(tenant_id), K(recycle_schema_version));
#define RECYCLE_FIRST_SCHEMA(RECYCLE_MODE, SCHEMA_TYPE, TNAME, KEY_NAME) \
if (OB_SUCC(ret)) { \
ObRecycleSchemaExecutor::RecycleMode mode = ObRecycleSchemaExecutor::RECYCLE_MODE; \
ObRecycleSchemaExecutor executor(tenant_id, recycle_schema_version, TNAME, #KEY_NAME, mode, sql_proxy_, this); \
if (OB_FAIL(executor.execute())) { \
LOG_WARN( \
"fail to recycle schema history", K(ret), "type", #SCHEMA_TYPE, K(tenant_id), K(recycle_schema_version)); \
} \
"[SCHEMA_RECYCLE] recycle schema history by tenant start", KR(ret), K(tenant_id), K(recycle_schema_version));
#define RECYCLE_FIRST_SCHEMA(RECYCLE_MODE, SCHEMA_TYPE, TNAME, KEY_NAME) \
if (OB_SUCC(ret)) { \
ObRecycleSchemaExecutor::RecycleMode mode = ObRecycleSchemaExecutor::RECYCLE_MODE; \
ObRecycleSchemaExecutor executor(tenant_id, recycle_schema_version, TNAME, #KEY_NAME, mode, sql_proxy_, this); \
if (OB_FAIL(executor.execute())) { \
LOG_WARN( \
"fail to recycle schema history", KR(ret), "type", #SCHEMA_TYPE, K(tenant_id), K(recycle_schema_version)); \
} \
}
#define RECYCLE_SECOND_SCHEMA(SCHEMA_TYPE, TNAME, KEY_NAME, SECOND_KEY_NAME) \
if (OB_SUCC(ret)) { \
ObSecondRecycleSchemaExecutor executor( \
tenant_id, recycle_schema_version, TNAME, #KEY_NAME, #SECOND_KEY_NAME, sql_proxy_, this); \
if (OB_FAIL(executor.execute())) { \
LOG_WARN( \
"fail to recycle schema history", K(ret), "type", #SCHEMA_TYPE, K(tenant_id), K(recycle_schema_version)); \
} \
#define RECYCLE_SECOND_SCHEMA(SCHEMA_TYPE, TNAME, KEY_NAME, SECOND_KEY_NAME) \
if (OB_SUCC(ret)) { \
ObSecondRecycleSchemaExecutor executor( \
tenant_id, recycle_schema_version, TNAME, #KEY_NAME, #SECOND_KEY_NAME, sql_proxy_, this); \
if (OB_FAIL(executor.execute())) { \
LOG_WARN( \
"fail to recycle schema history", KR(ret), "type", #SCHEMA_TYPE, K(tenant_id), K(recycle_schema_version)); \
} \
}
#define RECYCLE_THIRD_SCHEMA(SCHEMA_TYPE, TNAME, KEY_NAME, SECOND_KEY_NAME, THIRD_KEY_NAME) \
if (OB_SUCC(ret)) { \
ObThirdRecycleSchemaExecutor executor( \
tenant_id, recycle_schema_version, TNAME, #KEY_NAME, #SECOND_KEY_NAME, #THIRD_KEY_NAME, sql_proxy_, this); \
if (OB_FAIL(executor.execute())) { \
LOG_WARN( \
"fail to recycle schema history", K(ret), "type", #SCHEMA_TYPE, K(tenant_id), K(recycle_schema_version)); \
} \
#define RECYCLE_THIRD_SCHEMA(SCHEMA_TYPE, TNAME, KEY_NAME, SECOND_KEY_NAME, THIRD_KEY_NAME) \
if (OB_SUCC(ret)) { \
ObThirdRecycleSchemaExecutor executor( \
tenant_id, recycle_schema_version, TNAME, #KEY_NAME, #SECOND_KEY_NAME, #THIRD_KEY_NAME, sql_proxy_, this); \
if (OB_FAIL(executor.execute())) { \
LOG_WARN( \
"fail to recycle schema history", KR(ret), "type", #SCHEMA_TYPE, K(tenant_id), K(recycle_schema_version)); \
} \
}
// TODO:() implement later
// 1.table_priv, databse_priv, udf
// 2.tenant
// 3.__all_ddl_operation
// 4.sys/obj priv
// TODO:(yanmu.ztl) implement later
// 1.tenant
// 2.__all_ddl_operation
// 3.table_priv/databse_priv
// 4.udf
// 5.audit
// 6.dblink
// 7.user-role
// 8.PL
// 9.ols
// table related
// ---------------------------- table related ------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_ONLY, table, OB_ALL_TABLE_V2_HISTORY_TNAME, table_id);
RECYCLE_SECOND_SCHEMA(column, OB_ALL_COLUMN_HISTORY_TNAME, table_id, column_id);
RECYCLE_FIRST_SCHEMA(RECYCLE_ONLY, part_info, OB_ALL_PART_INFO_HISTORY_TNAME, table_id);
RECYCLE_SECOND_SCHEMA(part, OB_ALL_PART_HISTORY_TNAME, table_id, part_id);
RECYCLE_SECOND_SCHEMA(def_sub_part, OB_ALL_DEF_SUB_PART_HISTORY_TNAME, table_id, sub_part_id);
RECYCLE_THIRD_SCHEMA(sub_part, OB_ALL_SUB_PART_HISTORY_TNAME, table_id, part_id, sub_part_id);
RECYCLE_SECOND_SCHEMA(constraint, OB_ALL_CONSTRAINT_HISTORY_TNAME, table_id, constraint_id);
RECYCLE_FIRST_SCHEMA(RECYCLE_ONLY, foreign_key, OB_ALL_FOREIGN_KEY_HISTORY_TNAME, foreign_key_id);
RECYCLE_THIRD_SCHEMA(
foreign_key, OB_ALL_FOREIGN_KEY_COLUMN_HISTORY_TNAME, foreign_key_id, child_column_id, parent_column_id);
ret = OB_SUCCESS; // overwrite ret
// system variable
ObSystemVariableRecycleSchemaExecutor executor(
tenant_id, recycle_schema_version, OB_ALL_SYS_VARIABLE_HISTORY_TNAME, sql_proxy_, this);
if (OB_FAIL(executor.execute())) { // overwrite ret
LOG_WARN(
"fail to recycle schema history", K(ret), "type", "system_variable", K(tenant_id), K(recycle_schema_version));
}
// tablegroup
RECYCLE_FIRST_SCHEMA(RECYCLE_ONLY, tablegroup, OB_ALL_TABLEGROUP_HISTORY_TNAME, tablegroup_id);
ret = OB_SUCCESS; // overwrite ret
// database
// ----------------------------- database ----------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, database, OB_ALL_DATABASE_HISTORY_TNAME, database_id);
ret = OB_SUCCESS; // overwrite ret
// user
// ----------------------------- tablegroup --------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_ONLY, tablegroup, OB_ALL_TABLEGROUP_HISTORY_TNAME, tablegroup_id);
// tablegroup's partition will be recycled as table's partition
ret = OB_SUCCESS; // overwrite ret
// ----------------------------- user/role ---------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, user, OB_ALL_USER_HISTORY_TNAME, user_id);
// TODO: should be tested
// RECYCLE_SECOND_SCHEMA(role_grantee, OB_ALL_TENANT_ROLE_GRANTEE_MAP_HISTORY_TNAME, grantee_id, role_id);
ret = OB_SUCCESS; // overwrite ret
// synoym
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, synoym, OB_ALL_SYNONYM_HISTORY_TNAME, synonym_id);
ret = OB_SUCCESS; // overwrite ret
// outline
// ---------------------------- outline ------------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, outline, OB_ALL_OUTLINE_HISTORY_TNAME, outline_id);
ret = OB_SUCCESS; // overwrite ret
// plan_baseline
// ---------------------------- synonym ------------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, synonym, OB_ALL_SYNONYM_HISTORY_TNAME, synonym_id);
ret = OB_SUCCESS; // overwrite ret
// ---------------------------- plan_baseline -----------------------------------
RECYCLE_FIRST_SCHEMA(
RECYCLE_AND_COMPRESS, plan_baseline, OB_ALL_TENANT_PLAN_BASELINE_HISTORY_TNAME, plan_baseline_id);
ret = OB_SUCCESS; // overwrite ret
// sequence
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, sequence, OB_ALL_SEQUENCE_OBJECT_HISTORY_TNAME, sequence_id);
ret = OB_SUCCESS; // overwrite ret
// profile
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, profile, OB_ALL_TENANT_PROFILE_HISTORY_TNAME, profile_id);
ret = OB_SUCCESS; // overwrite ret
/* TODO (): recycle schema history of package, routine, udt and label should be supported after test.
// package
// ---------------------------- package/routine ---------------------------------
/* TODO(yanmu.ztl): package、routine、udt should be tested.
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, package, OB_ALL_PACKAGE_HISTORY_TNAME, package_id);
ret = OB_SUCCESS; // overwrite ret
// routine
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, routine, OB_ALL_ROUTINE_HISTORY_TNAME, routine_id);
RECYCLE_SECOND_SCHEMA(routine, OB_ALL_ROUTINE_PARAM_HISTORY_TNAME, routine_id, sequence);
ret = OB_SUCCESS; // overwrite ret
// udt related
// --------------------------- udt related -------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, udt, OB_ALL_TYPE_HISTORY_TNAME, type_id);
RECYCLE_SECOND_SCHEMA(udt, OB_ALL_TYPE_ATTR_HISTORY_TNAME, type_id, attribute);
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, udt, OB_ALL_COLL_TYPE_HISTORY_TNAME, coll_type_id);
// TODO: () udt has additional schema(object_type) to be recycled after ver 3.x.
RECYCLE_SECOND_SCHEMA(udt, OB_ALL_TENANT_OBJECT_TYPE_TNAME, object_type_id, type);
ret = OB_SUCCESS; // overwrite ret
*/
// -------------------------- sequence -----------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, sequence, OB_ALL_SEQUENCE_OBJECT_HISTORY_TNAME, sequence_id);
ret = OB_SUCCESS; // overwrite ret
// -------------------------- system variable ----------------------------------
// global variable only supports system variable,
// so system variable schema history is only compressed but not recycled.
{
ObSystemVariableRecycleSchemaExecutor executor(
tenant_id, recycle_schema_version, OB_ALL_SYS_VARIABLE_HISTORY_TNAME, sql_proxy_, this);
if (OB_FAIL(executor.execute())) { // overwrite ret
LOG_WARN("fail to recycle schema history",
KR(ret),
"type",
"system_variable",
K(tenant_id),
K(recycle_schema_version));
}
ret = OB_SUCCESS;
}
// keystore can't be recycled
// RECYCLE_FIRST_SCHEMA(NONE, keystore, OB_ALL_TENANT_KEYSTORE_HISTORY_TNAME, keystore_id);
// ---------------------------- ols ----------------------------------------------------
/*
// TODO(yanmu.ztl): should be tested
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, label_se_policy, OB_ALL_TENANT_OLS_POLICY_HISTORY_TNAME,
label_se_policy_id);
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, label_se_component, OB_ALL_TENANT_OLS_COMPONENT_HISTORY_TNAME,
label_se_component_id);
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, label_se_label, OB_ALL_TENANT_OLS_LABEL_HISTORY_TNAME,
label_se_label_id);
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, label_se_user_level, OB_ALL_TENANT_OLS_USER_LEVEL_HISTORY_TNAME,
label_se_user_level_id);
ret = OB_SUCCESS; // overwrite ret
*/
// ---------------------------- tablespace --------------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, tablespace, OB_ALL_TENANT_TABLESPACE_HISTORY_TNAME, tablespace_id);
ret = OB_SUCCESS; // overwrite ret
// ---------------------------- trigger ----------------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, trigger, OB_ALL_TENANT_TRIGGER_HISTORY_TNAME, trigger_id);
ret = OB_SUCCESS; // overwrite ret
// --------------------------- profile -----------------------------------------------
RECYCLE_FIRST_SCHEMA(RECYCLE_AND_COMPRESS, profile, OB_ALL_TENANT_PROFILE_HISTORY_TNAME, profile_id);
ret = OB_SUCCESS; // overwrite ret
// -------------------------- object priv --------------------------------------------
// (RECYCLE_AND_COMPRESS)
// 1. sys object priv history won't be recycle.
// 2. user object priv history will be recycled and compressed.
{
ObObjectPrivRecycleSchemaExecutor executor(
tenant_id, recycle_schema_version, OB_ALL_TENANT_OBJAUTH_HISTORY_TNAME, sql_proxy_, this);
if (OB_FAIL(executor.execute())) { // overwrite ret
LOG_WARN(
"fail to recycle schema history", KR(ret), "type", "object_priv", K(tenant_id), K(recycle_schema_version));
}
ret = OB_SUCCESS;
}
#undef RECYCLE_FIRST_SCHEMA
int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
ROOTSERVICE_EVENT_ADD("schema_recycler",
@ -925,7 +991,7 @@ int ObSchemaHistoryRecycler::try_recycle_schema_history(const uint64_t tenant_id
"cost",
cost_ts);
LOG_INFO("[SCHEMA_RECYCLE] recycle schema history by tenant end",
K(ret),
KR(ret),
K(tenant_id),
K(recycle_schema_version),
K(cost_ts));
@ -1391,6 +1457,7 @@ DEFINE_FILL_SCHEMA_HISTORY_MAP(ObRecycleSchemaExecutor, ObFirstSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_MAP(ObSecondRecycleSchemaExecutor, ObSecondSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_MAP(ObThirdRecycleSchemaExecutor, ObThirdSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_MAP(ObSystemVariableRecycleSchemaExecutor, ObSystemVariableSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_MAP(ObObjectPrivRecycleSchemaExecutor, ObObjectPrivSchemaKey);
#undef DEFINE_FILL_SCHEMA_HISTORY_MAP
#define DEFINE_FILL_SCHEMA_HISTORY_KEY(EXECUTOR, KEY) \
@ -1403,6 +1470,7 @@ DEFINE_FILL_SCHEMA_HISTORY_MAP(ObSystemVariableRecycleSchemaExecutor, ObSystemVa
DEFINE_FILL_SCHEMA_HISTORY_KEY(ObRecycleSchemaExecutor, ObFirstSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_KEY(ObSecondRecycleSchemaExecutor, ObSecondSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_KEY(ObThirdRecycleSchemaExecutor, ObThirdSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_KEY(ObObjectPrivRecycleSchemaExecutor, ObObjectPrivSchemaKey);
#undef DEFINE_FILL_SCHEMA_HISTORY_KEY
#define DEFINE_FILL_SCHEMA_HISTORY_FUNC(EXECUTOR, KEY) \
@ -1442,6 +1510,7 @@ DEFINE_FILL_SCHEMA_HISTORY_FUNC(ObRecycleSchemaExecutor, ObFirstSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_FUNC(ObSecondRecycleSchemaExecutor, ObSecondSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_FUNC(ObThirdRecycleSchemaExecutor, ObThirdSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_FUNC(ObSystemVariableRecycleSchemaExecutor, ObSystemVariableSchemaKey);
DEFINE_FILL_SCHEMA_HISTORY_FUNC(ObObjectPrivRecycleSchemaExecutor, ObObjectPrivSchemaKey);
#undef DEFINE_FILL_SCHEMA_HISTORY_FUNC
#define DEFINE_RECYCLE_SCHEMA_HISTORY(EXECUTOR, KEY) \
@ -1506,6 +1575,7 @@ DEFINE_FILL_SCHEMA_HISTORY_FUNC(ObSystemVariableRecycleSchemaExecutor, ObSystemV
DEFINE_RECYCLE_SCHEMA_HISTORY(ObRecycleSchemaExecutor, ObFirstSchemaKey)
DEFINE_RECYCLE_SCHEMA_HISTORY(ObSecondRecycleSchemaExecutor, ObSecondSchemaKey);
DEFINE_RECYCLE_SCHEMA_HISTORY(ObThirdRecycleSchemaExecutor, ObThirdSchemaKey);
DEFINE_RECYCLE_SCHEMA_HISTORY(ObObjectPrivRecycleSchemaExecutor, ObObjectPrivSchemaKey);
#undef DEFINE_RECYCLE_SCHEMA_HISTORY
int ObRecycleSchemaExecutor::gen_batch_recycle_schema_history_sql(
@ -1557,6 +1627,7 @@ int ObRecycleSchemaExecutor::gen_batch_recycle_schema_history_sql(
DEFINE_BATCH_RECYCLE_SCHEMA_HISTORY(ObRecycleSchemaExecutor, ObFirstSchemaKey)
DEFINE_BATCH_RECYCLE_SCHEMA_HISTORY(ObSecondRecycleSchemaExecutor, ObSecondSchemaKey);
DEFINE_BATCH_RECYCLE_SCHEMA_HISTORY(ObThirdRecycleSchemaExecutor, ObThirdSchemaKey);
DEFINE_BATCH_RECYCLE_SCHEMA_HISTORY(ObObjectPrivRecycleSchemaExecutor, ObObjectPrivSchemaKey);
#undef DEFINE_BATCH_RECYCLE_SCHEMA_HISTORY
int ObRecycleSchemaExecutor::gen_batch_compress_schema_history_sql(
@ -1658,6 +1729,8 @@ int ObRecycleSchemaExecutor::gen_batch_compress_schema_history_sql(
DEFINE_COMPRESS_SCHEMA_HISTORY(ObRecycleSchemaExecutor, ObFirstSchemaKey, ObFirstCompressSchemaInfo);
DEFINE_COMPRESS_SCHEMA_HISTORY(
ObSystemVariableRecycleSchemaExecutor, ObSystemVariableSchemaKey, ObSystemVariableCompressSchemaInfo);
DEFINE_COMPRESS_SCHEMA_HISTORY(
ObObjectPrivRecycleSchemaExecutor, ObObjectPrivSchemaKey, ObObjectPrivCompressSchemaInfo);
#undef DEFINE_COMPRESS_SCHEMA_HISTORY
#define BATCH_COMPRESS_SCHEMA_HISTORY(EXECUTOR, MINFO) \
@ -1680,6 +1753,7 @@ DEFINE_COMPRESS_SCHEMA_HISTORY(
}
BATCH_COMPRESS_SCHEMA_HISTORY(ObRecycleSchemaExecutor, ObFirstCompressSchemaInfo);
BATCH_COMPRESS_SCHEMA_HISTORY(ObSystemVariableRecycleSchemaExecutor, ObSystemVariableCompressSchemaInfo);
BATCH_COMPRESS_SCHEMA_HISTORY(ObObjectPrivRecycleSchemaExecutor, ObObjectPrivCompressSchemaInfo);
#undef BATCH_COMPRESS_SCHEMA_HISTORY
ObSecondRecycleSchemaExecutor::ObSecondRecycleSchemaExecutor(const uint64_t tenant_id, const int64_t schema_version,
@ -2094,5 +2168,238 @@ int ObSystemVariableRecycleSchemaExecutor::gen_batch_compress_schema_history_sql
}
return ret;
}
ObObjectPrivSchemaKey::ObObjectPrivSchemaKey()
: obj_id_(OB_INVALID_ID),
obj_type_(OB_INVALID_ID),
col_id_(OB_INVALID_ID),
grantor_id_(OB_INVALID_ID),
grantee_id_(OB_INVALID_ID),
priv_id_(OB_INVALID_ID)
{}
ObObjectPrivSchemaKey::~ObObjectPrivSchemaKey()
{}
bool ObObjectPrivSchemaKey::operator==(const ObObjectPrivSchemaKey &other) const
{
return obj_id_ == other.obj_id_ && obj_type_ == other.obj_type_ && col_id_ == other.col_id_ &&
grantor_id_ == other.grantor_id_ && grantee_id_ == other.grantee_id_ && priv_id_ == other.priv_id_;
}
bool ObObjectPrivSchemaKey::operator!=(const ObObjectPrivSchemaKey &other) const
{
return obj_id_ != other.obj_id_ || obj_type_ != other.obj_type_ || col_id_ != other.col_id_ ||
grantor_id_ != other.grantor_id_ || grantee_id_ != other.grantee_id_ || priv_id_ != other.priv_id_;
}
bool ObObjectPrivSchemaKey::operator<(const ObObjectPrivSchemaKey &other) const
{
bool bret = false;
if (obj_id_ != other.obj_id_) {
bret = obj_id_ < other.obj_id_;
} else if (obj_type_ != other.obj_type_) {
bret = obj_type_ < other.obj_type_;
} else if (col_id_ != other.col_id_) {
bret = col_id_ < other.col_id_;
} else if (grantor_id_ != other.grantor_id_) {
bret = grantor_id_ < other.grantor_id_;
} else if (grantee_id_ != other.grantee_id_) {
bret = grantee_id_ < other.grantee_id_;
} else if (priv_id_ != other.priv_id_) {
bret = priv_id_ < other.priv_id_;
} else {
bret = false;
}
return bret;
}
ObObjectPrivSchemaKey &ObObjectPrivSchemaKey::operator=(const ObObjectPrivSchemaKey &other)
{
assign(other);
return *this;
}
int ObObjectPrivSchemaKey::assign(const ObObjectPrivSchemaKey &other)
{
int ret = OB_SUCCESS;
if (this != &other) {
obj_id_ = other.obj_id_;
obj_type_ = other.obj_type_;
col_id_ = other.col_id_;
grantor_id_ = other.grantor_id_;
grantee_id_ = other.grantee_id_;
priv_id_ = other.priv_id_;
}
return ret;
}
void ObObjectPrivSchemaKey::reset()
{
obj_id_ = OB_INVALID_ID;
obj_type_ = OB_INVALID_ID;
col_id_ = OB_INVALID_ID;
grantor_id_ = OB_INVALID_ID;
grantee_id_ = OB_INVALID_ID;
priv_id_ = OB_INVALID_ID;
}
bool ObObjectPrivSchemaKey::is_valid() const
{
return obj_id_ != OB_INVALID_ID && obj_type_ != OB_INVALID_ID && col_id_ != OB_INVALID_ID &&
grantor_id_ != OB_INVALID_ID && grantee_id_ != OB_INVALID_ID && priv_id_ != OB_INVALID_ID;
}
uint64_t ObObjectPrivSchemaKey::hash() const
{
uint64_t hash_val = 0;
hash_val = murmurhash(&obj_id_, sizeof(obj_id_), hash_val);
hash_val = murmurhash(&obj_type_, sizeof(obj_type_), hash_val);
hash_val = murmurhash(&col_id_, sizeof(col_id_), hash_val);
hash_val = murmurhash(&grantor_id_, sizeof(grantor_id_), hash_val);
hash_val = murmurhash(&grantee_id_, sizeof(grantee_id_), hash_val);
hash_val = murmurhash(&priv_id_, sizeof(priv_id_), hash_val);
return hash_val;
}
ObObjectPrivRecycleSchemaExecutor::ObObjectPrivRecycleSchemaExecutor(const uint64_t tenant_id,
const int64_t schema_version, const char *table_name, common::ObMySQLProxy *sql_proxy,
ObSchemaHistoryRecycler *recycler)
: ObIRecycleSchemaExecutor(tenant_id, schema_version, table_name, sql_proxy, recycler), schema_history_map_()
{}
ObObjectPrivRecycleSchemaExecutor::~ObObjectPrivRecycleSchemaExecutor()
{}
bool ObObjectPrivRecycleSchemaExecutor::is_valid() const
{
bool bret = true;
if (OB_INVALID_TENANT_ID == tenant_id_ || OB_SYS_TENANT_ID == tenant_id_ ||
!ObSchemaService::is_formal_version(schema_version_) || OB_ISNULL(table_name_) || OB_ISNULL(sql_proxy_) ||
OB_ISNULL(recycler_)) {
bret = false;
LOG_WARN(
"invalid argument", K(bret), K_(tenant_id), K_(schema_version), KP_(table_name), KP_(sql_proxy), KP_(recycler));
}
return bret;
}
int ObObjectPrivRecycleSchemaExecutor::gen_fill_schema_history_sql(int64_t start_idx, ObSqlString &sql)
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_stop())) {
LOG_WARN("schema history recycler is stopped", KR(ret));
} else if (OB_FAIL(sql.assign_fmt(
"select obj_id, objtype, col_id, grantor_id, grantee_id, priv_id, schema_version, is_deleted "
"from %s where tenant_id = 0 and schema_version <= %ld "
"order by obj_id, objtype, col_id, grantor_id, grantee_id, priv_id, schema_version "
"limit %ld, %ld",
table_name_,
schema_version_,
start_idx,
SCHEMA_HISTORY_BATCH_FETCH_NUM))) {
LOG_WARN("fail to assign sql", KR(ret), K_(tenant_id), K_(schema_version));
}
return ret;
}
int ObObjectPrivRecycleSchemaExecutor::retrieve_schema_history(
common::sqlclient::ObMySQLResult &result, ObObjectPrivSchemaKey &cur_key, ObRecycleSchemaValue &cur_value)
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_stop())) {
LOG_WARN("schema history recycler is stopped", KR(ret));
} else {
EXTRACT_INT_FIELD_MYSQL(result, "obj_id", cur_key.obj_id_, int64_t);
EXTRACT_INT_FIELD_MYSQL(result, "objtype", cur_key.obj_type_, int64_t);
EXTRACT_INT_FIELD_MYSQL(result, "col_id", cur_key.col_id_, int64_t);
EXTRACT_INT_FIELD_MYSQL(result, "grantor_id", cur_key.grantor_id_, int64_t);
EXTRACT_INT_FIELD_MYSQL(result, "grantee_id", cur_key.grantee_id_, int64_t);
EXTRACT_INT_FIELD_MYSQL(result, "priv_id", cur_key.priv_id_, int64_t);
EXTRACT_INT_FIELD_MYSQL(result, "schema_version", cur_value.max_schema_version_, int64_t);
EXTRACT_INT_FIELD_MYSQL(result, "is_deleted", cur_value.is_deleted_, bool);
}
return ret;
}
int ObObjectPrivRecycleSchemaExecutor::gen_batch_recycle_schema_history_sql(
const common::ObIArray<ObObjectPrivSchemaKey> &dropped_schema_keys, common::ObSqlString &sql)
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_stop())) {
LOG_WARN("schema history recycler is stopped", KR(ret));
} else if (dropped_schema_keys.count() <= 0) {
// skip
} else {
if (OB_FAIL(sql.assign_fmt(" delete from %s where schema_version <= %ld"
" and (tenant_id, obj_id, objtype, col_id, grantor_id, grantee_id, priv_id) in ( ",
table_name_,
schema_version_))) {
LOG_WARN("fail to assign sql", KR(ret), K_(tenant_id), K_(schema_version));
}
for (int64_t i = 0; OB_SUCC(ret) && i < dropped_schema_keys.count(); i++) {
const ObObjectPrivSchemaKey &key = dropped_schema_keys.at(i);
if (OB_FAIL(check_stop())) {
LOG_WARN("schema history recycler is stopped", KR(ret));
} else if (OB_FAIL(sql.append_fmt("%s (0, %ld, %ld, %ld, %ld, %ld, %ld)",
0 == i ? "" : ",",
key.obj_id_,
key.obj_type_,
key.col_id_,
key.grantor_id_,
key.grantee_id_,
key.priv_id_))) {
LOG_WARN("fail to append fmt", KR(ret), K(key));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(sql.append_fmt(")"))) {
LOG_WARN("fail to append fmt", KR(ret), K_(tenant_id), K_(schema_version));
}
}
return ret;
}
int ObObjectPrivRecycleSchemaExecutor::gen_batch_compress_schema_history_sql(
const ObIArray<ObObjectPrivCompressSchemaInfo> &compress_schema_infos, common::ObSqlString &sql)
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_stop())) {
LOG_WARN("schema history recycler is stopped", KR(ret));
} else if (compress_schema_infos.count() <= 0) {
// skip
} else {
if (OB_FAIL(sql.assign_fmt("delete from %s where tenant_id = 0 and ( ", table_name_))) {
LOG_WARN("fail to assign sql", KR(ret), K_(tenant_id), K_(schema_version));
}
for (int64_t i = 0; OB_SUCC(ret) && i < compress_schema_infos.count(); i++) {
if (OB_FAIL(check_stop())) {
LOG_WARN("schema history recycler is stopped", KR(ret));
} else if (OB_FAIL(sql.append_fmt("%s (obj_id = %ld "
"and objtype = %ld "
"and col_id = %ld "
"and grantor_id = %ld "
"and grantee_id = %ld "
"and priv_id = %ld "
"and schema_version < %ld)",
0 == i ? "" : "or",
compress_schema_infos.at(i).key_.obj_id_,
compress_schema_infos.at(i).key_.obj_type_,
compress_schema_infos.at(i).key_.col_id_,
compress_schema_infos.at(i).key_.grantor_id_,
compress_schema_infos.at(i).key_.grantee_id_,
compress_schema_infos.at(i).key_.priv_id_,
compress_schema_infos.at(i).max_schema_version_))) {
LOG_WARN("fail to append fmt", KR(ret), "schema_info", compress_schema_infos.at(i));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(sql.append_fmt(")"))) {
LOG_WARN("fail to append fmt", KR(ret), K_(tenant_id), K_(schema_version));
}
}
return ret;
}
} // end namespace rootserver
} // end namespace oceanbase

View File

@ -116,38 +116,6 @@ public:
};
};
struct ObSystemVariableSchemaKey {
public:
ObSystemVariableSchemaKey();
~ObSystemVariableSchemaKey();
bool operator==(const ObSystemVariableSchemaKey& other) const;
bool operator!=(const ObSystemVariableSchemaKey& other) const;
bool operator<(const ObSystemVariableSchemaKey& other) const;
ObSystemVariableSchemaKey& operator=(const ObSystemVariableSchemaKey& other);
int assign(const ObSystemVariableSchemaKey& other);
void reset();
bool is_valid() const;
uint64_t hash() const;
TO_STRING_KV(K_(zone), K_(name));
public:
common::ObString zone_;
common::ObString name_;
};
struct ObSystemVariableCompressSchemaInfo {
public:
ObSystemVariableCompressSchemaInfo() : key_(), max_schema_version_(common::OB_INVALID_VERSION)
{}
~ObSystemVariableCompressSchemaInfo()
{}
TO_STRING_KV(K_(key), K_(max_schema_version));
public:
ObSystemVariableSchemaKey key_;
int64_t max_schema_version_;
};
struct ObRecycleSchemaValue {
public:
ObRecycleSchemaValue();
@ -313,6 +281,7 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObRecycleSchemaExecutor);
};
// RECYCLE ONLY
class ObSecondRecycleSchemaExecutor : public ObIRecycleSchemaExecutor {
public:
ObSecondRecycleSchemaExecutor() = delete;
@ -345,6 +314,7 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObSecondRecycleSchemaExecutor);
};
// RECYCLE ONLY
class ObThirdRecycleSchemaExecutor : public ObIRecycleSchemaExecutor {
public:
ObThirdRecycleSchemaExecutor() = delete;
@ -378,6 +348,44 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObThirdRecycleSchemaExecutor);
};
/*
* ##############################
* # system variable #
* ##############################
*/
struct ObSystemVariableSchemaKey {
public:
ObSystemVariableSchemaKey();
~ObSystemVariableSchemaKey();
bool operator==(const ObSystemVariableSchemaKey &other) const;
bool operator!=(const ObSystemVariableSchemaKey &other) const;
bool operator<(const ObSystemVariableSchemaKey &other) const;
ObSystemVariableSchemaKey &operator=(const ObSystemVariableSchemaKey &other);
int assign(const ObSystemVariableSchemaKey &other);
void reset();
bool is_valid() const;
uint64_t hash() const;
TO_STRING_KV(K_(zone), K_(name));
public:
common::ObString zone_;
common::ObString name_;
};
struct ObSystemVariableCompressSchemaInfo {
public:
ObSystemVariableCompressSchemaInfo() : key_(), max_schema_version_(common::OB_INVALID_VERSION)
{}
~ObSystemVariableCompressSchemaInfo()
{}
TO_STRING_KV(K_(key), K_(max_schema_version));
public:
ObSystemVariableSchemaKey key_;
int64_t max_schema_version_;
};
// COMPRESS ONLY
class ObSystemVariableRecycleSchemaExecutor : public ObIRecycleSchemaExecutor {
public:
ObSystemVariableRecycleSchemaExecutor() = delete;
@ -408,6 +416,84 @@ private:
common::ObArenaAllocator allocator_;
DISALLOW_COPY_AND_ASSIGN(ObSystemVariableRecycleSchemaExecutor);
};
/*
* #######################################
* # tenant object priviledge #
* #######################################
*/
struct ObObjectPrivSchemaKey {
public:
ObObjectPrivSchemaKey();
~ObObjectPrivSchemaKey();
bool operator==(const ObObjectPrivSchemaKey &other) const;
bool operator!=(const ObObjectPrivSchemaKey &other) const;
bool operator<(const ObObjectPrivSchemaKey &other) const;
ObObjectPrivSchemaKey &operator=(const ObObjectPrivSchemaKey &other);
int assign(const ObObjectPrivSchemaKey &other);
void reset();
bool is_valid() const;
uint64_t hash() const;
TO_STRING_KV(K_(obj_id), K_(obj_type), K_(col_id), K_(grantor_id), K_(grantee_id), K_(priv_id));
public:
int64_t obj_id_;
int64_t obj_type_;
int64_t col_id_;
int64_t grantor_id_;
int64_t grantee_id_;
int64_t priv_id_;
};
struct ObObjectPrivCompressSchemaInfo {
public:
ObObjectPrivCompressSchemaInfo() : key_(), max_schema_version_(common::OB_INVALID_VERSION)
{}
~ObObjectPrivCompressSchemaInfo()
{}
TO_STRING_KV(K_(key), K_(max_schema_version));
public:
ObObjectPrivSchemaKey key_;
int64_t max_schema_version_;
};
// RECYCLE AND COMPRESS
class ObObjectPrivRecycleSchemaExecutor : public ObIRecycleSchemaExecutor {
public:
ObObjectPrivRecycleSchemaExecutor() = delete;
ObObjectPrivRecycleSchemaExecutor(const uint64_t tenant_id, const int64_t schema_version, const char *table_name,
common::ObMySQLProxy *sql_proxy, ObSchemaHistoryRecycler *recycler);
virtual ~ObObjectPrivRecycleSchemaExecutor();
private:
virtual bool is_valid() const;
virtual int fill_schema_history_map();
virtual int recycle_schema_history();
virtual int compress_schema_history();
virtual int gen_fill_schema_history_sql(int64_t start_idx, common::ObSqlString &sql);
virtual int retrieve_schema_history(
common::sqlclient::ObMySQLResult &result, ObObjectPrivSchemaKey &key, ObRecycleSchemaValue &value);
virtual int fill_schema_history_key(const ObObjectPrivSchemaKey &cur_key, ObObjectPrivSchemaKey &key);
virtual int fill_schema_history(const ObObjectPrivSchemaKey &key, const ObRecycleSchemaValue &value);
virtual int gen_batch_recycle_schema_history_sql(
const common::ObIArray<ObObjectPrivSchemaKey> &dropped_schema_keys, common::ObSqlString &sql);
virtual int batch_recycle_schema_history(const common::ObIArray<ObObjectPrivSchemaKey> &dropped_schema_keys);
virtual int gen_batch_compress_schema_history_sql(
const ObIArray<ObObjectPrivCompressSchemaInfo> &compress_schema_infos, common::ObSqlString &sql);
virtual int batch_compress_schema_history(
const common::ObIArray<ObObjectPrivCompressSchemaInfo> &compress_schema_infos);
private:
common::hash::ObHashMap<ObObjectPrivSchemaKey, ObRecycleSchemaValue, common::hash::NoPthreadDefendMode>
schema_history_map_;
DISALLOW_COPY_AND_ASSIGN(ObObjectPrivRecycleSchemaExecutor);
};
} // end namespace rootserver
} // end namespace oceanbase