[CP] enhance GTT
This commit is contained in:
@ -181,7 +181,8 @@ ObSQLSessionInfo::ObSQLSessionInfo(const uint64_t tenant_id) :
|
||||
dblink_context_(this),
|
||||
sql_req_level_(0),
|
||||
expect_group_id_(OB_INVALID_ID),
|
||||
group_id_not_expected_(false)
|
||||
gtt_session_scope_unique_id_(0),
|
||||
gtt_trans_scope_unique_id_(0)
|
||||
{
|
||||
MEMSET(tenant_buff_, 0, sizeof(share::ObTenantSpaceFetcher));
|
||||
}
|
||||
@ -342,6 +343,10 @@ void ObSQLSessionInfo::reset(bool skip_sys_var)
|
||||
ObBasicSessionInfo::reset(skip_sys_var);
|
||||
txn_free_route_ctx_.reset();
|
||||
}
|
||||
gtt_session_scope_unique_id_ = 0;
|
||||
gtt_trans_scope_unique_id_ = 0;
|
||||
gtt_session_scope_ids_.reset();
|
||||
gtt_trans_scope_ids_.reset();
|
||||
}
|
||||
|
||||
void ObSQLSessionInfo::clean_status()
|
||||
@ -589,10 +594,13 @@ int ObSQLSessionInfo::delete_from_oracle_temp_tables(const obrpc::ObDropTableArg
|
||||
common::ObOracleSqlProxy oracle_sql_proxy;
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObDatabaseSchema *database_schema = NULL;
|
||||
ObSEArray<const ObSimpleTableSchemaV2 *, 512> table_schemas;
|
||||
//ObSEArray<const ObSimpleTableSchemaV2 *, 512> table_schemas;
|
||||
obrpc::ObDropTableArg &drop_table_arg = const_cast<obrpc::ObDropTableArg &>(const_drop_table_arg);
|
||||
const share::schema::ObTableType table_type = drop_table_arg.table_type_;
|
||||
const uint64_t tenant_id = drop_table_arg.tenant_id_;
|
||||
const ObTableSchema *table_schema = NULL;
|
||||
user_sql_proxy = &oracle_sql_proxy;
|
||||
|
||||
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
|
||||
tenant_id,
|
||||
schema_guard))) {
|
||||
@ -602,15 +610,17 @@ int ObSQLSessionInfo::delete_from_oracle_temp_tables(const obrpc::ObDropTableArg
|
||||
LOG_WARN("sql proxy is null", K(ret));
|
||||
} else if (OB_FAIL(oracle_sql_proxy.init(sql_proxy->get_pool()))) {
|
||||
LOG_WARN("init oracle sql proxy failed", K(ret));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schemas_in_tenant(tenant_id, table_schemas))) {
|
||||
LOG_WARN("fail to get table schema", K(ret), K(tenant_id));
|
||||
} else {
|
||||
user_sql_proxy = &oracle_sql_proxy;
|
||||
for (int64_t i = 0; i < table_schemas.count() && OB_SUCC(ret); i++) {
|
||||
const ObSimpleTableSchemaV2 *table_schema = table_schemas.at(i);
|
||||
if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("got invalid schema", K(ret), K(i));
|
||||
} else if (TMP_TABLE_ORA_SESS == table_type || TMP_TABLE_ORA_TRX == table_type) {
|
||||
ObIArray<uint64_t> &table_ids = table_type == share::schema::TMP_TABLE_ORA_TRX ?
|
||||
get_gtt_trans_scope_ids() : get_gtt_session_scope_ids();
|
||||
uint64_t unique_id = table_type == share::schema::TMP_TABLE_ORA_TRX ?
|
||||
get_gtt_trans_scope_unique_id() : get_gtt_session_scope_unique_id();
|
||||
LOG_DEBUG("delete temp table", K(table_ids), K(unique_id));
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < table_ids.count(); i++) {
|
||||
if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_ids.at(i), table_schema))) {
|
||||
LOG_WARN("fail to get table schema", K(ret));
|
||||
} else if (OB_ISNULL(table_schema)) {
|
||||
//table may be dropped, ignore
|
||||
} else if (tenant_id != table_schema->get_tenant_id()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tenant_id not match", K(ret), K(tenant_id), "table_id", table_schema->get_table_id());
|
||||
@ -628,28 +638,18 @@ int ObSQLSessionInfo::delete_from_oracle_temp_tables(const obrpc::ObDropTableArg
|
||||
LOG_DEBUG("skip table schema in recyclebin", K(*table_schema));
|
||||
} else {
|
||||
const int64_t limit = 1000;
|
||||
if (0 == drop_table_arg.sess_create_time_) {
|
||||
ret = sql.assign_fmt("DELETE FROM \"%.*s\".\"%.*s\" WHERE %s = %ld AND ROWNUM <= %ld",
|
||||
database_schema->get_database_name_str().length(),
|
||||
database_schema->get_database_name_str().ptr(),
|
||||
table_schema->get_table_name_str().length(),
|
||||
table_schema->get_table_name_str().ptr(),
|
||||
OB_HIDDEN_SESSION_ID_COLUMN_NAME, drop_table_arg.session_id_,
|
||||
limit);
|
||||
} else {
|
||||
ret = sql.assign_fmt("DELETE FROM \"%.*s\".\"%.*s\" WHERE %s = %ld AND %s <> %ld AND ROWNUM <= %ld",
|
||||
database_schema->get_database_name_str().length(),
|
||||
database_schema->get_database_name_str().ptr(),
|
||||
table_schema->get_table_name_str().length(),
|
||||
table_schema->get_table_name_str().ptr(),
|
||||
OB_HIDDEN_SESSION_ID_COLUMN_NAME, drop_table_arg.session_id_,
|
||||
OB_HIDDEN_SESS_CREATE_TIME_COLUMN_NAME, drop_table_arg.sess_create_time_,
|
||||
limit);
|
||||
}
|
||||
ret = sql.assign_fmt("DELETE FROM \"%.*s\".\"%.*s\" WHERE %s = %ld AND ROWNUM <= %ld",
|
||||
database_schema->get_database_name_str().length(),
|
||||
database_schema->get_database_name_str().ptr(),
|
||||
table_schema->get_table_name_str().length(),
|
||||
table_schema->get_table_name_str().ptr(),
|
||||
OB_HIDDEN_SESSION_ID_COLUMN_NAME, unique_id,
|
||||
limit);
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
int64_t affect_rows = 0;
|
||||
int64_t last_batch_affect_rows = limit;
|
||||
int64_t cur_time = ObTimeUtility::current_time();
|
||||
int64_t cur_timeout_backup = THIS_WORKER.get_timeout_ts();
|
||||
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + OB_MAX_USER_SPECIFIED_TIMEOUT);
|
||||
while (OB_SUCC(ret) && last_batch_affect_rows > 0) {
|
||||
@ -669,11 +669,18 @@ int ObSQLSessionInfo::delete_from_oracle_temp_tables(const obrpc::ObDropTableArg
|
||||
} else {
|
||||
LOG_WARN("failed to delete rows in oracle temporary table", K(ret), K(sql));
|
||||
}
|
||||
LOG_INFO("delete rows in oracle temporary table", K(sql), K(affect_rows),
|
||||
"clean_time", ObTimeUtility::current_time() - cur_time);
|
||||
THIS_WORKER.set_timeout_ts(cur_timeout_backup);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (TMP_TABLE_ORA_TRX == table_type && !get_is_deserialized()) {
|
||||
gtt_trans_scope_ids_.reuse();
|
||||
gen_gtt_trans_scope_unique_id();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1523,7 +1530,11 @@ OB_DEF_SERIALIZE(ObSQLSessionInfo)
|
||||
proxy_version_,
|
||||
min_proxy_version_ps_,
|
||||
thread_data_.is_in_retry_,
|
||||
ddl_info_);
|
||||
ddl_info_,
|
||||
gtt_session_scope_unique_id_,
|
||||
gtt_trans_scope_unique_id_,
|
||||
gtt_session_scope_ids_,
|
||||
gtt_trans_scope_ids_);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1550,7 +1561,11 @@ OB_DEF_DESERIALIZE(ObSQLSessionInfo)
|
||||
proxy_version_,
|
||||
min_proxy_version_ps_,
|
||||
thread_data_.is_in_retry_,
|
||||
ddl_info_);
|
||||
ddl_info_,
|
||||
gtt_session_scope_unique_id_,
|
||||
gtt_trans_scope_unique_id_,
|
||||
gtt_session_scope_ids_,
|
||||
gtt_trans_scope_ids_);
|
||||
(void)ObSQLUtils::adjust_time_by_ntp_offset(thread_data_.cur_query_start_time_);
|
||||
return ret;
|
||||
}
|
||||
@ -1578,7 +1593,11 @@ OB_DEF_SERIALIZE_SIZE(ObSQLSessionInfo)
|
||||
proxy_version_,
|
||||
min_proxy_version_ps_,
|
||||
thread_data_.is_in_retry_,
|
||||
ddl_info_);
|
||||
ddl_info_,
|
||||
gtt_session_scope_unique_id_,
|
||||
gtt_trans_scope_unique_id_,
|
||||
gtt_session_scope_ids_,
|
||||
gtt_trans_scope_ids_);
|
||||
return len;
|
||||
}
|
||||
|
||||
@ -3055,6 +3074,22 @@ int ObSysVarEncoder::display_sess_info(ObSQLSessionInfo &sess, const char* curre
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObSQLSessionInfo::gen_gtt_session_scope_unique_id()
|
||||
{
|
||||
static int64_t cur_ts = 0;
|
||||
int64_t next_ts = ObSQLUtils::combine_server_id(ObSQLUtils::get_next_ts(cur_ts), GCTX.server_id_);
|
||||
gtt_session_scope_unique_id_ = next_ts;
|
||||
LOG_DEBUG("check temporary table ssid session scope", K(next_ts), K(get_sessid_for_table()), K(GCTX.server_id_), K(lbt()));
|
||||
}
|
||||
|
||||
void ObSQLSessionInfo::gen_gtt_trans_scope_unique_id()
|
||||
{
|
||||
static int64_t cur_ts = 0;
|
||||
int64_t next_ts = ObSQLUtils::combine_server_id(ObSQLUtils::get_next_ts(cur_ts), GCTX.server_id_);
|
||||
gtt_trans_scope_unique_id_ = next_ts;
|
||||
LOG_DEBUG("check temporary table ssid trans scope", K(next_ts), K(get_sessid_for_table()), K(GCTX.server_id_), K(lbt()));
|
||||
}
|
||||
|
||||
int ObAppInfoEncoder::serialize(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
Reference in New Issue
Block a user