[CP] Clear tablet autoinc seq cache when drop table

This commit is contained in:
Hongqin-Li
2024-03-05 03:19:34 +00:00
committed by ob-robot
parent d87ab32ed3
commit 5cb186e4a7
10 changed files with 253 additions and 6 deletions

View File

@ -227,8 +227,7 @@ int ObTabletAutoincrementService::acquire_mgr(
lib::ObMutex &mutex = init_node_mutexs_[key.tablet_id_.id() % INIT_NODE_MUTEX_NUM];
lib::ObMutexGuard guard(mutex);
if (OB_ENTRY_NOT_EXIST == (ret = tablet_autoinc_mgr_map_.get(key, autoinc_mgr))) {
if (NULL == (autoinc_mgr = op_alloc(ObTabletAutoincMgr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
if (OB_FAIL(tablet_autoinc_mgr_map_.alloc_value(autoinc_mgr))) {
LOG_WARN("failed to alloc table mgr", K(ret));
} else if (OB_FAIL(autoinc_mgr->init(key.tablet_id_, init_cache_size))) {
LOG_WARN("fail to init tablet autoinc mgr", K(ret), K(key));
@ -236,7 +235,7 @@ int ObTabletAutoincrementService::acquire_mgr(
LOG_WARN("failed to create table node", K(ret));
}
if (OB_FAIL(ret) && autoinc_mgr != nullptr) {
op_free(autoinc_mgr);
tablet_autoinc_mgr_map_.free_value(autoinc_mgr);
autoinc_mgr = nullptr;
}
}
@ -341,5 +340,174 @@ int ObTabletAutoincrementService::get_tablet_cache_interval(const uint64_t tenan
return ret;
}
int ObTabletAutoincrementService::clear_tablet_autoinc_seq_cache(
const uint64_t tenant_id,
const common::ObIArray<common::ObTabletID> &tablet_ids,
const int64_t abs_timeout_us)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("tablet auto increment service is not inited", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) {
ObTabletAutoincKey key;
key.tenant_id_ = tenant_id;
key.tablet_id_ = tablet_ids.at(i);
lib::ObMutex &mutex = init_node_mutexs_[key.tablet_id_.id() % INIT_NODE_MUTEX_NUM];
lib::ObMutexGuardWithTimeout guard(mutex, abs_timeout_us);
if (OB_FAIL(guard.get_ret())) {
LOG_WARN("failed to lock", K(ret));
} else if (OB_FAIL(tablet_autoinc_mgr_map_.del(key))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to del tablet autoinc", K(ret), K(key));
}
}
}
return ret;
}
int ObTabletAutoincCacheCleaner::add_single_table(const schema::ObSimpleTableSchemaV2 &table_schema)
{
int ret = OB_SUCCESS;
if (table_schema.is_heap_table() || table_schema.is_aux_lob_meta_table()) {
const uint64_t tenant_id = table_schema.get_tenant_id();
ObArray<ObTabletID> tablet_ids;
if (OB_UNLIKELY(tenant_id != tenant_id_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant id mismatch", K(ret), K(tenant_id), K(tenant_id_));
} else if (OB_FAIL(table_schema.get_tablet_ids(tablet_ids))) {
LOG_WARN("failed to get tablet ids", K(ret));
} else if (OB_FAIL(append(tablet_ids_, tablet_ids))) {
LOG_WARN("failed to append tablet ids", K(ret));
}
}
return ret;
}
// add user table and its related tables that use tablet autoinc, e.g., lob meta table
int ObTabletAutoincCacheCleaner::add_table(schema::ObSchemaGetterGuard &schema_guard, const schema::ObTableSchema &table_schema)
{
int ret = OB_SUCCESS;
if (OB_FAIL(add_single_table(table_schema))) {
LOG_WARN("failed to add single table", K(ret));
}
if (OB_SUCC(ret)) {
const uint64_t lob_meta_tid = table_schema.get_aux_lob_meta_tid();
if (OB_INVALID_ID != lob_meta_tid) {
const ObTableSchema *lob_meta_table_schema = nullptr;
if (OB_FAIL(schema_guard.get_table_schema(table_schema.get_tenant_id(), lob_meta_tid, lob_meta_table_schema))) {
LOG_WARN("failed to get aux table schema", K(ret), K(lob_meta_tid));
} else if (OB_ISNULL(lob_meta_table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid table schema", K(ret), K(lob_meta_tid));
} else if (OB_FAIL(add_single_table(*lob_meta_table_schema))) {
LOG_WARN("failed to add single table", K(ret));
}
}
}
return ret;
}
int ObTabletAutoincCacheCleaner::add_database(const schema::ObDatabaseSchema &database_schema)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard schema_guard;
const uint64_t tenant_id = database_schema.get_tenant_id();
const uint64_t database_id = database_schema.get_database_id();
ObArray<const ObSimpleTableSchemaV2 *> table_schemas;
ObMultiVersionSchemaService &schema_service = ObMultiVersionSchemaService::get_instance();
if (OB_FAIL(schema_service.get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schemas_in_database(tenant_id,
database_id,
table_schemas))) {
LOG_WARN("fail to get table ids in database", K(tenant_id), K(database_id), K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < table_schemas.count(); i++) {
const ObSimpleTableSchemaV2 *table_schema = table_schemas.at(i);
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table schema should not be null", K(ret));
} else if (OB_FAIL(add_single_table(*table_schema))) {
LOG_WARN("fail to lock_table", KR(ret), KPC(table_schema));
}
}
}
return ret;
}
int ObTabletAutoincCacheCleaner::commit(const int64_t timeout_us)
{
int ret = OB_SUCCESS;
ObTimeGuard time_guard("ObTabletAutoincCacheCleaner", 1 * 1000 * 1000);
ObTabletAutoincrementService &tablet_autoinc_service = share::ObTabletAutoincrementService::get_instance();
uint64_t data_version = 0;
common::ObZone zone;
common::ObSEArray<common::ObAddr, 8> server_list;
ObUnitInfoGetter ui_getter;
obrpc::ObClearTabletAutoincSeqCacheArg arg;
const ObLSID unused_ls_id = SYS_LS;
int64_t abs_timeout_us = ObTimeUtility::current_time() + timeout_us;
const ObTimeoutCtx &ctx = ObTimeoutCtx::get_ctx();
if (THIS_WORKER.is_timeout_ts_valid()) {
abs_timeout_us = std::min(abs_timeout_us, THIS_WORKER.get_timeout_ts());
}
if (ctx.is_timeout_set()) {
abs_timeout_us = std::min(abs_timeout_us, ctx.get_abs_timeout());
}
if (ctx.is_trx_timeout_set()) {
abs_timeout_us = std::min(abs_timeout_us, ObTimeUtility::current_time() + ctx.get_trx_timeout_us());
}
const int64_t min_cluster_version = GET_MIN_CLUSTER_VERSION();
if ((min_cluster_version >= MOCK_CLUSTER_VERSION_4_2_3_0 && min_cluster_version < CLUSTER_VERSION_4_3_0_0)
|| min_cluster_version >= CLUSTER_VERSION_4_3_0_1) {
if (OB_ISNULL(GCTX.srv_rpc_proxy_) || OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("srv_rpc_proxy or sql_proxy in GCTX is null", K(ret), K(GCTX.srv_rpc_proxy_), K(GCTX.sql_proxy_));
} else if (OB_FAIL(ui_getter.init(*GCTX.sql_proxy_, &GCONF))) {
LOG_WARN("init unit info getter failed", K(ret));
} else if (OB_FAIL(ui_getter.get_tenant_servers(tenant_id_, server_list))) {
LOG_WARN("get tenant servers failed", K(ret));
} else if (OB_FAIL(arg.init(tablet_ids_, unused_ls_id))) {
LOG_WARN("failed to init clear tablet autoinc arg", K(ret));
} else {
bool need_clean_self = false;
rootserver::ObClearTabletAutoincSeqCacheProxy proxy(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::clear_tablet_autoinc_seq_cache);
for (int64_t i = 0; i < server_list.count(); i++) { // overwrite ret
const common::ObAddr &addr = server_list.at(i);
const int64_t cur_timeout_us = abs_timeout_us - ObTimeUtility::current_time();
if (cur_timeout_us <= 0) {
ret = OB_TIMEOUT;
break;
} else if (GCTX.self_addr() == addr) {
need_clean_self = true;
} else {
if (OB_FAIL(proxy.call(addr, cur_timeout_us, tenant_id_, arg))) {
LOG_WARN("failed to send rpc call", K(ret), K(addr), K(timeout_us), K(tenant_id_));
}
}
}
if (need_clean_self) { // overwrite ret
if (OB_FAIL(tablet_autoinc_service.clear_tablet_autoinc_seq_cache(tenant_id_, tablet_ids_, abs_timeout_us))) {
LOG_WARN("failed to clear tablet autoinc", K(ret));
}
}
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(proxy.wait())) {
LOG_WARN("wait batch result failed", K(tmp_ret), K(ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
}
}
return ret;
}
}
}