From 60b885ce98c8420de41adc3e1555dc1f7763f656 Mon Sep 17 00:00:00 2001 From: Hongqin-Li Date: Tue, 6 Feb 2024 16:14:31 +0000 Subject: [PATCH] [CP] Fix 4721 when fetch tablet ls tablet seq --- mittest/simple_server/CMakeLists.txt | 1 + .../simple_server/test_tablet_autoinc_mgr.cpp | 216 ++++++++++++++++++ src/share/ob_tablet_autoincrement_service.cpp | 42 ++-- src/share/ob_tablet_autoincrement_service.h | 11 +- 4 files changed, 241 insertions(+), 29 deletions(-) create mode 100644 mittest/simple_server/test_tablet_autoinc_mgr.cpp diff --git a/mittest/simple_server/CMakeLists.txt b/mittest/simple_server/CMakeLists.txt index 96f6d500b9..1b9a670aa9 100644 --- a/mittest/simple_server/CMakeLists.txt +++ b/mittest/simple_server/CMakeLists.txt @@ -96,6 +96,7 @@ ob_unittest_observer(test_transfer_lock_info_operator storage_ha/test_transfer_l ob_unittest_observer(test_mds_recover test_mds_recover.cpp) ob_unittest_observer(test_keep_alive_min_start_scn test_keep_alive_min_start_scn.cpp) ob_unittest_observer(test_ls_replica test_ls_replica.cpp) +ob_unittest_observer(test_tablet_autoinc_mgr test_tablet_autoinc_mgr.cpp) # TODO(muwei.ym): open later ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp) ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp) diff --git a/mittest/simple_server/test_tablet_autoinc_mgr.cpp b/mittest/simple_server/test_tablet_autoinc_mgr.cpp new file mode 100644 index 0000000000..6aa3438928 --- /dev/null +++ b/mittest/simple_server/test_tablet_autoinc_mgr.cpp @@ -0,0 +1,216 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. 
+ * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#include <gtest/gtest.h> +#include <gmock/gmock.h> + +#define USING_LOG_PREFIX SHARE +#define protected public +#define private public + +#include "env/ob_simple_server_restart_helper.h" +#include "env/ob_simple_cluster_test_base.h" +#include "storage_ha/test_transfer_common_fun.h" +#include "lib/ob_errno.h" +#include "rootserver/ob_tenant_transfer_service.h" // ObTenantTransferService +#include "share/transfer/ob_transfer_task_operator.h" // ObTransferTaskOperator +#include "share/location_cache/ob_location_service.h" // ObLocationService +#include "share/ob_tablet_autoincrement_service.h" +#include "storage/high_availability/ob_transfer_handler.h" //ObTransferHandler +#include "lib/utility/utility.h" +#include "storage/ls/ob_ls_tablet_service.h" +#include "storage/ls/ob_ls.h" +#include "storage/tablet/ob_tablet.h" +#include "storage/tx_storage/ob_ls_service.h" + +namespace oceanbase +{ +using namespace unittest; +using rootserver::ObTenantTransferService; +namespace share +{ +using namespace common; + +static const int64_t TOTAL_NUM = 110; +static uint64_t g_tenant_id; +static ObTransferPartList g_part_list; +static ObSEArray<ObTabletLSPair, TOTAL_NUM> g_tablet_ls_pairs; + +class TestTabletAutoincMgr : public unittest::ObSimpleClusterTestBase +{ +public: + TestTabletAutoincMgr() : unittest::ObSimpleClusterTestBase("test_tablet_autoinc_mgr") {} + int prepare_tablet_ls_pairs(ObMySQLProxy &sql_proxy, const char *table_name, ObIArray<ObTabletLSPair> &tablet_ls_pairs); + int prepare_part_list(ObMySQLProxy &sql_proxy, const char *table_name, ObTransferPartList &part_list); +}; + +int TestTabletAutoincMgr::prepare_tablet_ls_pairs( + ObMySQLProxy &sql_proxy, + const char *table_name, + ObIArray<ObTabletLSPair> 
&tablet_ls_pairs) +{ + int ret = OB_SUCCESS; + tablet_ls_pairs.reset(); + ObSqlString sql; + int64_t affected_rows = 0; + SMART_VAR(ObMySQLProxy::MySQLResult, result) { + if (OB_FAIL(sql.assign_fmt("select TABLET_ID, LS_ID from oceanbase.__all_tablet_to_ls where table_id in (select table_id from oceanbase.__all_table where table_id = (select table_id from oceanbase.__all_table where table_name = '%s') union select table_id from oceanbase.__all_table where data_table_id = (select table_id from oceanbase.__all_table where table_name = '%s')) order by TABLET_ID", table_name, table_name))) { + } else if (OB_FAIL(sql_proxy.read(result, sql.ptr()))) { + } else if (OB_ISNULL(result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null result", KR(ret), K(sql)); + } else { + sqlclient::ObMySQLResult &res = *result.get_result(); + uint64_t tablet_id = ObTabletID::INVALID_TABLET_ID; + int64_t ls_id = ObLSID::INVALID_LS_ID; + while(OB_SUCC(ret) && OB_SUCC(res.next())) { + EXTRACT_INT_FIELD_MYSQL(res, "TABLET_ID", tablet_id, uint64_t); + EXTRACT_INT_FIELD_MYSQL(res, "LS_ID", ls_id, int64_t); + if (OB_FAIL(tablet_ls_pairs.push_back(ObTabletLSPair(tablet_id, ls_id)))) {} + } + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to generate data", K(sql)); + } + } + } + return ret; +} + +int TestTabletAutoincMgr::prepare_part_list( + ObMySQLProxy &sql_proxy, + const char *table_name, + ObTransferPartList &part_list) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + SMART_VAR(ObMySQLProxy::MySQLResult, result) { + if (OB_FAIL(sql.assign_fmt("select object_id from oceanbase.DBA_OBJECTS where OBJECT_NAME='%s'", table_name))) { + } else if (OB_FAIL(sql_proxy.read(result, sql.ptr()))) { + } else if (OB_ISNULL(result.get_result())) { + ret = OB_ERR_UNEXPECTED; + } else { + sqlclient::ObMySQLResult &res = *result.get_result(); + uint64_t table_id = OB_INVALID_ID; + uint64_t part_id = OB_INVALID_ID; + int64_t part_count = 0; + if (OB_SUCC(ret) && 
OB_SUCC(res.next())) { + EXTRACT_INT_FIELD_MYSQL(res, "object_id", table_id, uint64_t); + } + while(OB_SUCC(ret)) { + part_id = OB_INVALID_ID; + ObTransferPartInfo part_info; + if (OB_SUCC(res.next())) { + ++part_count; + EXTRACT_INT_FIELD_MYSQL(res, "object_id", part_id, uint64_t); + if (OB_FAIL(part_info.init(table_id, part_id))) { + } else if (OB_FAIL(part_list.push_back(part_info))) { + } + } + } + if (OB_ITER_END == ret) { + if (0 == part_count && OB_INVALID_ID != table_id && OB_INVALID_ID == part_id) { + ObTransferPartInfo part_info(table_id, 0); + (void)part_list.push_back(part_info); + } + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to generate data", K(sql)); + } + LOG_INFO("finish read sql", K(sql), K(part_list), K(table_id), K(part_id)); + } + } + return ret; +} + +TEST_F(TestTabletAutoincMgr, test_lob_tablet_autoinc_location_cache) +{ + g_tenant_id = OB_INVALID_TENANT_ID; + + ASSERT_EQ(OB_SUCCESS, create_tenant()); + ASSERT_EQ(OB_SUCCESS, get_tenant_id(g_tenant_id)); + ASSERT_TRUE(is_valid_tenant_id(g_tenant_id)); + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); + ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + ObMySQLProxy &inner_sql_proxy = get_curr_observer().get_mysql_proxy(); + ObSqlString sql; + int64_t affected_rows = 0; + + // create table and prepare basic info + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("create table t1(c1 int, c2 longtext)")); + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + ASSERT_EQ(OB_SUCCESS, prepare_tablet_ls_pairs(sql_proxy, "t1", g_tablet_ls_pairs)); + ASSERT_EQ(OB_SUCCESS, prepare_part_list(sql_proxy, "t1", g_part_list)); + ASSERT_EQ(1, g_part_list.count()); + ASSERT_EQ(3, g_tablet_ls_pairs.count()); + + // refresh tablet ls cache + ObLocationService *location_service = GCTX.location_service_; + ASSERT_TRUE(OB_NOT_NULL(location_service)); + ObTabletLSService *tablet_ls_service = &(location_service->tablet_ls_service_); + ObLSLocationService 
*ls_location_service = &(location_service->ls_location_service_); + bool is_cache_hit = false; + ObArray<ObTabletLSCache> old_tablet_ls_cache; + for (int64_t i = 0; i < g_tablet_ls_pairs.count(); i++) { + const ObTabletID &tablet_id = g_tablet_ls_pairs.at(i).tablet_id_; + ObLSID ls_id; + ObTabletLSCache tablet_ls_cache; + ASSERT_EQ(OB_SUCCESS, tablet_ls_service->get(g_tenant_id, tablet_id, INT64_MAX, is_cache_hit, ls_id)); + ASSERT_EQ(OB_SUCCESS, tablet_ls_service->get_from_cache_(g_tenant_id, tablet_id, tablet_ls_cache)); + ASSERT_EQ(OB_SUCCESS, old_tablet_ls_cache.push_back(tablet_ls_cache)); + } + + // create other ls by cluster table + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("create table dup_table(c1 int) duplicate_scope = 'CLUSTER' partition by hash(c1) partitions 4")); + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + + share::ObTenantSwitchGuard tenant_guard; + ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(g_tenant_id)); + ObTenantTransferService *tenant_transfer = MTL(ObTenantTransferService*); + ASSERT_TRUE(OB_NOT_NULL(tenant_transfer)); + + // transfer t1 to other ls + ObTransferTaskID task_id; + ObMySQLTransaction trans; + const ObLSID src_ls_id(1001); + const ObLSID dst_ls_id(1002); + ASSERT_EQ(OB_SUCCESS, trans.start(&inner_sql_proxy, g_tenant_id)); + ASSERT_EQ(OB_SUCCESS, tenant_transfer->generate_transfer_task(trans, src_ls_id, dst_ls_id, g_part_list, ObBalanceTaskID(123), task_id)); + ASSERT_EQ(OB_SUCCESS, trans.end(true)); + ObTransferStatus expected_status(ObTransferStatus::COMPLETED); + ObTransferTask task; + ASSERT_EQ(OB_SUCCESS, wait_transfer_task(g_tenant_id, task_id, expected_status, true/*is_from_his*/, inner_sql_proxy, task)); + ASSERT_EQ(OB_SUCCESS, task.result_); + + // restore old tablet ls cache + for (int64_t i = 0; i < old_tablet_ls_cache.count(); i++) { + ASSERT_EQ(OB_SUCCESS, tablet_ls_service->update_cache_(old_tablet_ls_cache.at(i))); + } + + // remove source ls and clear src ls cache + ASSERT_EQ(OB_SUCCESS, 
+ MTL(ObLSService*)->remove_ls(src_ls_id, false)); + ASSERT_EQ(OB_SUCCESS, ls_location_service->erase_location_(GCONF.cluster_id, g_tenant_id, src_ls_id)); + + // insert lob + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("insert into t1 values (2, repeat('abcde0123456789', 1000));")); + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); +} + +} // namespace share +} // namespace oceanbase +int main(int argc, char **argv) +{ + oceanbase::unittest::init_log_and_gtest(argc, argv); + OB_LOGGER.set_log_level("WDIAG"); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/share/ob_tablet_autoincrement_service.cpp b/src/share/ob_tablet_autoincrement_service.cpp index 051dc1b486..df94bc1b42 100644 --- a/src/share/ob_tablet_autoincrement_service.cpp +++ b/src/share/ob_tablet_autoincrement_service.cpp @@ -126,7 +126,6 @@ int ObTabletAutoincMgr::fetch_new_range(const ObTabletAutoincParam &param, obrpc::ObSrvRpcProxy *srv_rpc_proxy = nullptr; share::ObLocationService *location_service = nullptr; ObAddr leader_addr; - ObLSID ls_id; bool is_cache_hit = false; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; @@ -135,52 +134,39 @@ int ObTabletAutoincMgr::fetch_new_range(const ObTabletAutoincParam &param, || OB_ISNULL(location_service = GCTX.location_service_)) { ret = OB_ERR_SYS; LOG_WARN("root service or location_cache is null", K(ret), KP(srv_rpc_proxy), KP(location_service)); - } else if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, 0/*expire_renew_time*/, is_cache_hit, ls_id))) { - LOG_WARN("fail to get log stream id", K(ret), K(tablet_id)); - // try to use location cache first, if the cache is wrong, try force renew. 
- } else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id, - param.tenant_id_, - ls_id, - false,/*force_renew*/ - leader_addr))) { - LOG_WARN("get leader failed", K(ret), K(ls_id)); } else { obrpc::ObFetchTabletSeqArg arg; obrpc::ObFetchTabletSeqRes res; arg.cache_size_ = MAX(cache_size_, param.auto_increment_cache_size_); // TODO(shuangcan): confirm this arg.tenant_id_ = param.tenant_id_; arg.tablet_id_ = tablet_id; - arg.ls_id_ = ls_id; + // arg.ls_id_ will be filled by location_service->get bool finish = false; for (int64_t retry_times = 0; OB_SUCC(ret) && !finish; retry_times++) { - if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) { + if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, 0/*expire_renew_time*/, is_cache_hit, arg.ls_id_))) { + LOG_WARN("fail to get log stream id", K(ret), K(tablet_id)); + } else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id, + param.tenant_id_, + arg.ls_id_, + false,/*force_renew*/ + leader_addr))) { + LOG_WARN("get leader failed", K(ret), K(arg.ls_id_)); + } else if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) { LOG_WARN("fail to fetch autoinc cache for tablets", K(ret), K(retry_times), K(arg)); } else { finish = true; } if (OB_FAIL(ret)) { - const bool force_refresh_leader = OB_NOT_MASTER == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret; + (void)location_service->renew_tablet_location(param.tenant_id_, tablet_id, ret, !is_block_renew_location(ret)/*is_nonblock*/); if (is_retryable(ret)) { - ob_usleep(RETRY_INTERVAL); - res.reset(); if (OB_FAIL(THIS_WORKER.check_status())) { // overwrite ret LOG_WARN("failed to check status", K(ret)); + } else { + res.reset(); + ob_usleep(RETRY_INTERVAL); } } - if (OB_SUCC(ret) && force_refresh_leader) { - if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, INT64_MAX/*expire_renew_time*/, is_cache_hit, arg.ls_id_))) { - 
LOG_WARN("fail to get log stream id", K(ret), K(ret), K(tablet_id)); - } else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id, - param.tenant_id_, - arg.ls_id_, - true/*force_renew*/, - leader_addr))) { - LOG_WARN("force get leader failed", K(ret), K(ret), K(arg.ls_id_)); - } - } else { - (void)location_service->renew_tablet_location(param.tenant_id_, tablet_id, ret, true/*is_nonblock*/); - } } } diff --git a/src/share/ob_tablet_autoincrement_service.h b/src/share/ob_tablet_autoincrement_service.h index 68552b4773..3473627104 100644 --- a/src/share/ob_tablet_autoincrement_service.h +++ b/src/share/ob_tablet_autoincrement_service.h @@ -80,7 +80,16 @@ private: } bool is_retryable(int ret) { - return OB_NOT_MASTER == ret || OB_NOT_INIT == ret || OB_TIMEOUT == ret || OB_EAGAIN == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret; + return OB_NOT_MASTER == ret || OB_NOT_INIT == ret || OB_TIMEOUT == ret || OB_EAGAIN == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret || OB_LS_LOCATION_NOT_EXIST == ret; + } + bool is_block_renew_location(int ret) + { + return OB_LOCATION_LEADER_NOT_EXIST == ret || OB_LS_LOCATION_LEADER_NOT_EXIST == ret || OB_NO_READABLE_REPLICA == ret + || OB_NOT_MASTER == ret || OB_RS_NOT_MASTER == ret || OB_RS_SHUTDOWN == ret || OB_PARTITION_NOT_EXIST == ret || OB_LOCATION_NOT_EXIST == ret + || OB_PARTITION_IS_STOPPED == ret || OB_SERVER_IS_INIT == ret || OB_SERVER_IS_STOPPING == ret || OB_TENANT_NOT_IN_SERVER == ret + || OB_TRANS_RPC_TIMEOUT == ret || OB_USE_DUP_FOLLOW_AFTER_DML == ret || OB_TRANS_STMT_NEED_RETRY == ret + || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_LS_LOCATION_NOT_EXIST == ret || OB_PARTITION_IS_BLOCKED == ret || OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST == ret + || OB_GET_LOCATION_TIME_OUT == ret; } private: static const int64_t PREFETCH_THRESHOLD = 4;