diff --git a/mittest/mtlenv/mock_tenant_module_env.h b/mittest/mtlenv/mock_tenant_module_env.h index 6d99442afc..1897b3d511 100644 --- a/mittest/mtlenv/mock_tenant_module_env.h +++ b/mittest/mtlenv/mock_tenant_module_env.h @@ -72,6 +72,7 @@ #include "mock_gts_source.h" #include "storage/blocksstable/ob_shared_macro_block_manager.h" #include "storage/concurrency_control/ob_multi_version_garbage_collector.h" +#include "storage/tablelock/ob_table_lock_service.h" #include "storage/tx/wrs/ob_tenant_weak_read_service.h" namespace oceanbase @@ -669,6 +670,7 @@ int MockTenantModuleEnv::init() MTL_BIND2(mtl_new_default, storage::ObTenantFreezeInfoMgr::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObSharedMacroBlockMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObMultiVersionGarbageCollector::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); + MTL_BIND2(mtl_new_default, ObTableLockService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(server_obj_pool_mtl_new, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy); } if (OB_FAIL(ret)) { diff --git a/mittest/mtlenv/tablelock/test_lock_memtable.cpp b/mittest/mtlenv/tablelock/test_lock_memtable.cpp index a7d3dd3f4a..f9bfd73dce 100644 --- a/mittest/mtlenv/tablelock/test_lock_memtable.cpp +++ b/mittest/mtlenv/tablelock/test_lock_memtable.cpp @@ -138,6 +138,7 @@ TEST_F(TestLockMemtable, lock) MyTxCtx default_ctx; ObStoreCtx store_ctx; + ObStoreCtx unlock_store_ctx; min_commited_scn.set_min(); flushed_scn.set_min(); // 1.1 get store ctx @@ -174,10 +175,6 @@ TEST_F(TestLockMemtable, lock) memtable_.remove_lock_record(DEFAULT_IN_TRANS_LOCK_OP); // 1.6 check again LOG_INFO("TestLockMemtable::lock 1.6"); - ret = memtable_.obj_lock_map_.get_obj_lock_with_ref_(DEFAULT_TABLET_LOCK_ID, - obj_lock); - - ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); ret = mem_ctx->check_lock_exist(DEFAULT_IN_TRANS_LOCK_OP.lock_id_, DEFAULT_IN_TRANS_LOCK_OP.owner_id_, DEFAULT_IN_TRANS_LOCK_OP.lock_mode_, @@ -217,7 +214,6 @@ TEST_F(TestLockMemtable, lock) // 2.3 unlock not complete lock LOG_INFO("TestLockMemtable::lock 2.3"); MyTxCtx ctx2; - ObStoreCtx unlock_store_ctx; start_tx(TRANS_ID2, ctx2); get_store_ctx(ctx2, unlock_store_ctx); ctx2.tx_ctx_.change_to_leader(); @@ -294,6 +290,7 @@ TEST_F(TestLockMemtable, replay) share::SCN flushed_scn; MyTxCtx default_ctx; ObStoreCtx store_ctx; + ObStoreCtx unlock_store_ctx; ObMemtableCtx *mem_ctx = nullptr; bool lock_exist = false; ObOBJLock *obj_lock = NULL; @@ -346,10 +343,6 @@ TEST_F(TestLockMemtable, replay) memtable_.remove_lock_record(DEFAULT_IN_TRANS_LOCK_OP); // 1.5 check again LOG_INFO("TestLockMemtable::replay 1.5"); - ret = memtable_.obj_lock_map_.get_obj_lock_with_ref_(DEFAULT_TABLET_LOCK_ID, - obj_lock); - - ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); ret = mem_ctx->check_lock_exist(DEFAULT_IN_TRANS_LOCK_OP.lock_id_, DEFAULT_IN_TRANS_LOCK_OP.owner_id_, DEFAULT_IN_TRANS_LOCK_OP.lock_mode_, @@ -387,7 +380,6 @@ TEST_F(TestLockMemtable, replay) // 2.3 unlock not complete lock LOG_INFO("TestLockMemtable::replay 2.3"); MyTxCtx ctx2; - ObStoreCtx unlock_store_ctx; start_tx(TRANS_ID2, ctx2); get_store_ctx(ctx2, unlock_store_ctx); ctx2.tx_ctx_.change_to_leader(); @@ -462,6 +454,12 @@ TEST_F(TestLockMemtable, recover) ObOBJLock *obj_lock = NULL; min_commited_scn.set_min(); flushed_scn.set_min(); + share::SCN commit_version; + share::SCN commit_scn; + commit_version.set_base(); + commit_scn.set_base(); + MyTxCtx ctx2; + ObStoreCtx unlock_store_ctx; // 1. recover in trans lock // 1.1 recover LOG_INFO("TestLockMemtable::recover 1.1"); @@ -477,10 +475,14 @@ TEST_F(TestLockMemtable, recover) LOG_INFO("TestLockMemtable::recover 1.3"); memtable_.remove_lock_record(DEFAULT_IN_TRANS_LOCK_OP); // 1.4 check exist - LOG_INFO("TestLockMemtable::recover 1.4"); - ret = memtable_.obj_lock_map_.get_obj_lock_with_ref_(DEFAULT_TABLET_LOCK_ID, - obj_lock); - ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); + // We move obj lock garbage collect process to gc thread + // which is backend, so you can still see obj lock here. + // See ObOBJLockGarbageCollector for details. + // + // LOG_INFO("TestLockMemtable::recover 1.4"); + // ret = memtable_.obj_lock_map_.get_obj_lock_with_ref_(DEFAULT_TABLET_LOCK_ID, + // obj_lock); + // ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); // 2. recover out trans lock // 2.1 recover @@ -489,10 +491,6 @@ TEST_F(TestLockMemtable, recover) ASSERT_EQ(OB_SUCCESS, ret); // 2.2 commit LOG_INFO("TestLockMemtable::recover 2.2"); - share::SCN commit_version; - share::SCN commit_scn; - commit_version.set_base(); - commit_scn.set_base(); ret = memtable_.update_lock_status(DEFAULT_OUT_TRANS_LOCK_OP, commit_version, commit_scn, @@ -500,8 +498,6 @@ TEST_F(TestLockMemtable, recover) ASSERT_EQ(OB_SUCCESS, ret); // 2.3 unlock LOG_INFO("TestLockMemtable::recover 2.3"); - MyTxCtx ctx2; - ObStoreCtx unlock_store_ctx; start_tx(TRANS_ID2, ctx2); get_store_ctx(ctx2, unlock_store_ctx); ctx2.tx_ctx_.change_to_leader(); @@ -591,10 +587,6 @@ TEST_F(TestLockMemtable, pre_check_lock) memtable_.remove_lock_record(DEFAULT_IN_TRANS_LOCK_OP); // 1.7 check again LOG_INFO("TestLockMemtable::pre_check_lock 1.7"); - ret = memtable_.obj_lock_map_.get_obj_lock_with_ref_(DEFAULT_TABLET_LOCK_ID, - obj_lock); - - ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); ret = mem_ctx->check_lock_exist(DEFAULT_IN_TRANS_LOCK_OP.lock_id_, DEFAULT_IN_TRANS_LOCK_OP.owner_id_, DEFAULT_IN_TRANS_LOCK_OP.lock_mode_, @@ -932,46 +924,50 @@ TEST_F(TestLockMemtable, out_trans_multi_source) ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); // 8 COMPACT + // This case is invalid after ObOBJLockGarbageCollector + // starts using. In short, when switch to leader, gc thread + // will try to execute a force compact, to avoid this case. + // We don't need to compact residual unlock op anymore. // 8.1 create an unlock op and committed it. - LOG_INFO("TestLockMemtable::out_trans_multi_source 8.1"); - scn = share::SCN::plus(share::SCN::min_scn(), 2); - is_replay = true; - ret = ctx2.tx_ctx_.notify_data_source_(NotifyType::TX_END, scn, is_replay, mds_array_unlock); - ASSERT_EQ(OB_SUCCESS, ret); - memtable_.obj_lock_map_.print(); + // LOG_INFO("TestLockMemtable::out_trans_multi_source 8.1"); + // scn = share::SCN::plus(share::SCN::min_scn(), 2); + // is_replay = true; + // ret = ctx2.tx_ctx_.notify_data_source_(NotifyType::TX_END, scn, is_replay, mds_array_unlock); + // ASSERT_EQ(OB_SUCCESS, ret); + // memtable_.obj_lock_map_.print(); // 8.2 commit the unlock op - LOG_INFO("TestLockMemtable::out_trans_multi_source 8.2"); - is_commit = true; - commit_version = share::SCN::plus(share::SCN::min_scn(), 2); - commit_scn = share::SCN::plus(share::SCN::min_scn(), 2); - mem_ctx = store_ctx2.mvcc_acc_ctx_.mem_ctx_; - ret = mem_ctx->clear_table_lock_(is_commit, - commit_version, - commit_scn); - ASSERT_EQ(OB_SUCCESS, ret); - memtable_.obj_lock_map_.print(); + // LOG_INFO("TestLockMemtable::out_trans_multi_source 8.2"); + // is_commit = true; + // commit_version = share::SCN::plus(share::SCN::min_scn(), 2); + // commit_scn = share::SCN::plus(share::SCN::min_scn(), 2); + // mem_ctx = store_ctx2.mvcc_acc_ctx_.mem_ctx_; + // ret = mem_ctx->clear_table_lock_(is_commit, + // commit_version, + // commit_scn); + // ASSERT_EQ(OB_SUCCESS, ret); + // memtable_.obj_lock_map_.print(); // 8.3 compact unlock op if lock conflict occur. - LOG_INFO("TestLockMemtable::out_trans_multi_source 8.3"); - ObTableLockOp conflict_lock_op = lock_op; - conflict_lock_op.lock_mode_ = DEFAULT_COFLICT_LOCK_MODE; - ObLockParam param; - bool is_try_lock = true; - int64_t expired_time = 0; - param.is_try_lock_ = is_try_lock; - param.expired_time_ = expired_time; - ret = memtable_.lock(param, - store_ctx, - conflict_lock_op); - ASSERT_EQ(OB_SUCCESS, ret); + // LOG_INFO("TestLockMemtable::out_trans_multi_source 8.3"); + // ObTableLockOp conflict_lock_op = lock_op; + // conflict_lock_op.lock_mode_ = DEFAULT_COFLICT_LOCK_MODE; + // ObLockParam param; + // bool is_try_lock = true; + // int64_t expired_time = 0; + // param.is_try_lock_ = is_try_lock; + // param.expired_time_ = expired_time; + // ret = memtable_.lock(param, + // store_ctx, + // conflict_lock_op); + // ASSERT_EQ(OB_SUCCESS, ret); // 8.4 check unlock op exist - LOG_INFO("TestLockMemtable::out_trans_multi_source 8.4"); - commit_version = share::SCN::plus(share::SCN::min_scn(), 2); - commit_scn = share::SCN::plus(share::SCN::min_scn(), 2); - ret = memtable_.update_lock_status(unlock_op, - commit_version, - commit_scn, - COMMIT_LOCK_OP_STATUS); - ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); + // LOG_INFO("TestLockMemtable::out_trans_multi_source 8.4"); + // commit_version = share::SCN::plus(share::SCN::min_scn(), 2); + // commit_scn = share::SCN::plus(share::SCN::min_scn(), 2); + // ret = memtable_.update_lock_status(unlock_op, + // commit_version, + // commit_scn, + // COMMIT_LOCK_OP_STATUS); + // ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); } } // tablelock diff --git a/mittest/mtlenv/tablelock/test_obj_lock_map.cpp b/mittest/mtlenv/tablelock/test_obj_lock_map.cpp index cc973883d1..0579d9af27 100644 --- a/mittest/mtlenv/tablelock/test_obj_lock_map.cpp +++ b/mittest/mtlenv/tablelock/test_obj_lock_map.cpp @@ -160,9 +160,9 @@ TEST_F(TestObjLockMap, lock) lock_map_.lock_map_.revert(obj_lock); obj_lock = NULL; // 1.4 obj lock release check - ret = lock_map_.get_obj_lock_with_ref_(DEFAULT_TABLET_LOCK_ID, - obj_lock); - + ret = lock_map_.unlock(DEFAULT_OUT_TRANS_UNLOCK_OP, + is_try_lock, + expired_time); ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); lock_map_.reset(); @@ -241,9 +241,9 @@ TEST_F(TestObjLockMap, unlock) min_commited_scn = lock_map_.get_min_ddl_committed_scn(flushed_scn); ASSERT_EQ(min_commited_scn, share::SCN::max_scn()); // 1.8 obj lock release check - ret = lock_map_.get_obj_lock_with_ref_(DEFAULT_TABLET_LOCK_ID, - obj_lock); - + ret = lock_map_.unlock(DEFAULT_OUT_TRANS_UNLOCK_OP, + is_try_lock, + expired_time); ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); lock_map_.reset(); @@ -274,9 +274,9 @@ TEST_F(TestObjLockMap, recover) // 1.3 commit lock_map_.remove_lock_record(DEFAULT_IN_TRANS_LOCK_OP); // 1.4 check exist - ret = lock_map_.get_obj_lock_with_ref_(DEFAULT_TABLET_LOCK_ID, - obj_lock); - + ret = lock_map_.unlock(DEFAULT_OUT_TRANS_UNLOCK_OP, + is_try_lock, + expired_time); ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); // 2 recover out trans lock @@ -315,9 +315,9 @@ TEST_F(TestObjLockMap, recover) // 2.6 check exist min_commited_scn = lock_map_.get_min_ddl_committed_scn(flushed_scn); ASSERT_EQ(min_commited_scn, share::SCN::max_scn()); - ret = lock_map_.get_obj_lock_with_ref_(DEFAULT_TABLET_LOCK_ID, - obj_lock); - + ret = lock_map_.unlock(DEFAULT_OUT_TRANS_UNLOCK_OP, + is_try_lock, + expired_time); ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); lock_map_.reset(); diff --git a/mittest/simple_server/CMakeLists.txt b/mittest/simple_server/CMakeLists.txt index 4f39109b94..ca05b7b016 100644 --- a/mittest/simple_server/CMakeLists.txt +++ b/mittest/simple_server/CMakeLists.txt @@ -25,6 +25,7 @@ ob_unittest_observer(test_ob_black_list_service test_ob_black_list_service.cpp) ob_unittest_observer(test_ob_minor_freeze test_ob_minor_freeze.cpp) ob_unittest_observer(test_ob_simple_cluster test_ob_simple_cluster.cpp) ob_unittest_observer(test_ob_table_lock_service test_ob_table_lock_service.cpp) +ob_unittest_observer(test_ob_obj_lock_garbage_collector test_ob_obj_lock_garbage_collector.cpp) ob_unittest_observer(test_observer_expand_shrink test_observer_expand_shrink.cpp) ob_unittest_observer(test_replay_from_middle test_replay_from_middle.cpp) ob_unittest_observer(test_special_tablet_flush test_special_tablet_flush.cpp) diff --git a/mittest/simple_server/test_ob_obj_lock_garbage_collector.cpp b/mittest/simple_server/test_ob_obj_lock_garbage_collector.cpp new file mode 100644 index 0000000000..bcfad3a97a --- /dev/null +++ b/mittest/simple_server/test_ob_obj_lock_garbage_collector.cpp @@ -0,0 +1,639 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include +#define USING_LOG_PREFIX TABLELOCK +#define protected public +#define private public + +#include "env/ob_simple_cluster_test_base.h" +#include "env/ob_simple_server_restart_helper.h" +#include "lib/mysqlclient/ob_mysql_result.h" +#include "share/schema/ob_tenant_schema_service.h" +#include "share/schema/ob_part_mgr_util.h" +#include "storage/tablelock/ob_table_lock_service.h" +#include "storage/tablelock/ob_lock_memtable.h" +#include "storage/tx_storage/ob_ls_service.h" +#include "storage/tx/ob_trans_service.h" +#include "mittest/mtlenv/tablelock/table_lock_common_env.h" + +#undef private +#undef protected + +static const char *TEST_FILE_NAME = "test_ob_obj_lock_garbage_collector"; +static const char *BORN_CASE_NAME = "ObOBJLockGCBeforeRestartTest"; +static const char *RESTART_CASE_NAME = "ObOBJLockGCAfterRestartTest"; + +namespace oceanbase +{ +namespace transaction +{ +namespace tablelock +{ +// We modify the exection interval of the obj_lcoK_garbage_collector here, +// to make sure that the empty obj locks can be recycled in time. +// int64_t ObOBJLockGarbageCollector::GARBAGE_COLLECT_EXEC_INTERVAL = 1_s; +// int64_t ObOBJLockGarbageCollector::GARBAGE_COLLECT_PRECISION = 100_ms; +} // namespace tablelock +} // namespace transaction +namespace unittest +{ +#define EXE_SQL(sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define EXE_SQL_FMT(...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define WRITE_SQL_BY_CONN(conn, sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define WRITE_SQL_FMT_BY_CONN(conn, ...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define READ_SQL_BY_CONN(conn, sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), read_res)); + +class ObOBJLockGarbageCollectorTestBase : public ObSimpleClusterTestBase +{ +public: + ObOBJLockGarbageCollectorTestBase() : ObSimpleClusterTestBase(TEST_FILE_NAME) {} + void get_ls(const uint64_t tenant_id, const ObLSID ls_id, ObLS *&ls) + { + LOG_INFO("get_ls start"); + ls = nullptr; + share::ObTenantSwitchGuard tenant_guard; + ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id)); + + ObLSService *ls_svr = MTL(ObLSService*); + ASSERT_NE(nullptr, ls_svr); + ObLSHandle handle; + ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, handle, ObLSGetMod::STORAGE_MOD)); + ASSERT_NE(nullptr, ls = handle.get_ls()); + LOG_INFO("get_ls end"); + } + + void get_table_id(const char* tname, uint64_t &table_id) + { + int ret = OB_SUCCESS; + static bool need_init = true; + if (need_init) { + need_init = false; + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2("sys", "oceanbase")); + } + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + { + ObSqlString sql; + table_id = 0; + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("select table_id from __all_table where table_name='%s'", tname)); + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr())); + sqlclient::ObMySQLResult *result = res.get_result(); + ASSERT_NE(nullptr, result); + ASSERT_EQ(OB_SUCCESS, result->next()); + ASSERT_EQ(OB_SUCCESS, result->get_uint("table_id", table_id)); + } + } + } + + void get_table_tablets(const uint64_t table_id, + ObTabletIDArray &tablet_list) + { + int ret = OB_SUCCESS; + int64_t latest_schema_version = OB_INVALID_VERSION; + ObRefreshSchemaStatus schema_status; + const uint64_t tenant_id = OB_SYS_TENANT_ID; + ObSchemaGetterGuard schema_guard; + const ObTableSchema *table_schema = nullptr; + ObMultiVersionSchemaService *schema_service = nullptr; + share::ObTenantSwitchGuard tenant_guard; + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy(); + + tablet_list.reset(); + ret = tenant_guard.switch_to(tenant_id); + ASSERT_EQ(OB_SUCCESS, ret); + schema_service = MTL(ObTenantSchemaService *)->get_schema_service(); + ret = schema_service->get_schema_version_in_inner_table( + sql_proxy, schema_status, latest_schema_version); + ret = + schema_service->async_refresh_schema(tenant_id, latest_schema_version); + ASSERT_EQ(OB_SUCCESS, ret); + ret = schema_service->get_tenant_schema_guard(tenant_id, schema_guard); + ASSERT_EQ(OB_SUCCESS, ret); + ret = schema_guard.get_table_schema(tenant_id, table_id, table_schema); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_NE(nullptr, table_schema); + if (PARTITION_LEVEL_ZERO == table_schema->get_part_level()) { + ret = tablet_list.push_back(table_schema->get_tablet_id()); + ASSERT_EQ(OB_SUCCESS, ret); + } else { + ObCheckPartitionMode check_partition_mode = CHECK_PARTITION_MODE_NORMAL; + share::schema::ObPartitionSchemaIter partition_iter(*table_schema, + check_partition_mode); + ObTabletID tablet_id; + while (OB_SUCC(partition_iter.next_tablet_id(tablet_id))) { + ret = tablet_list.push_back(tablet_id); + ASSERT_EQ(OB_SUCCESS, ret); + } + ASSERT_EQ(OB_ITER_END, ret); + } + } + + // This function is not atomic, it means that it may get obj lock even + // if it was recycled just now. However, we can coontrol the obj lock + // by the execution logic in the test case to avoid this situation. + void table_has_obj_lock(const uint64_t table_id, bool &has_obj_lock) + { + int ret = OB_SUCCESS; + ObLS *ls = nullptr; + ObLockIDIterator lock_id_iter; + ObLockID target_lock_id; + ObLockID curr_lock_id; + + get_ls(OB_SYS_TENANT_ID, share::ObLSID(share::ObLSID::SYS_LS_ID), ls); + ASSERT_NE(nullptr, ls); + ASSERT_EQ(OB_SUCCESS, ls->get_lock_id_iter(lock_id_iter)); + ASSERT_EQ(true, lock_id_iter.is_ready()); + + ASSERT_EQ(OB_SUCCESS, + transaction::tablelock::get_lock_id(table_id, target_lock_id)); + has_obj_lock = false; + + do { + if (OB_SUCC(lock_id_iter.get_next(curr_lock_id))) { + if (target_lock_id == curr_lock_id) { + has_obj_lock = true; + break; + } + } + } while (OB_SUCC(ret)); + } + + void wakeup_gc_thread() + { + ObTableLockService *tablelock_service = nullptr; + tablelock_service = MTL(transaction::tablelock::ObTableLockService *); + ASSERT_NE(nullptr, tablelock_service); + ASSERT_EQ(OB_SUCCESS, tablelock_service->garbage_collect_right_now()); + } + + void get_lock_memtable(ObLockMemtable *&lock_memtable) + { + ObLS *ls = nullptr; + ObTableHandleV2 table_handle; + get_ls(OB_SYS_TENANT_ID, share::ObLSID(share::ObLSID::SYS_LS_ID), ls); + ASSERT_NE(nullptr, ls); + ASSERT_EQ(OB_SUCCESS, ls->get_lock_table()->get_lock_memtable(table_handle)); + ASSERT_EQ(OB_SUCCESS, table_handle.get_lock_memtable(lock_memtable)); + } + + void init_test_lock_op() + { + // table_id = 1 and trans_id = 1 are valid in real observer, + // so we modify them to a differnt value to avoid conflict + DEFAULT_TABLE = 123456; + DEFAULT_TRANS_ID = 123456; + init_default_lock_test_value(); + } +}; + +class ObOBJLockGCBeforeRestartTest : public ObOBJLockGarbageCollectorTestBase { +}; +class ObOBJLockGCAfterRestartTest : public ObOBJLockGarbageCollectorTestBase { +}; + +TEST_F(ObOBJLockGCBeforeRestartTest, create_table) +{ + LOG_INFO("ObOBJLockGCBeforeRestartTest::create_table"); + // 1. CREATE ONE PART TABLE + // 2. CREATE MULTI PART TABLE + int ret = OB_SUCCESS; + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy(); + // 1. ONE PART TABLE + LOG_INFO("create_table one part table start"); + { + ObSqlString sql; + int64_t affected_rows = 0; + ASSERT_EQ( + OB_SUCCESS, + sql.assign_fmt( + "create table t_one_part (id int, data int, primary key(id))")); + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + } + LOG_INFO("create_table one part table succ"); + + LOG_INFO("insert data start"); + { + ObSqlString sql; + int64_t affected_rows = 0; + ASSERT_EQ(OB_SUCCESS, + sql.assign_fmt("insert into t_one_part values(%d, %d)", 1, 1)); + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + } + LOG_INFO("check row count"); + { + int64_t row_cnt = 0; + ObSqlString sql; + ASSERT_EQ(OB_SUCCESS, + sql.assign_fmt("select count(*) row_cnt from t_one_part")); + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr())); + sqlclient::ObMySQLResult *result = res.get_result(); + ASSERT_NE(nullptr, result); + ASSERT_EQ(OB_SUCCESS, result->next()); + ASSERT_EQ(OB_SUCCESS, result->get_int("row_cnt", row_cnt)); + } + ASSERT_EQ(row_cnt, 1); + } + + // 2. MULTI PART TABLE + LOG_INFO("create_table multi part table start"); + { + ObSqlString sql; + int64_t affected_rows = 0; + ASSERT_EQ( + OB_SUCCESS, + sql.assign_fmt( + "create table t_multi_part (id int, data int, primary key(id)) " + "partition by range(id) (partition p0 values less than (100), " + "partition p1 values less than (200), partition p2 values less " + "than MAXVALUE)")); + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + } + LOG_INFO("create_table multi part table succ"); + + LOG_INFO("insert data start"); + { + ObSqlString sql; + int64_t affected_rows = 0; + ASSERT_EQ(OB_SUCCESS, + sql.assign_fmt("insert into t_multi_part values(%d, %d)", 1, 1)); + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + ASSERT_EQ( + OB_SUCCESS, + sql.assign_fmt("insert into t_multi_part values(%d, %d)", 101, 101)); + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + ASSERT_EQ( + OB_SUCCESS, + sql.assign_fmt("insert into t_multi_part values(%d, %d)", 202, 202)); + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + } + + LOG_INFO("check row count"); + { + int64_t row_cnt = 0; + ObSqlString sql; + ASSERT_EQ(OB_SUCCESS, + sql.assign_fmt("select count(*) row_cnt from t_multi_part")); + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr())); + sqlclient::ObMySQLResult *result = res.get_result(); + ASSERT_NE(nullptr, result); + ASSERT_EQ(OB_SUCCESS, result->next()); + ASSERT_EQ(OB_SUCCESS, result->get_int("row_cnt", row_cnt)); + } + ASSERT_EQ(row_cnt, 3); + } +} + +TEST_F(ObOBJLockGCBeforeRestartTest, obj_lock_gc_with_tablelock_service) +{ + LOG_INFO("ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service"); + int ret = OB_SUCCESS; + ObTableLockOwnerID OWNER_ONE = 1; + ObTableLockOwnerID OWNER_TWO = 2; + uint64_t table_id = 0; + ObTableLockMode lock_mode = EXCLUSIVE; + share::ObTenantSwitchGuard tenant_guard; + bool has_obj_lock; + + ret = tenant_guard.switch_to(OB_SYS_TENANT_ID); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_NE(nullptr, MTL(ObTableLockService*)); + + // 1. LOCK TABLE AND UNLOCK TABLE + // 1.1 lock one part table + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 1.1"); + get_table_id("t_one_part", table_id); + ret = MTL(ObTableLockService*)->lock_table(table_id, + lock_mode, + OWNER_ONE); + ASSERT_EQ(OB_SUCCESS, ret); + + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_TRUE(has_obj_lock); + // 1.2 lock multi part table + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 1.2"); + get_table_id("t_multi_part", table_id); + ret = MTL(ObTableLockService*)->lock_table(table_id, + lock_mode, + OWNER_TWO); + ASSERT_EQ(OB_SUCCESS, ret); + // 1.3 check obj lock status + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 1.3"); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_TRUE(has_obj_lock); + // 1.4 unlock one part table + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 1.4"); + get_table_id("t_one_part", table_id); + ret = MTL(ObTableLockService*)->unlock_table(table_id, + lock_mode, + OWNER_ONE); + ASSERT_EQ(OB_SUCCESS, ret); + // 1.5 check obj lock status + // the obj lock is still be there, due to gc thread + // will try to recycle empty obj lock every 10 mins + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 1.5"); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_TRUE(has_obj_lock); + // 1.6 unlock multi part table + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 1.6"); + get_table_id("t_multi_part", table_id); + ret = MTL(ObTableLockService*)->unlock_table(table_id, + lock_mode, + OWNER_TWO); + ASSERT_EQ(OB_SUCCESS, ret); + // 1.7 check obj lock status + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 1.7"); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_TRUE(has_obj_lock); + // 1.8 wake up gc thread to clear empty obj locks + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 1.8"); + wakeup_gc_thread(); + // wait gc thread to recycle obj lock + sleep(2); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_FALSE(has_obj_lock); + get_table_id("t_one_part", table_id); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_FALSE(has_obj_lock); + + // 2. UNLOCK TABLE AND LOCK TABLE + // 2.1 unlock one part table + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 2.1"); + get_table_id("t_one_part", table_id); + ret = MTL(ObTableLockService*)->unlock_table(table_id, + SHARE, + OWNER_ONE); + ASSERT_EQ(OB_OBJ_LOCK_NOT_EXIST, ret); + // 2.2 check obj lock status + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 2.2"); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_FALSE(has_obj_lock); + // 2.3 lock one part table + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 2.3"); + ret = MTL(ObTableLockService*)->lock_table(table_id, + SHARE, + OWNER_ONE); + ASSERT_EQ(OB_SUCCESS, ret); + // 2.4 check obj lock status + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 2.4"); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_TRUE(has_obj_lock); + + // 3. LOCK WITH DIFFERNT OWNER AND UNLOCK + // 3.1 lock one part table with different owner + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 3.1"); + get_table_id("t_one_part", table_id); + ret = MTL(ObTableLockService*)->lock_table(table_id, + SHARE, + OWNER_TWO); + ASSERT_EQ(OB_SUCCESS, ret); + // 3.2 check obj lock status + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 3.2"); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_TRUE(has_obj_lock); + // 3.3 unlock previous lock which is owned by owner one + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 3.3"); + ret = MTL(ObTableLockService*)->unlock_table(table_id, + SHARE, + OWNER_ONE); + ASSERT_EQ(OB_SUCCESS, ret); + // 3.4 check obj lock status + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 3.4"); + wakeup_gc_thread(); + // wait gc thread to recycle obj lock + sleep(2); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_TRUE(has_obj_lock); + // 3.5 unlock current lock which is owned by owner two + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 3.5"); + ret = MTL(ObTableLockService*)->unlock_table(table_id, + SHARE, + OWNER_TWO); + ASSERT_EQ(OB_SUCCESS, ret); + // 3.6 check obj lock status + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 3.6"); + wakeup_gc_thread(); + // wait gc thread to recycle obj lock + sleep(2); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_FALSE(has_obj_lock); + + // 4. LOCK TABLE FOR RESTART + LOG_INFO( + "ObOBJLockGCBeforeRestartTest::obj_lock_gc_with_tablelock_service 4"); + ret = MTL(ObTableLockService*)->lock_table(table_id, + lock_mode, + OWNER_ONE); + ASSERT_EQ(OB_SUCCESS, ret); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_TRUE(has_obj_lock); +} + +TEST_F(ObOBJLockGCBeforeRestartTest, op_list_gc_with_mock_lock_map) +{ + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map"); + ObLockMemtable *lock_memtable = nullptr; + ObOBJLock *obj_lock = nullptr; + bool has_obj_lock; + share::SCN commit_scn; + share::SCN commit_version; + share::ObTenantSwitchGuard tenant_guard; + + commit_scn.set_base(); + commit_version.set_base(); + + get_lock_memtable(lock_memtable); + ObOBJLockMap &obj_lock_map = lock_memtable->obj_lock_map_; + init_test_lock_op(); + + ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(OB_SYS_TENANT_ID)); + // 1. REPLAY UNLOCK OP AND LOCK OP, + // THEN COMMIT UNLOCK OP BEFORE LOCK OP + // 1.1 recover unlock op + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 1.1"); + ASSERT_EQ(OB_SUCCESS, + obj_lock_map.recover_obj_lock(DEFAULT_OUT_TRANS_UNLOCK_OP)); + // 1.2 check obj lock exists + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 1.2"); + ASSERT_EQ(OB_SUCCESS, obj_lock_map.get_obj_lock_with_ref_( + DEFAULT_TABLE_LOCK_ID, obj_lock)); + ASSERT_NE(nullptr, obj_lock); + // 1.3 recover lock op + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 1.3"); + ASSERT_EQ(OB_SUCCESS, + obj_lock_map.recover_obj_lock(DEFAULT_OUT_TRANS_LOCK_OP)); + // 1.4 verify obj lock status by log + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 1.4"); + obj_lock->print(); + // 1.5 commit unlock op + // We will try to compact lock ops if there's paired and committed + // lock op in the op_list when commit unlock op. However, this unlock + // op cannot be compactted here, because the lock op in the op_list + // is still running. You can find that it tried to compact but failed + // (by is_compcat = false) from the log. + // This situation will occur during replyaing in the followers. + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 1.5"); + ASSERT_EQ(OB_SUCCESS, obj_lock_map.update_lock_status( + DEFAULT_OUT_TRANS_UNLOCK_OP, commit_version, + commit_scn, COMMIT_LOCK_OP_STATUS)); + // 1.6 commit lock op + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 1.6"); + ASSERT_EQ(OB_SUCCESS, obj_lock_map.update_lock_status( + DEFAULT_OUT_TRANS_LOCK_OP, commit_version, + commit_scn, COMMIT_LOCK_OP_STATUS)); + // 1.7 verify obj lock status by log + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 1.7"); + obj_lock->print(); + // revert obj lock + obj_lock_map.lock_map_.revert(obj_lock); + // 1.8 check obj lock status + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 1.8"); + table_has_obj_lock(DEFAULT_TABLE, has_obj_lock); + ASSERT_TRUE(has_obj_lock); + // 1.9 wake up gc thread + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 1.9"); + wakeup_gc_thread(); + // wait gc thread to recycle obj lock + sleep(2); + table_has_obj_lock(DEFAULT_TABLE, has_obj_lock); + ASSERT_FALSE(has_obj_lock); + + // 2. REPLAY UNLOCK OP AND LOCK OP, + // THEN COMMIT LOCK OP BEFORE UNLOCK OP + // The lock ops will be compacted when the unlock op is committed, + // so there's no need to gc it in this situationl. + // 2.1 recover unlock op + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 2.1"); + ASSERT_EQ(OB_SUCCESS, + obj_lock_map.recover_obj_lock(DEFAULT_OUT_TRANS_UNLOCK_OP)); + // 2.2 check obj lock exists + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 2.2"); + ASSERT_EQ(OB_SUCCESS, obj_lock_map.get_obj_lock_with_ref_( + DEFAULT_TABLE_LOCK_ID, obj_lock)); + ASSERT_NE(nullptr, obj_lock); + // 2.3 recover lock op + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 2.3"); + ASSERT_EQ(OB_SUCCESS, + obj_lock_map.recover_obj_lock(DEFAULT_OUT_TRANS_LOCK_OP)); + // 2.4 verify obj lock status by log + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 2.4"); + obj_lock->print(); + // 2.5 commit lock op + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 2.5"); + ASSERT_EQ(OB_SUCCESS, obj_lock_map.update_lock_status( + DEFAULT_OUT_TRANS_LOCK_OP, commit_version, + commit_scn, COMMIT_LOCK_OP_STATUS)); + // 2.6 commit unlock op + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 2.6"); + ASSERT_EQ(OB_SUCCESS, obj_lock_map.update_lock_status( + DEFAULT_OUT_TRANS_UNLOCK_OP, commit_version, + commit_scn, COMMIT_LOCK_OP_STATUS)); + // 2.7 verify obj lock status by log + // You will find that obj lock is still there, due to we move the + // gc process to the gc thread in the backend. However, the obj lock + // is empty, i.e. there's no lock ops in it. Because the compaction + // process will execute directly if the lock op which will be committed + // is an out trans unlock lock op. + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 2.7"); + obj_lock->print(); + // revert obj lock + obj_lock_map.lock_map_.revert(obj_lock); + // 2.8 check obj lock status + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 2.8"); + table_has_obj_lock(DEFAULT_TABLE, has_obj_lock); + ASSERT_TRUE(has_obj_lock); + // 2.9 wake up gc thread + LOG_INFO("ObOBJLockGCAfterRestartTest::op_list_gc_with_mock_lock_map 2.9"); + wakeup_gc_thread(); + // wait gc thread to recycle obj lock + sleep(2); + table_has_obj_lock(DEFAULT_TABLE, has_obj_lock); + ASSERT_FALSE(has_obj_lock); +} + +TEST_F(ObOBJLockGCAfterRestartTest, obj_lock_gc_after_restart) +{ + // You may find that gc thread tried to compact + // the obj lock of this table in force compaction + // mode from the log file. It means that the gc + // thread start successfully before leader comes + // back to work. + // (The gc thread will compact table lock ops in + // force mode only when it's called during the + // period when a follower is switching to leader) + LOG_INFO("ObOBJLockGCAfterRestartTest::obj_lock_gc_after_restart"); + uint64_t table_id; + bool has_obj_lock; + // 1. check obj lock status of table t_one_part + LOG_INFO("ObOBJLockGCAfterRestartTest::obj_lock_gc_after_restart 1"); + get_table_id("t_one_part", table_id); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_TRUE(has_obj_lock); + // 2. check obj lock status of table t_multi_part + LOG_INFO("ObOBJLockGCAfterRestartTest::obj_lock_gc_after_restart 2"); + get_table_id("t_multi_part", table_id); + table_has_obj_lock(table_id, has_obj_lock); + ASSERT_FALSE(has_obj_lock); +} +} // namespace unittest +} // namespace oceanbase + + +int main(int argc, char **argv) +{ + // std::string gtest_file_name = std::string(TEST_FILE_NAME) + "_gtest.log"; + // oceanbase::unittest::init_gtest_output(gtest_file_name); + ObSimpleServerRestartHelper restart_helper(argc, + argv, + TEST_FILE_NAME, + BORN_CASE_NAME, + RESTART_CASE_NAME); + restart_helper.set_sleep_sec(10); // sleep 10s for schema restore + OB_LOGGER.set_mod_log_levels("storage.tablelock:debug"); // it seems doesn't work + OB_LOGGER.set_enable_async_log(false); + return restart_helper.run(); +} diff --git a/src/logservice/ob_log_base_type.h b/src/logservice/ob_log_base_type.h index f0639b3e2e..c161a4b7c8 100644 --- a/src/logservice/ob_log_base_type.h +++ b/src/logservice/ob_log_base_type.h @@ -90,6 +90,8 @@ enum ObLogBaseType // for dup table trans DUP_TABLE_LOG_BASE_TYPE = 26, + // for obj lock garbage collect service + OBJ_LOCK_GARBAGE_COLLECT_SERVICE_LOG_BASE_TYPE = 27, // pay attention!!! // add log type in log_base_type_to_string // max value @@ -158,6 +160,8 @@ int log_base_type_to_string(const ObLogBaseType log_type, strncpy(str ,"PADDING_LOG_ENTRY", str_len); } else if (log_type == DUP_TABLE_LOG_BASE_TYPE) { strncpy(str ,"DUP_TABLE", str_len); + } else if (log_type == OBJ_LOCK_GARBAGE_COLLECT_SERVICE_LOG_BASE_TYPE) { + strncpy(str ,"OBJ_LOCK_GARBAGE_COLLECT_SERVICE", str_len); } else { ret = OB_INVALID_ARGUMENT; } diff --git a/src/observer/mysql/ob_query_retry_ctrl.cpp b/src/observer/mysql/ob_query_retry_ctrl.cpp index c1ea672517..ff06863bd1 100644 --- a/src/observer/mysql/ob_query_retry_ctrl.cpp +++ b/src/observer/mysql/ob_query_retry_ctrl.cpp @@ -209,7 +209,8 @@ public: v.no_more_test_ = true; v.retry_type_ = RETRY_TYPE_NONE; if (OB_ERR_INSUFFICIENT_PX_WORKER == v.err_ || - OB_ERR_EXCLUSIVE_LOCK_CONFLICT == v.err_) { + OB_ERR_EXCLUSIVE_LOCK_CONFLICT == v.err_ || + OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT == v.err_) { v.client_ret_ = v.err_; } else if (is_try_lock_row_err(v.session_.get_retry_info().get_last_query_retry_err())) { // timeout caused by locking, should return OB_ERR_EXCLUSIVE_LOCK_CONFLICT diff --git a/src/sql/engine/cmd/ob_lock_table_executor.cpp b/src/sql/engine/cmd/ob_lock_table_executor.cpp index 0dc2c4b513..30a601a61b 100644 --- a/src/sql/engine/cmd/ob_lock_table_executor.cpp +++ b/src/sql/engine/cmd/ob_lock_table_executor.cpp @@ -12,7 +12,6 @@ #define USING_LOG_PREFIX SQL_ENG #include "sql/engine/cmd/ob_lock_table_executor.h" - #include "sql/resolver/ddl/ob_lock_table_stmt.h" #include "sql/engine/ob_exec_context.h" @@ -35,10 +34,38 @@ int ObLockTableExecutor::execute(ObExecContext &ctx, } else if (is_mysql_mode()) { LOG_DEBUG("mysql mode do nothing"); } else { - if (OB_FAIL(ObSqlTransControl::lock_table(ctx, - stmt.get_table_id(), - stmt.get_lock_mode()))) { - LOG_WARN("fail lock table", K(ret), K(stmt.get_lock_mode()), K(stmt.get_table_id())); + const common::ObIArray &table_items = stmt.get_table_items(); + if (OB_UNLIKELY(table_items.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_ERROR("there's no table in the stmt", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < table_items.count(); ++i) { + TableItem *table_item = table_items.at(i); + + // handle compatibility + // If the version of cluster is updated than 4.1, it can use + // 'wait n' or 'no wait' grammar. Otherwise, it should follow + // the previous logic (i.e. try lock until trx / sql timeout) + int64_t wait_lock_seconds; + if (GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_4_1_0_0) { + wait_lock_seconds = stmt.get_wait_lock_seconds(); + } else { + wait_lock_seconds = -1; + } + + if (OB_FAIL(ObSqlTransControl::lock_table( + ctx, table_item->ref_id_, table_item->part_ids_, + stmt.get_lock_mode(), wait_lock_seconds))) { + if ((OB_TRY_LOCK_ROW_CONFLICT == ret || + OB_ERR_EXCLUSIVE_LOCK_CONFLICT == ret || + OB_ERR_SHARED_LOCK_CONFLICT == ret) && + wait_lock_seconds >= 0) { + ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT; + } + LOG_WARN("fail lock table", K(ret), K(stmt.get_lock_mode()), + K(wait_lock_seconds), K(table_item->ref_id_), + K(table_item->part_ids_)); + } } bool explicit_trans = session->has_explicit_start_trans(); bool ac = false; @@ -57,6 +84,5 @@ int ObLockTableExecutor::execute(ObExecContext &ctx, return ret; } - -} // sql -} // oceanbase +} // namespace sql +} // namespace oceanbase diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index 6019be9d6e..3668e2c319 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -16,6 +16,7 @@ #define USING_LOG_PREFIX SQL_EXE #include "share/ob_schema_status_proxy.h" // ObSchemaStatusProxy +#include "share/schema/ob_tenant_schema_service.h" #include "sql/ob_sql_trans_control.h" #include "sql/engine/ob_physical_plan.h" #include "sql/engine/ob_physical_plan_ctx.h" @@ -1090,7 +1091,9 @@ int ObSqlTransControl::acquire_tx_if_need_(transaction::ObTransService *txs, ObS int ObSqlTransControl::lock_table(ObExecContext &exec_ctx, const uint64_t table_id, - const ObTableLockMode lock_mode) + const ObIArray &part_ids, + const ObTableLockMode lock_mode, + const int64_t wait_lock_seconds) { int ret = OB_SUCCESS; ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx); @@ -1106,26 +1109,58 @@ int ObSqlTransControl::lock_table(ObExecContext &exec_ctx, OZ (txs->acquire_tx(session->get_tx_desc(), session->get_sessid()), *session); } ObTxParam tx_param; - ObLockTableRequest arg; OZ (build_tx_param_(session, tx_param)); // calculate lock table timeout int64_t lock_timeout_us = 0; { int64_t stmt_expire_ts = 0; int64_t tx_expire_ts = 0; + int64_t lock_wait_expire_ts = 0; OX (stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session)); OZ (get_trans_expire_ts(*session, tx_expire_ts)); - OX (lock_timeout_us = MAX(200L, MIN(stmt_expire_ts, tx_expire_ts) - ObTimeUtility::current_time())); + + if (wait_lock_seconds < 0) { + // It means that there's no opt about wait or no wait, + // so we just use the deafult timeout config here. + OX (lock_timeout_us = MAX(200L, MIN(stmt_expire_ts, tx_expire_ts) - + ObTimeUtility::current_time())); + } else { + // The priority of stmt_expire_ts and tx_expire_ts is higher than + // wait N. So if the statement or transaction is timeout, it should + // return error code, rather than wait until N seconds. + lock_wait_expire_ts = + MIN3(session->get_query_start_time() + wait_lock_seconds * 1000 * 1000, stmt_expire_ts, tx_expire_ts); + OX (lock_timeout_us = lock_wait_expire_ts - ObTimeUtility::current_time()); + lock_timeout_us = lock_timeout_us < 0 ? 0 : lock_timeout_us; + } } - arg.table_id_ = table_id; - arg.owner_id_ = 0; - arg.lock_mode_ = lock_mode; - arg.op_type_ = ObTableLockOpType::IN_TRANS_COMMON_LOCK; - arg.timeout_us_ = lock_timeout_us; - OZ (lock_service->lock_table(*session->get_tx_desc(), - tx_param, - arg), - tx_param, table_id, lock_mode, lock_timeout_us); + if (part_ids.empty()) { + ObLockTableRequest arg; + arg.table_id_ = table_id; + arg.owner_id_ = 0; + arg.lock_mode_ = lock_mode; + arg.op_type_ = ObTableLockOpType::IN_TRANS_COMMON_LOCK; + arg.timeout_us_ = lock_timeout_us; + + OZ (lock_service->lock_table(*session->get_tx_desc(), + tx_param, + arg), + tx_param, table_id, lock_mode, lock_timeout_us); + } else { + ObLockPartitionRequest arg; + arg.table_id_ = table_id; + arg.owner_id_ = 0; + arg.lock_mode_ = lock_mode; + arg.op_type_ = ObTableLockOpType::IN_TRANS_COMMON_LOCK; + arg.timeout_us_ = lock_timeout_us; + for (int64_t i = 0; i < part_ids.count() && OB_SUCC(ret); ++i) { + arg.part_object_id_ = part_ids.at(i); + OZ(lock_service->lock_partition_or_subpartition(*session->get_tx_desc(), + tx_param, arg), + tx_param, table_id, lock_mode, lock_timeout_us); + } + } + return ret; } @@ -1148,6 +1183,7 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id, || !addr.is_valid() || max_stale_time_us <= 0) { ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ls_id), K(addr), K(max_stale_time_us)); } else if (observer::ObServer::get_instance().get_self() == addr) { storage::ObLSService *ls_svr = MTL(storage::ObLSService *); diff --git a/src/sql/ob_sql_trans_control.h b/src/sql/ob_sql_trans_control.h index 425bbd27b0..542ac0a35f 100644 --- a/src/sql/ob_sql_trans_control.h +++ b/src/sql/ob_sql_trans_control.h @@ -233,7 +233,9 @@ public: static int get_trans_result(ObExecContext &exec_ctx, transaction::ObTxExecResult &trans_result); static int lock_table(ObExecContext &exec_ctx, const uint64_t table_id, - const transaction::tablelock::ObTableLockMode lock_mode); + const ObIArray &part_ids, + const transaction::tablelock::ObTableLockMode lock_mode, + const int64_t wait_lock_seconds); static void clear_xa_branch(const transaction::ObXATransID &xid, transaction::ObTxDesc *&tx_desc); static int check_ls_readable(const uint64_t tenant_id, const share::ObLSID &ls_id, diff --git a/src/sql/resolver/ddl/ob_lock_table_resolver.cpp b/src/sql/resolver/ddl/ob_lock_table_resolver.cpp index d5a7157896..4405281e61 100644 --- a/src/sql/resolver/ddl/ob_lock_table_resolver.cpp +++ b/src/sql/resolver/ddl/ob_lock_table_resolver.cpp @@ -14,6 +14,7 @@ #include "ob_lock_table_resolver.h" #include "ob_lock_table_stmt.h" #include "sql/resolver/dml/ob_dml_stmt.h" +#include "sql/parser/ob_parser_utils.h" namespace oceanbase { @@ -55,9 +56,9 @@ int ObLockTableResolver::resolve_mysql_mode(const ParseNode &parse_tree) int ObLockTableResolver::resolve_oracle_mode(const ParseNode &parse_tree) { int ret = OB_SUCCESS; - ObLockTableStmt *lock_stmt = NULL; + ObLockTableStmt *lock_stmt = nullptr; - if (2 != parse_tree.num_child_) { + if (3 != parse_tree.num_child_) { ret = OB_INVALID_ARGUMENT; LOG_WARN("wrong node number", K(ret), K(parse_tree.num_child_)); } else if (OB_ISNULL(parse_tree.children_) @@ -72,62 +73,66 @@ int ObLockTableResolver::resolve_oracle_mode(const ParseNode &parse_tree) stmt_ = lock_stmt; } - // 1. resolve table items + // 1. resolve table item if (OB_SUCC(ret)) { - ParseNode *table_node = parse_tree.children_[TABLE]; + ParseNode *table_node = parse_tree.children_[TABLE_LIST]; if (OB_ISNULL(table_node)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid parse tree", K(ret)); - } else if (OB_FAIL(resolve_table_relation_node(*table_node))) { + } else if (OB_FAIL(resolve_table_list(*table_node))) { LOG_WARN("resolve table failed", K(ret)); } } + // 2. resolve lock mode if (OB_SUCC(ret)) { if (OB_FAIL(resolve_lock_mode(*parse_tree.children_[LOCK_MODE]))) { - LOG_WARN("resolve where clause failed", K(ret)); + LOG_WARN("resolve lock mode failed", K(ret)); + } + } + // 3. resolve wait + if (OB_SUCC(ret)) { + // this node maybe null if user didn't input opt about wait + if (OB_NOT_NULL(parse_tree.children_[WAIT])) { + if (OB_FAIL(resolve_wait_lock(*parse_tree.children_[WAIT]))) { + LOG_WARN("resolve wait opt for table lock failed", K(ret)); + } } } - return ret; } -// TODO: yanyuan.cxf only deal with one table name. -int ObLockTableResolver::resolve_table_relation_node(const ParseNode &parse_tree) +int ObLockTableResolver::resolve_table_list(const ParseNode &table_list) { int ret = OB_SUCCESS; - // TODO: yanyuan.cxf release TableItem - ObString table_name; - ObString database_name; - const ObTableSchema *table_schema = NULL; - int64_t tenant_id = session_info_->get_effective_tenant_id(); - uint64_t database_id = OB_INVALID_ID; ObLockTableStmt *lock_stmt = get_lock_table_stmt(); + TableItem *table_item = nullptr; - if (OB_ISNULL(lock_stmt)) { + if (OB_UNLIKELY(T_TABLE_REFERENCES != table_list.type_ && + T_RELATION_FACTOR != table_list.type_) || + OB_UNLIKELY(OB_ISNULL(table_list.children_)) || + OB_UNLIKELY(table_list.num_child_ < 1)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(table_list.type_), K(table_list.num_child_)); + } else if (OB_ISNULL(lock_stmt)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("lock stmt should not be null", K(ret)); - } else if (OB_ISNULL(schema_checker_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("schema_checker should not be null", K(ret)); - } else if (OB_FAIL(ObDDLResolver::resolve_table_relation_node(&parse_tree, - table_name, - database_name))) { - LOG_WARN("failed to resolve table relation node", K(ret)); - } else if (OB_FAIL(schema_checker_->get_database_id(tenant_id, - database_name, - database_id))) { - LOG_WARN("failed to get database id", K(ret)); - } else if (OB_FAIL(schema_checker_->get_table_schema(tenant_id, - database_name, - table_name, - false, - table_schema))){ - LOG_WARN("failed to get table schema", K(ret)); - } else if (OB_ISNULL(table_schema)) { - LOG_WARN("null table schema", K(ret)); - } else { - lock_stmt->set_table_id(table_schema->get_table_id()); + LOG_WARN("invalid lock table stmt", K(lock_stmt)); + } + + for (int64_t i = 0; OB_SUCC(ret) && i < table_list.num_child_; ++i) { + const ParseNode *table_node = table_list.children_[i]; + const ObTableSchema *table_schema = nullptr; + if (OB_ISNULL(table_node)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table node is null"); + } else if (OB_FAIL(ObDMLResolver::resolve_table(*table_node, table_item))) { + LOG_WARN("failed to resolve table", K(ret)); + } else if (table_item->is_function_table() || table_item->is_json_table()) {//兼容oracle行为 + ret = OB_WRONG_TABLE_NAME; + LOG_WARN("invalid table name", K(ret)); + } else { + LOG_DEBUG("succ to add lock table item", KPC(table_item)); + } } return ret; } @@ -145,5 +150,22 @@ int ObLockTableResolver::resolve_lock_mode(const ParseNode &parse_tree) return ret; } +int ObLockTableResolver::resolve_wait_lock(const ParseNode &parse_tree) +{ + int ret = OB_SUCCESS; + ObLockTableStmt *lock_stmt = get_lock_table_stmt(); + int64_t wait_lock_seconds = parse_tree.value_; + if (OB_ISNULL(lock_stmt)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("lock stmt should not be null"); + } else if (wait_lock_seconds < 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("lock wait time should not be negative", K(wait_lock_seconds)); + } else { + lock_stmt->set_wait_lock_seconds(parse_tree.value_); + } + return ret; +} + } // namespace sql } // namespace oceanbase diff --git a/src/sql/resolver/ddl/ob_lock_table_resolver.h b/src/sql/resolver/ddl/ob_lock_table_resolver.h index f44aff449c..a9eaa953b1 100644 --- a/src/sql/resolver/ddl/ob_lock_table_resolver.h +++ b/src/sql/resolver/ddl/ob_lock_table_resolver.h @@ -14,7 +14,7 @@ #define OCEANBASE_SQL_RESOLVER_DML_OB_LOCK_TABLE_RESOLVER_ #include "sql/resolver/ddl/ob_lock_table_stmt.h" -#include "sql/resolver/ddl/ob_ddl_resolver.h" +#include "sql/resolver/dml/ob_dml_resolver.h" namespace oceanbase { @@ -25,15 +25,15 @@ class ObLockTableStmt; // NOTE: yanyuan.cxf LOCK TABLE is dml at oracle, but it does not have // SQL plan, so we treat it as ddl operator. -class ObLockTableResolver : public ObDDLResolver +class ObLockTableResolver : public ObDMLResolver { public: - static const int64_t TABLE = 0; /* 0. table node */ - static const int64_t LOCK_MODE = 1; /* 1. lock mode node */ - static const int64_t WAIT_NODE = 2; /* 2. wait node? */ + static const int64_t TABLE_LIST = 0; + static const int64_t LOCK_MODE = 1; + static const int64_t WAIT = 2; public: explicit ObLockTableResolver(ObResolverParams ¶ms) - : ObDDLResolver(params) + : ObDMLResolver(params) {} virtual ~ObLockTableResolver() {} @@ -42,8 +42,9 @@ public: private: int resolve_mysql_mode(const ParseNode &parse_tree); int resolve_oracle_mode(const ParseNode &parse_tree); - int resolve_table_relation_node(const ParseNode &parse_tree); + int resolve_table_list(const ParseNode &table_list); int resolve_lock_mode(const ParseNode &parse_tree); + int resolve_wait_lock(const ParseNode &parse_tree); DISALLOW_COPY_AND_ASSIGN(ObLockTableResolver); }; diff --git a/src/sql/resolver/ddl/ob_lock_table_stmt.h b/src/sql/resolver/ddl/ob_lock_table_stmt.h index a331f76b59..83d5ade22a 100644 --- a/src/sql/resolver/ddl/ob_lock_table_stmt.h +++ b/src/sql/resolver/ddl/ob_lock_table_stmt.h @@ -12,37 +12,36 @@ #ifndef OCEANBASE_SQL_RESOLVER_DML_OB_LOCK_TABLE_STMT_ #define OCEANBASE_SQL_RESOLVER_DML_OB_LOCK_TABLE_STMT_ -#include "sql/resolver/ddl/ob_ddl_stmt.h" +#include "sql/resolver/dml/ob_dml_stmt.h" #include "sql/resolver/ob_cmd.h" +#include "share/schema/ob_schema_struct.h" namespace oceanbase { namespace sql { -class ObLockTableStmt : public ObStmt, public ObICmd +class ObLockTableStmt : public ObDMLStmt, public ObICmd { public: explicit ObLockTableStmt() - : ObStmt(stmt::T_LOCK_TABLE), + : ObDMLStmt(stmt::T_LOCK_TABLE), lock_mode_(0), - table_id_(0) + wait_lock_seconds_(-1) {} virtual ~ObLockTableStmt() {} virtual int get_cmd_type() const { return get_stmt_type(); } void set_lock_mode(const int64_t lock_mode) { lock_mode_ = lock_mode; } - void set_table_id(const uint64_t table_id) { table_id_ = table_id; } + void set_wait_lock_seconds(const int64_t wait_lock_seconds) { wait_lock_seconds_ = wait_lock_seconds; } int64_t get_lock_mode() const { return lock_mode_; } - uint64_t get_table_id() const { return table_id_; } + int64_t get_wait_lock_seconds() const { return wait_lock_seconds_; } private: int64_t lock_mode_; - uint64_t table_id_; + int64_t wait_lock_seconds_; DISALLOW_COPY_AND_ASSIGN(ObLockTableStmt); }; - - } // namespace sql } // namespace oceanbase diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index 54e23cb77b..265900e04d 100644 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -173,6 +173,7 @@ int ObLS::init(const share::ObLSID &ls_id, REGISTER_TO_LOGSERVICE(logservice::DDL_LOG_BASE_TYPE, &ls_ddl_log_handler_); REGISTER_TO_LOGSERVICE(logservice::KEEP_ALIVE_LOG_BASE_TYPE, &keep_alive_ls_handler_); REGISTER_TO_LOGSERVICE(logservice::GC_LS_LOG_BASE_TYPE, &gc_handler_); + REGISTER_TO_LOGSERVICE(logservice::OBJ_LOCK_GARBAGE_COLLECT_SERVICE_LOG_BASE_TYPE, &lock_table_); REGISTER_TO_LOGSERVICE(logservice::RESERVED_SNAPSHOT_LOG_BASE_TYPE, &reserved_snapshot_clog_handler_); REGISTER_TO_LOGSERVICE(logservice::MEDIUM_COMPACTION_LOG_BASE_TYPE, &medium_compaction_clog_handler_); @@ -641,6 +642,7 @@ void ObLS::destroy() UNREGISTER_FROM_LOGSERVICE(logservice::DDL_LOG_BASE_TYPE, &ls_ddl_log_handler_); UNREGISTER_FROM_LOGSERVICE(logservice::KEEP_ALIVE_LOG_BASE_TYPE, &keep_alive_ls_handler_); UNREGISTER_FROM_LOGSERVICE(logservice::GC_LS_LOG_BASE_TYPE, &gc_handler_); + UNREGISTER_FROM_LOGSERVICE(logservice::OBJ_LOCK_GARBAGE_COLLECT_SERVICE_LOG_BASE_TYPE, &lock_table_); UNREGISTER_FROM_LOGSERVICE(logservice::RESERVED_SNAPSHOT_LOG_BASE_TYPE, &reserved_snapshot_clog_handler_); UNREGISTER_FROM_LOGSERVICE(logservice::MEDIUM_COMPACTION_LOG_BASE_TYPE, &medium_compaction_clog_handler_); if (ls_meta_.ls_id_ == IDS_LS) { diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index c83fa6cc45..cb9b1efcda 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -486,6 +486,12 @@ public: // int get_lock_op_iter(const ObLockID &lock_id, // ObLockOpIterator &iter); DELEGATE_WITH_RET(lock_table_, get_lock_op_iter, int); + // check and clear lock ops and obj locks in this ls (or lock_table) + // @param[in] force_compact, if it's set to true, the gc thread will + // force compact unlock op which is committed, even though there's + // no paired lock op. + // int check_and_clear_obj_lock(const bool force_compact) + DELEGATE_WITH_RET(lock_table_, check_and_clear_obj_lock, int); // set the member_list of log_service // @param [in] member_list, the member list to be set. diff --git a/src/storage/tablelock/ob_lock_memtable.cpp b/src/storage/tablelock/ob_lock_memtable.cpp index d5863af0d5..44bd76e338 100644 --- a/src/storage/tablelock/ob_lock_memtable.cpp +++ b/src/storage/tablelock/ob_lock_memtable.cpp @@ -593,6 +593,18 @@ int ObLockMemtable::get_lock_op_iter(const ObLockID &lock_id, return ret; } +int ObLockMemtable::check_and_clear_obj_lock(const bool force_compact) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + TABLELOCK_LOG(WARN, "ObLockMemtable not inited.", K(ret)); + } else if (OB_FAIL(obj_lock_map_.check_and_clear_obj_lock(force_compact))) { + TABLELOCK_LOG(WARN, "check and clear obj lock failed", K(ret)); + } + return ret; +} + int ObLockMemtable::scan( const ObTableIterParam ¶m, ObTableAccessContext &context, diff --git a/src/storage/tablelock/ob_lock_memtable.h b/src/storage/tablelock/ob_lock_memtable.h index 810ef2fcd1..b3e9192c6d 100644 --- a/src/storage/tablelock/ob_lock_memtable.h +++ b/src/storage/tablelock/ob_lock_memtable.h @@ -103,6 +103,14 @@ public: int get_lock_op_iter(const ObLockID &lock_id, ObLockOpIterator &iter); + // Iterate obj lock in lock map, and check 2 status of it: + // 1. Check whether the lock ops in the obj lock can be compacted. + // If it can be compacted (i.e. there're paired lock op and unlock + // op), remove them from the obj lock and recycle resources. + // 2. Check whether the obj lock itself is empty. + // If it's empty (i.e. no lock ops in it), remove it from the lock + // map and recycle resources. + int check_and_clear_obj_lock(const bool force_compact); // ================ INHERITED FROM ObIMemtable =============== // We need to inherient the memtable method for merge process to iterate the // lock for dumping the lock table. diff --git a/src/storage/tablelock/ob_lock_table.cpp b/src/storage/tablelock/ob_lock_table.cpp index 6919925473..e480739971 100644 --- a/src/storage/tablelock/ob_lock_table.cpp +++ b/src/storage/tablelock/ob_lock_table.cpp @@ -16,6 +16,7 @@ #include "storage/tx/ob_trans_define_v4.h" #include "storage/tablelock/ob_table_lock_common.h" #include "storage/tablelock/ob_lock_table.h" +#include "storage/tablelock/ob_table_lock_service.h" #include "common/ob_tablet_id.h" // ObTabletID #include "share/ob_rpc_struct.h" // ObBatchCreateTabletArg @@ -727,6 +728,57 @@ int ObLockTable::admin_update_lock_op(const ObTableLockOp &op_info, return ret; } +int ObLockTable::check_and_clear_obj_lock(const bool force_compact) +{ + int ret = OB_SUCCESS; + ObTableHandleV2 handle; + ObLockMemtable *lock_memtable = nullptr; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObLockTable is not inited", K(ret)); + } else if (OB_FAIL(get_lock_memtable(handle))) { + LOG_WARN("get lock memtable failed", K(ret)); + } else if (OB_FAIL(handle.get_lock_memtable(lock_memtable))) { + LOG_WARN("get lock memtable from lock handle failed", K(ret)); + } else if (OB_FAIL(lock_memtable->check_and_clear_obj_lock(force_compact))) { + LOG_WARN("check and clear obj lock failed", K(ret)); + } + return ret; +} + +int ObLockTable::switch_to_leader() +{ + int ret = OB_SUCCESS; + ObTableLockService::ObOBJLockGarbageCollector *obj_lock_gc = nullptr; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObLockTable is not inited", K(ret)); + } else if (OB_FAIL(MTL(ObTableLockService *) + ->get_obj_lock_garbage_collector(obj_lock_gc))) { + LOG_WARN("can not get ObOBJLockGarbageCollector", K(ret)); + } else if (OB_ISNULL(obj_lock_gc)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ObOBJLockGarbageCollector is null", K(ret)); + } else { + if (OB_NOT_NULL(parent_)) { + LOG_INFO("start to check and clear obj lock when switch to leader", K(ret), + K(parent_->get_ls_id())); + } + ret = obj_lock_gc->obj_lock_gc_thread_pool_.commit_task_ignore_ret( + [this]() { return check_and_clear_obj_lock(true); }); + } + + if (OB_FAIL(ret)) { + if (OB_ISNULL(parent_)) { + LOG_WARN("parent ls of ObLockTable is null", K(ret)); + } else { + LOG_WARN("collect obj lock garbage when switch to leader failed", K(ret), + K(parent_->get_ls_id())); + } + } + return ret; +} + } // tablelock } // transaction } // oceanbase diff --git a/src/storage/tablelock/ob_lock_table.h b/src/storage/tablelock/ob_lock_table.h index 8b5ce8aa66..11691627f8 100644 --- a/src/storage/tablelock/ob_lock_table.h +++ b/src/storage/tablelock/ob_lock_table.h @@ -17,6 +17,7 @@ #include "lib/worker.h" #include "storage/ob_i_table.h" #include "storage/tablelock/ob_obj_lock.h" +#include "logservice/ob_log_base_type.h" namespace oceanbase { @@ -62,7 +63,9 @@ struct ObLockParam; class ObTableLockOp; class ObLockMemtable; -class ObLockTable +class ObLockTable : public logservice::ObIReplaySubHandler, + public logservice::ObIRoleChangeSubHandler, + public logservice::ObICheckpointSubHandler { public: ObLockTable() @@ -129,6 +132,24 @@ public: const share::SCN &commit_version, const share::SCN &commit_scn, const ObTableLockOpStatus status); + // check and clear paired lock ops which can be compacted, + // and clear empty obj locks to recycle resources. + // See the ObLockMemtable::check_and_clear_obj_lock for deatails. + int check_and_clear_obj_lock(const bool force_compact); + // for replay + int replay(const void *buffer, + const int64_t nbytes, + const palf::LSN &lsn, + const share::SCN &scn) override { return OB_SUCCESS; } + // for checkpoint + share::SCN get_rec_scn() override { return share::SCN::max_scn(); } + int flush(share::SCN &rec_scn) override { return OB_SUCCESS; } + // for role change + void switch_to_follower_forcedly() override{}; + int switch_to_leader() override; + int switch_to_follower_gracefully() override { return OB_SUCCESS; } + int resume_leader() override { return OB_SUCCESS; } + private: // We use the method to recover the lock_table for reboot. int restore_lock_table_(storage::ObITable &sstable); diff --git a/src/storage/tablelock/ob_obj_lock.cpp b/src/storage/tablelock/ob_obj_lock.cpp index 3d94210419..b46ce0efe6 100644 --- a/src/storage/tablelock/ob_obj_lock.cpp +++ b/src/storage/tablelock/ob_obj_lock.cpp @@ -181,8 +181,6 @@ int ObOBJLock::slow_lock( ObTableLockOpLinkNode *lock_op_node = NULL; uint64_t tenant_id = MTL_ID(); bool conflict_with_dml_lock = false; - bool unused_is_compacted = false; - const bool is_force_compact = true; ObMemAttr attr(tenant_id, "ObTableLockOp"); // 1. check lock conflict. // 2. record lock op. @@ -193,8 +191,7 @@ int ObOBJLock::slow_lock( } else if (OB_FAIL(check_allow_lock_(lock_op, lock_mode_in_same_trans, conflict_tx_set, - conflict_with_dml_lock, - allocator))) { + conflict_with_dml_lock))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret) { LOG_WARN("check allow lock failed", K(ret), K(lock_op)); } @@ -399,6 +396,8 @@ int ObOBJLock::update_lock_status(const ObTableLockOp &lock_op, allocator, unused_is_compacted))) { LOG_WARN("compact tablelock failed", K(tmp_ret), K(lock_op)); + } else { + drop_op_list_if_empty_(lock_op.lock_mode_, op_list, allocator); } } if (OB_SUCC(ret) && @@ -453,8 +452,6 @@ int ObOBJLock::fast_lock( { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - bool is_compacted = false; - const bool is_force_compact = true; { // lock first time RDLockGuard guard(rwlock_); @@ -469,31 +466,6 @@ int ObOBJLock::fast_lock( LOG_DEBUG("succeed create lock ", K(lock_op)); } } - // compact if need. - if (OB_TRY_LOCK_ROW_CONFLICT == ret) { - WRLockGuard guard(rwlock_); - if (is_deleted_) { - ret = OB_EAGAIN; - need_retry = false; - } else if(OB_TMP_FAIL(compact_tablelock_(allocator, is_compacted, is_force_compact))) { - // compact the obj lock to make sure the lock op that need compact will - // not block the lock operation next time. - LOG_WARN("compact tablelock failed", K(tmp_ret), K(lock_op)); - } - } - if (OB_TRY_LOCK_ROW_CONFLICT == ret && is_compacted) { - RDLockGuard guard(rwlock_); - if (OB_FAIL(try_fast_lock_(lock_op, - lock_mode_in_same_trans, - need_retry, - conflict_tx_set))) { - if (OB_TRY_LOCK_ROW_CONFLICT != ret) { - LOG_WARN("try fast lock failed", KR(ret), K(lock_op)); - } - } else { - LOG_DEBUG("succeed create lock ", K(lock_op)); - } - } // 1. need retry basic conditions if (ret == OB_TRY_LOCK_ROW_CONFLICT && !param.is_try_lock_) { need_retry = true; @@ -590,7 +562,6 @@ int ObOBJLock::lock( } } LOG_DEBUG("ObOBJLock::lock finish", K(ret), K(conflict_tx_set)); - return ret; } @@ -749,6 +720,17 @@ int ObOBJLock::get_table_lock_store_info( return ret; } +int ObOBJLock::compact_tablelock(ObMalloc &allocator, + bool &is_compacted, + const bool is_force) { + int ret = OB_SUCCESS; + WRLockGuard guard(rwlock_); + if (OB_FAIL(compact_tablelock_(allocator, is_compacted, is_force))) { + LOG_WARN("compact table lock failed", K(ret), K(is_compacted), K(is_force)); + } + return ret; +} + bool ObOBJLockMap::GetTableLockStoreInfoFunctor::operator() ( ObOBJLock *obj_lock) { @@ -828,6 +810,50 @@ int ObOBJLockMap::get_lock_op_iter(const ObLockID &lock_id, return ret; } +int ObOBJLockMap::check_and_clear_obj_lock(const bool force_compact) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + ObLockIDIterator lock_id_iter; + ObLockID lock_id; + ObOBJLock *obj_lock = nullptr; + bool is_compacted = false; + if (OB_FAIL(get_lock_id_iter(lock_id_iter))) { + TABLELOCK_LOG(WARN, "get lock id iterator failed", K(ret)); + } else { + do { + if (OB_FAIL(lock_id_iter.get_next(lock_id))) { + if (OB_ITER_END != ret) { + TABLELOCK_LOG(WARN, "fail to get next obj lock", K(ret)); + } + } else if (OB_FAIL(get_obj_lock_with_ref_(lock_id, obj_lock))) { + if (ret != OB_OBJ_LOCK_NOT_EXIST) { + TABLELOCK_LOG(WARN, "get obj lock failed", K(ret), K(lock_id)); + } else { + // Concurrent deletion may occur here. If it is found + // that the obj lock cannot be gotten, it will continue + // to iterate the remaining obj lock. + TABLELOCK_LOG(WARN, "obj lock has been deleted", K(ret), K(lock_id)); + ret = OB_SUCCESS; + continue; + } + } else { + if (OB_TMP_FAIL( + obj_lock->compact_tablelock(allocator_, is_compacted, force_compact))) { + TABLELOCK_LOG(WARN, "compact table lock failed", K(ret), K(tmp_ret), + K(lock_id)); + } + drop_obj_lock_if_empty_(lock_id, obj_lock); + if (OB_NOT_NULL(obj_lock)) { + lock_map_.revert(obj_lock); + } + } + } while (OB_SUCC(ret)); + } + ret = OB_ITER_END == ret ? OB_SUCCESS : ret; + return ret; +} + bool ObOBJLockMap::GetMinCommittedDDLLogtsFunctor::operator() ( ObOBJLock *obj_lock) { @@ -848,7 +874,6 @@ bool ObOBJLockMap::GetMinCommittedDDLLogtsFunctor::operator() ( return bool_ret; } - int ObOBJLock::check_op_allow_lock_(const ObTableLockOp &lock_op) { int ret = OB_SUCCESS; @@ -919,42 +944,6 @@ int ObOBJLock::check_allow_unlock_( return ret; } -int ObOBJLock::check_allow_lock_( - const ObTableLockOp &lock_op, - const ObTableLockMode &lock_mode_in_same_trans, - ObTxIDSet &conflict_tx_set, - bool &conflict_with_dml_lock, - ObMalloc &allocator) -{ - int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - bool is_compacted = false; - const bool is_force_compact = true; - if (OB_FAIL(check_allow_lock_(lock_op, - lock_mode_in_same_trans, - conflict_tx_set, - conflict_with_dml_lock))) { - if (OB_TRY_LOCK_ROW_CONFLICT != ret) { - LOG_WARN("check allow lock failed", K(ret), K(lock_op)); - } else if (OB_TMP_FAIL(compact_tablelock_(allocator, is_compacted, is_force_compact))) { - // compact the obj lock to make sure the lock op that need compact will - // not block the lock operation next time. - LOG_WARN("compact tablelock failed", K(tmp_ret), K(lock_op)); - } else if (!is_compacted) { - // do nothing - } else if (OB_FAIL(check_allow_lock_(lock_op, - lock_mode_in_same_trans, - conflict_tx_set, - conflict_with_dml_lock))) { - if (OB_TRY_LOCK_ROW_CONFLICT != ret) { - LOG_WARN("check allow lock failed", K(ret), K(lock_op)); - } - } - } - - return ret; -} - int ObOBJLock::check_allow_lock_( const ObTableLockOp &lock_op, const ObTableLockMode &lock_mode_in_same_trans, @@ -1018,43 +1007,17 @@ int ObOBJLock::check_allow_lock( const bool only_check_dml_lock) { int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - bool is_compacted = false; - const bool is_force_compact = true; if (OB_UNLIKELY(!lock_op.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument.", K(ret), K(lock_op)); } else { - // prevent from create new lock op. - // may be we need compact the lock. - { - RDLockGuard guard(rwlock_); - ret = check_allow_lock_(lock_op, - lock_mode_in_same_trans, - conflict_tx_set, - conflict_with_dml_lock, - include_finish_tx, - only_check_dml_lock); - } - // compact if need. - if (OB_TRY_LOCK_ROW_CONFLICT == ret) { - WRLockGuard guard(rwlock_); - if(OB_TMP_FAIL(compact_tablelock_(allocator, is_compacted, is_force_compact))) { - // compact the obj lock to make sure the lock op that need compact will - // not block the lock operation next time. - LOG_WARN("compact tablelock failed", K(tmp_ret), K(lock_op)); - } - } - // recheck if compacted - if (OB_TRY_LOCK_ROW_CONFLICT == ret && is_compacted) { - RDLockGuard guard(rwlock_); - ret = check_allow_lock_(lock_op, - lock_mode_in_same_trans, - conflict_tx_set, - conflict_with_dml_lock, - include_finish_tx, - only_check_dml_lock); - } + RDLockGuard guard(rwlock_); + ret = check_allow_lock_(lock_op, + lock_mode_in_same_trans, + conflict_tx_set, + conflict_with_dml_lock, + include_finish_tx, + only_check_dml_lock); } return ret; } @@ -1588,6 +1551,7 @@ int ObOBJLock::compact_tablelock_(ObTableLockOpList *&op_list, // do nothing } } + drop_op_list_if_empty_(unlock_op.lock_mode_, op_list, allocator); if (OB_OBJ_LOCK_NOT_EXIST == ret) { // compact finished succeed ret = OB_SUCCESS; @@ -1724,12 +1688,11 @@ int ObOBJLock::get_op_list(const ObTableLockMode mode, void ObOBJLock::drop_op_list_if_empty_( const ObTableLockMode mode, - const ObTableLockOpList *op_list, + ObTableLockOpList *&op_list, ObMalloc &allocator) { int ret = OB_SUCCESS; int map_index = 0; - ObTableLockOpList *tmp_op_list = NULL; if (OB_ISNULL(op_list) || OB_UNLIKELY(!is_lock_mode_valid(mode))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(op_list), K(mode)); @@ -1738,13 +1701,12 @@ void ObOBJLock::drop_op_list_if_empty_( ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid lock mode", K(ret), K(mode), K(map_index)); } else if (op_list->get_size() == 0) { - if (OB_ISNULL(tmp_op_list = map_[map_index])) { - // empty list, do nothing - } else { - tmp_op_list->~ObTableLockOpList(); - allocator.free(tmp_op_list); - map_[map_index] = NULL; - } + op_list->~ObTableLockOpList(); + allocator.free(op_list); + map_[map_index] = NULL; + // We have to set op_list to NULL to avoid + // visit the op_list by the pointer again + op_list = NULL; } } @@ -1931,10 +1893,6 @@ int ObOBJLockMap::lock( } else { LOG_DEBUG("succeed create lock ", K(lock_op)); } - if (OB_FAIL(ret) && OB_NOT_NULL(obj_lock)) { - // drop map, should never fail. - drop_obj_lock_if_empty_(lock_op.lock_id_, obj_lock); - } if (OB_NOT_NULL(obj_lock)) { lock_map_.revert(obj_lock); } @@ -1979,10 +1937,6 @@ int ObOBJLockMap::unlock( } else { LOG_DEBUG("succeed create unlock op ", K(lock_op)); } - if (OB_FAIL(ret) && OB_NOT_NULL(obj_lock)) { - // drop map, should never fail. - (void)drop_obj_lock_if_empty_(lock_op.lock_id_, obj_lock); - } if (OB_NOT_NULL(obj_lock)) { lock_map_.revert(obj_lock); } @@ -2019,8 +1973,6 @@ void ObOBJLockMap::remove_lock_record(const ObTableLockOp &lock_op) } } else { obj_lock->remove_lock_op(lock_op, allocator_); - // TODO: GC lock with gc thread. - drop_obj_lock_if_empty_(lock_op.lock_id_, obj_lock); lock_map_.revert(obj_lock); } LOG_DEBUG("ObOBJLockMap::remove_lock_record finish.", K(ret)); @@ -2085,10 +2037,6 @@ int ObOBJLockMap::recover_obj_lock(const ObTableLockOp &lock_op) } else { LOG_DEBUG("succeed create lock ", K(lock_op)); } - if (OB_FAIL(ret) && OB_NOT_NULL(obj_lock)) { - // drop map, should never fail. - drop_obj_lock_if_empty_(lock_op.lock_id_, obj_lock); - } if (OB_NOT_NULL(obj_lock)) { lock_map_.revert(obj_lock); } @@ -2199,8 +2147,7 @@ void ObOBJLockMap::drop_obj_lock_if_empty_( WRLockGuard guard(obj_lock->rwlock_); // lock and delete flag make sure no one insert a new op. // but maybe have deleted by another concurrent thread. - if (obj_lock->size_without_lock() == 0 && - !obj_lock->is_deleted()) { + if (obj_lock->size_without_lock() == 0 && !obj_lock->is_deleted()) { obj_lock->set_deleted(); if (OB_FAIL(get_obj_lock_with_ref_(lock_id, recheck_ptr))) { if (ret != OB_OBJ_LOCK_NOT_EXIST) { @@ -2211,6 +2158,8 @@ void ObOBJLockMap::drop_obj_lock_if_empty_( KP(recheck_ptr)); } else if (OB_FAIL(lock_map_.del(lock_id, obj_lock))) { LOG_WARN("remove obj lock from map failed. ", K(ret), K(lock_id)); + } else { + LOG_DEBUG("remove obj lock successfully", K(ret), K(lock_id), KPC(obj_lock)); } } } @@ -2222,7 +2171,6 @@ void ObOBJLockMap::drop_obj_lock_if_empty_( K(obj_lock)); } - } } } diff --git a/src/storage/tablelock/ob_obj_lock.h b/src/storage/tablelock/ob_obj_lock.h index ec3463a00f..64ec5b224a 100644 --- a/src/storage/tablelock/ob_obj_lock.h +++ b/src/storage/tablelock/ob_obj_lock.h @@ -121,7 +121,9 @@ public: int get_table_lock_store_info( ObIArray &store_arr, const share::SCN &freeze_scn); - + int compact_tablelock(ObMalloc &allocator, + bool &is_compacted, + const bool is_force = false); void reset(ObMalloc &allocator); void reset_without_lock(ObMalloc &allocator); int size_without_lock() const; @@ -139,12 +141,6 @@ public: private: void print_() const; void reset_(ObMalloc &allocator); - int check_allow_lock_( - const ObTableLockOp &lock_op, - const ObTableLockMode &lock_mode_in_same_trans, - ObTxIDSet &conflict_tx_set, - bool &conflict_with_dml_lock, - ObMalloc &allocator); int check_allow_lock_( const ObTableLockOp &lock_op, const ObTableLockMode &lock_mode_in_same_trans, @@ -203,7 +199,7 @@ private: ObTableLockOpList *&op_list); void drop_op_list_if_empty_( const ObTableLockMode mode, - const ObTableLockOpList *op_list, + ObTableLockOpList *&op_list, ObMalloc &allocator); void delete_lock_op_from_list_( const ObTableLockOp &lock_op, @@ -357,6 +353,8 @@ public: // @param[out] iter, the iterator returned. int get_lock_op_iter(const ObLockID &lock_id, ObLockOpIterator &iter); + // check all obj locks in the lock map, and clear it if it's empty. + int check_and_clear_obj_lock(const bool force_compact); private: class LockIDIterFunctor { diff --git a/src/storage/tablelock/ob_table_lock_service.cpp b/src/storage/tablelock/ob_table_lock_service.cpp index c55602d271..1abac18364 100644 --- a/src/storage/tablelock/ob_table_lock_service.cpp +++ b/src/storage/tablelock/ob_table_lock_service.cpp @@ -23,6 +23,7 @@ #include "share/schema/ob_tenant_schema_service.h" #include "storage/tx/ob_trans_deadlock_adapter.h" #include "storage/tx/ob_trans_service.h" +#include "storage/tx_storage/ob_ls_service.h" namespace oceanbase { @@ -91,6 +92,134 @@ ObTableLockService::ObTableLockCtx::ObTableLockCtx(const ObTableLockTaskType tas obj_id_ = obj_id; } +int64_t ObTableLockService::ObOBJLockGarbageCollector::GARBAGE_COLLECT_PRECISION = 100_ms; +int64_t ObTableLockService::ObOBJLockGarbageCollector::GARBAGE_COLLECT_EXEC_INTERVAL = 10_s; +int64_t ObTableLockService::ObOBJLockGarbageCollector::GARBAGE_COLLECT_TIMEOUT = 10_min; + +ObTableLockService::ObOBJLockGarbageCollector::ObOBJLockGarbageCollector() + : timer_(), + timer_handle_(), + last_success_timestamp_(0) {} +ObTableLockService::ObOBJLockGarbageCollector::~ObOBJLockGarbageCollector() {} + +int ObTableLockService::ObOBJLockGarbageCollector::start() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(obj_lock_gc_thread_pool_.init_and_start( + OBJ_LOCK_GC_THREAD_NUM))) { + LOG_WARN( + "fail to init and start gc thread pool for ObTableLockService::ObOBJLockGarbageCollector", + KR(ret)); + } else if (OB_FAIL(timer_.init_and_start(obj_lock_gc_thread_pool_, + GARBAGE_COLLECT_PRECISION, + "OBJLockGC"))) { + LOG_WARN("fail to init and start timer for ObTableLockService::ObOBJLockGarbageCollector", + K(ret), KPC(this)); + } else if (OB_FAIL(timer_.schedule_task_repeat( + timer_handle_, GARBAGE_COLLECT_EXEC_INTERVAL, + [this]() mutable { + int ret = OB_SUCCESS; + if (OB_FAIL(garbage_collect_for_all_ls_())) { + check_and_report_timeout_(); + LOG_WARN( + "check and clear obj lock failed, will retry later", + K(ret), K(last_success_timestamp_), KPC(this)); + } else { + last_success_timestamp_ = ObClockGenerator::getClock(); + LOG_DEBUG("check and clear obj lock successfully", K(ret), + K(last_success_timestamp_), KPC(this)); + } + return false; + }))) { + LOG_ERROR("ObTableLockService::ObOBJLockGarbageCollector schedules repeat task failed", + K(ret), KPC(this)); + } else { + LOG_INFO("ObTableLockService::ObOBJLockGarbageCollector starts successfully", K(ret), + KPC(this)); + } + return ret; +} + +void ObTableLockService::ObOBJLockGarbageCollector::stop() +{ + timer_handle_.stop(); + LOG_INFO("ObTableLockService::ObOBJLockGarbageCollector stops successfully", KPC(this)); +} + +void ObTableLockService::ObOBJLockGarbageCollector::wait() +{ + timer_handle_.wait(); + LOG_INFO("ObTableLockService::ObOBJLockGarbageCollector waits successfully", KPC(this)); +} + +int ObTableLockService::ObOBJLockGarbageCollector::garbage_collect_right_now() +{ + int ret = OB_SUCCESS; + if (!timer_.is_running()) { + ret = OB_NOT_INIT; + LOG_WARN("timer of ObTableLockService::ObOBJLockGarbageCollector is not running", K(ret)); + } else if (!timer_handle_.is_running()) { + ret = OB_NOT_INIT; + LOG_WARN("timer_handle of ObTableLockService::ObOBJLockGarbageCollector is not running", K(ret)); + } else if (OB_FAIL(timer_handle_.reschedule_after(10))) { + LOG_WARN("reschedule task for ObTableLockService::ObOBJLockGarbageCollector failed", K(ret)); + } + return ret; +} + +int ObTableLockService::ObOBJLockGarbageCollector::garbage_collect_for_all_ls_() +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + ObSharedGuard ls_iter_guard; + ObLSService *ls_service = nullptr; + ObLS *ls = nullptr; + + if (!timer_.is_running()) { + ret = OB_NOT_INIT; + LOG_WARN("timer of ObTableLockService::ObOBJLockGarbageCollector is not running", K(ret)); + } else if (!timer_handle_.is_running()) { + ret = OB_NOT_INIT; + LOG_WARN("timer_handle of ObTableLockService::ObOBJLockGarbageCollector is not running", K(ret)); + } else if (OB_ISNULL(ls_service = MTL(ObLSService *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("mtl ObLSService should not be null", K(ret)); + } else if (ls_service->get_ls_iter(ls_iter_guard, + ObLSGetMod::TABLELOCK_MOD)) { + LOG_WARN("fail to get ls iterator", K(ret)); + } else { + do { + if (OB_FAIL(ls_iter_guard->get_next(ls))) { + if (OB_ITER_END != ret) { + LOG_WARN("get next iter failed", K(ret)); + } + } else if (OB_TMP_FAIL(ls->check_and_clear_obj_lock(false))) { + LOG_WARN("check and clear obj lock failed", K(ret), K(tmp_ret), + K(ls->get_ls_id())); + } else { + LOG_INFO("start to check and clear obj lock", K(ls->get_ls_id())); + } + } while (OB_SUCC(ret)); + } + ret = OB_ITER_END == ret ? OB_SUCCESS : ret; + return ret; +} + +void ObTableLockService::ObOBJLockGarbageCollector::check_and_report_timeout_() +{ + int ret = OB_SUCCESS; + int current_timestamp = ObClockGenerator::getClock(); + if (last_success_timestamp_ > current_timestamp) { + LOG_ERROR("last success timestamp is not correct", K(current_timestamp), + K(last_success_timestamp_), KPC(this)); + } else if (current_timestamp - last_success_timestamp_ > + GARBAGE_COLLECT_TIMEOUT && + last_success_timestamp_ != 0) { + LOG_ERROR("task failed too many times", K(current_timestamp), + K(last_success_timestamp_), KPC(this)); + } +} + bool ObTableLockService::ObTableLockCtx::is_timeout() const { return ObTimeUtility::current_time() >= abs_timeout_ts_; @@ -182,15 +311,18 @@ int ObTableLockService::init() int ObTableLockService::start() { + obj_lock_garbage_collector_.start(); return OB_SUCCESS; } void ObTableLockService::stop() { + obj_lock_garbage_collector_.stop(); } void ObTableLockService::wait() { + obj_lock_garbage_collector_.wait(); } void ObTableLockService::destroy() @@ -366,6 +498,10 @@ int ObTableLockService::lock_table(ObTxDesc &tx_desc, LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()), K(tx_param.is_valid()), K(arg.is_valid())); } else { + // origin_timeout_us_ and timeout_us_ are both set as timeout_us_, which + // is set by user in the 'WAIT n' option. + // Furthermore, if timeout_us_ is 0, this lock will be judged as a try + // lock semantics. It meets the actual semantics of 'NOWAIT' option. ObTableLockCtx ctx(LOCK_TABLE, arg.table_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; ctx.tx_desc_ = &tx_desc; @@ -460,6 +596,41 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc, return ret; } +int ObTableLockService::lock_partition_or_subpartition(ObTxDesc &tx_desc, + const ObTxParam &tx_param, + const ObLockPartitionRequest &arg) +{ + int ret = OB_SUCCESS; + ObPartitionLevel part_level = PARTITION_LEVEL_MAX; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("lock service is not inited", K(ret)); + } else if (OB_FAIL(get_table_partition_level_(arg.table_id_, part_level))) { + LOG_WARN("can not get table partition level", K(ret), K(arg)); + } else { + switch (part_level) { + case PARTITION_LEVEL_ONE: { + if (OB_FAIL(lock_partition(tx_desc, tx_param, arg))) { + LOG_WARN("lock partition failed", K(ret), K(arg)); + } + break; + } + case PARTITION_LEVEL_TWO: { + if (OB_FAIL(lock_subpartition(tx_desc, tx_param, arg))) { + LOG_WARN("lock subpartition failed", K(ret), K(arg)); + } + break; + } + default: { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected partition level", K(ret), K(arg), K(part_level)); + } + } + } + return ret; +} + int ObTableLockService::lock_partition(ObTxDesc &tx_desc, const ObTxParam &tx_param, const ObLockPartitionRequest &arg) @@ -623,6 +794,31 @@ int ObTableLockService::unlock_obj(ObTxDesc &tx_desc, return ret; } +int ObTableLockService::garbage_collect_right_now() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLockService is not be inited", K(ret)); + } else if (OB_FAIL(obj_lock_garbage_collector_.garbage_collect_right_now())) { + LOG_WARN("garbage collect right now failed", K(ret)); + } else { + LOG_DEBUG("garbage collect right now"); + } + return ret; +} + +int ObTableLockService::get_obj_lock_garbage_collector(ObOBJLockGarbageCollector *&obj_lock_garbage_collector) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLockService is not be inited", K(ret)); + } else { + obj_lock_garbage_collector = &obj_lock_garbage_collector_; + } + return ret; +} int ObTableLockService::process_lock_task_(ObTableLockCtx &ctx, const ObTableLockMode lock_mode, const ObTableLockOwnerID lock_owner) @@ -1072,6 +1268,33 @@ int ObTableLockService::deal_with_deadlock_(ObTableLockCtx &ctx) return ret; } +int ObTableLockService::get_table_partition_level_(const ObTableID table_id, + ObPartitionLevel &part_level) +{ + int ret = OB_SUCCESS; + ObMultiVersionSchemaService *schema_service = MTL(ObTenantSchemaService*)->get_schema_service(); + ObRefreshSchemaStatus schema_status; + ObTableSchema *table_schema = nullptr; + ObArenaAllocator allocator("TableSchema"); + schema_status.tenant_id_ = MTL_ID(); + + if (OB_ISNULL(schema_service)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("can not get schema service", K(ret)); + } else if (OB_FAIL(schema_service->get_schema_service() + ->get_table_schema(schema_status, + table_id, + INT64_MAX - 1 /* refresh the newest schema */, + *sql_proxy_, + allocator, + table_schema))) { + LOG_WARN("can not get table schema", K(ret), K(table_id)); + } else { + part_level = table_schema->get_part_level(); + } + return ret; +} + int ObTableLockService::pack_batch_request_(ObTableLockCtx &ctx, const ObTableLockTaskType task_type, const ObTableLockMode &lock_mode, @@ -1155,7 +1378,6 @@ int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch, const ObLockIDArray &lock_ids = data->second; ObLockTaskBatchRequest request; ObAddr addr; - ObTableLockTaskResult result; if (OB_FAIL(ls_array.push_back(ls_id))) { LOG_WARN("push_back lsid failed", K(ret), K(ls_id)); @@ -1176,7 +1398,7 @@ int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch, timeout_us, ctx.tx_desc_->get_tenant_id(), request))) { - LOG_WARN("failed to all async rpc", KR(ret), K(addr), + LOG_WARN("failed to call async rpc", KR(ret), K(addr), K(ctx.abs_timeout_ts_), K(request)); } else { rpc_count++; diff --git a/src/storage/tablelock/ob_table_lock_service.h b/src/storage/tablelock/ob_table_lock_service.h index e6046abc5d..5553dd4f3f 100644 --- a/src/storage/tablelock/ob_table_lock_service.h +++ b/src/storage/tablelock/ob_table_lock_service.h @@ -17,16 +17,14 @@ #include "common/ob_tablet_id.h" #include "share/ob_ls_id.h" +#include "share/ob_occam_timer.h" #include "sql/ob_sql_trans_control.h" #include "storage/tablelock/ob_table_lock_common.h" #include "storage/tablelock/ob_table_lock_rpc_proxy.h" #include "storage/tablelock/ob_table_lock_rpc_struct.h" -#include "lib/utility/ob_macro_utils.h" - namespace oceanbase { - namespace rpc { namespace frame @@ -142,6 +140,36 @@ private: K(current_savepoint_), K(need_rollback_ls_), K(tablet_list_), K(schema_version_), K(tx_is_killed_), K(stmt_savepoint_)); }; +public: + class ObOBJLockGarbageCollector + { + static const int OBJ_LOCK_GC_THREAD_NUM = 2; + public: + friend class ObLockTable; + ObOBJLockGarbageCollector(); + ~ObOBJLockGarbageCollector(); + public: + int start(); + void stop(); + void wait(); + int garbage_collect_right_now(); + + TO_STRING_KV(KP(this), + K_(last_success_timestamp)); + private: + int garbage_collect_for_all_ls_(); + void check_and_report_timeout_(); + public: + static int64_t GARBAGE_COLLECT_PRECISION; + static int64_t GARBAGE_COLLECT_EXEC_INTERVAL; + static int64_t GARBAGE_COLLECT_TIMEOUT; + private: + common::ObOccamThreadPool obj_lock_gc_thread_pool_; + common::ObOccamTimer timer_; + common::ObOccamTimerTaskRAIIHandle timer_handle_; + + int64_t last_success_timestamp_; + }; public: typedef hash::ObHashMap LockMap; @@ -149,6 +177,7 @@ public: ObTableLockService() : location_service_(nullptr), sql_proxy_(nullptr), + obj_lock_garbage_collector_(), is_inited_(false) {} ~ObTableLockService() {} int init(); @@ -225,6 +254,9 @@ public: int unlock_tablet(ObTxDesc &tx_desc, const ObTxParam &tx_param, const ObUnLockTabletRequest &arg); + int lock_partition_or_subpartition(ObTxDesc &tx_desc, + const ObTxParam &tx_param, + const ObLockPartitionRequest &arg); int lock_partition(ObTxDesc &tx_desc, const ObTxParam &tx_param, const ObLockPartitionRequest &arg); @@ -243,6 +275,8 @@ public: int unlock_obj(ObTxDesc &tx_desc, const ObTxParam &tx_param, const ObUnLockObjRequest &arg); + int garbage_collect_right_now(); + int get_obj_lock_garbage_collector(ObOBJLockGarbageCollector *&obj_lock_garbage_collector); private: bool need_retry_single_task_(const ObTableLockCtx &ctx, const int64_t ret) const; @@ -355,6 +389,7 @@ private: const ObTableLockOwnerID lock_owner); // used by deadlock detector. int deal_with_deadlock_(ObTableLockCtx &ctx); + int get_table_partition_level_(const ObTableID table_id, ObPartitionLevel &part_level); DISALLOW_COPY_AND_ASSIGN(ObTableLockService); private: @@ -364,6 +399,7 @@ private: share::ObLocationService *location_service_; common::ObMySQLProxy *sql_proxy_; + ObOBJLockGarbageCollector obj_lock_garbage_collector_; bool is_inited_; };