[CP] Fix 4721 when fetch tablet ls tablet seq
This commit is contained in:
@ -96,6 +96,7 @@ ob_unittest_observer(test_transfer_lock_info_operator storage_ha/test_transfer_l
|
||||
ob_unittest_observer(test_mds_recover test_mds_recover.cpp)
|
||||
ob_unittest_observer(test_keep_alive_min_start_scn test_keep_alive_min_start_scn.cpp)
|
||||
ob_unittest_observer(test_ls_replica test_ls_replica.cpp)
|
||||
ob_unittest_observer(test_tablet_autoinc_mgr test_tablet_autoinc_mgr.cpp)
|
||||
# TODO(muwei.ym): open later
|
||||
ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp)
|
||||
ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp)
|
||||
|
||||
216
mittest/simple_server/test_tablet_autoinc_mgr.cpp
Normal file
216
mittest/simple_server/test_tablet_autoinc_mgr.cpp
Normal file
@ -0,0 +1,216 @@
|
||||
/**
|
||||
* Copyright (c) 2023 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#include <gtest/gtest.h>
|
||||
#include <gmock/gmock.h>
|
||||
|
||||
#define USING_LOG_PREFIX SHARE
|
||||
#define protected public
|
||||
#define private public
|
||||
|
||||
#include "env/ob_simple_server_restart_helper.h"
|
||||
#include "env/ob_simple_cluster_test_base.h"
|
||||
#include "storage_ha/test_transfer_common_fun.h"
|
||||
#include "lib/ob_errno.h"
|
||||
#include "rootserver/ob_tenant_transfer_service.h" // ObTenantTransferService
|
||||
#include "share/transfer/ob_transfer_task_operator.h" // ObTransferTaskOperator
|
||||
#include "share/location_cache/ob_location_service.h" // ObLocationService
|
||||
#include "share/ob_tablet_autoincrement_service.h"
|
||||
#include "storage/high_availability/ob_transfer_handler.h" //ObTransferHandler
|
||||
#include "lib/utility/utility.h"
|
||||
#include "storage/ls/ob_ls_tablet_service.h"
|
||||
#include "storage/ls/ob_ls.h"
|
||||
#include "storage/tablet/ob_tablet.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace unittest;
|
||||
using rootserver::ObTenantTransferService;
|
||||
namespace share
|
||||
{
|
||||
using namespace common;
|
||||
|
||||
static const int64_t TOTAL_NUM = 110;
|
||||
static uint64_t g_tenant_id;
|
||||
static ObTransferPartList g_part_list;
|
||||
static ObSEArray<ObTabletLSPair, TOTAL_NUM> g_tablet_ls_pairs;
|
||||
|
||||
class TestTabletAutoincMgr : public unittest::ObSimpleClusterTestBase
|
||||
{
|
||||
public:
|
||||
TestTabletAutoincMgr() : unittest::ObSimpleClusterTestBase("test_tablet_autoinc_mgr") {}
|
||||
int prepare_tablet_ls_pairs(ObMySQLProxy &sql_proxy, const char *table_name, ObIArray<ObTabletLSPair> &tablet_ls_pairs);
|
||||
int prepare_part_list(ObMySQLProxy &sql_proxy, const char *table_name, ObTransferPartList &part_list);
|
||||
};
|
||||
|
||||
int TestTabletAutoincMgr::prepare_tablet_ls_pairs(
|
||||
ObMySQLProxy &sql_proxy,
|
||||
const char *table_name,
|
||||
ObIArray<ObTabletLSPair> &tablet_ls_pairs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
tablet_ls_pairs.reset();
|
||||
ObSqlString sql;
|
||||
int64_t affected_rows = 0;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
|
||||
if (OB_FAIL(sql.assign_fmt("select TABLET_ID, LS_ID from oceanbase.__all_tablet_to_ls where table_id in (select table_id from oceanbase.__all_table where table_id = (select table_id from oceanbase.__all_table where table_name = '%s') union select table_id from oceanbase.__all_table where data_table_id = (select table_id from oceanbase.__all_table where table_name = '%s')) order by TABLET_ID", table_name, table_name))) {
|
||||
} else if (OB_FAIL(sql_proxy.read(result, sql.ptr()))) {
|
||||
} else if (OB_ISNULL(result.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("null result", KR(ret), K(sql));
|
||||
} else {
|
||||
sqlclient::ObMySQLResult &res = *result.get_result();
|
||||
uint64_t tablet_id = ObTabletID::INVALID_TABLET_ID;
|
||||
int64_t ls_id = ObLSID::INVALID_LS_ID;
|
||||
while(OB_SUCC(ret) && OB_SUCC(res.next())) {
|
||||
EXTRACT_INT_FIELD_MYSQL(res, "TABLET_ID", tablet_id, uint64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(res, "LS_ID", ls_id, int64_t);
|
||||
if (OB_FAIL(tablet_ls_pairs.push_back(ObTabletLSPair(tablet_id, ls_id)))) {}
|
||||
}
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to generate data", K(sql));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int TestTabletAutoincMgr::prepare_part_list(
|
||||
ObMySQLProxy &sql_proxy,
|
||||
const char *table_name,
|
||||
ObTransferPartList &part_list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
|
||||
if (OB_FAIL(sql.assign_fmt("select object_id from oceanbase.DBA_OBJECTS where OBJECT_NAME='%s'", table_name))) {
|
||||
} else if (OB_FAIL(sql_proxy.read(result, sql.ptr()))) {
|
||||
} else if (OB_ISNULL(result.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else {
|
||||
sqlclient::ObMySQLResult &res = *result.get_result();
|
||||
uint64_t table_id = OB_INVALID_ID;
|
||||
uint64_t part_id = OB_INVALID_ID;
|
||||
int64_t part_count = 0;
|
||||
if (OB_SUCC(ret) && OB_SUCC(res.next())) {
|
||||
EXTRACT_INT_FIELD_MYSQL(res, "object_id", table_id, uint64_t);
|
||||
}
|
||||
while(OB_SUCC(ret)) {
|
||||
part_id = OB_INVALID_ID;
|
||||
ObTransferPartInfo part_info;
|
||||
if (OB_SUCC(res.next())) {
|
||||
++part_count;
|
||||
EXTRACT_INT_FIELD_MYSQL(res, "object_id", part_id, uint64_t);
|
||||
if (OB_FAIL(part_info.init(table_id, part_id))) {
|
||||
} else if (OB_FAIL(part_list.push_back(part_info))) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_ITER_END == ret) {
|
||||
if (0 == part_count && OB_INVALID_ID != table_id && OB_INVALID_ID == part_id) {
|
||||
ObTransferPartInfo part_info(table_id, 0);
|
||||
(void)part_list.push_back(part_info);
|
||||
}
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to generate data", K(sql));
|
||||
}
|
||||
LOG_INFO("finish read sql", K(sql), K(part_list), K(table_id), K(part_id));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
TEST_F(TestTabletAutoincMgr, test_lob_tablet_autoinc_location_cache)
|
||||
{
|
||||
g_tenant_id = OB_INVALID_TENANT_ID;
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, create_tenant());
|
||||
ASSERT_EQ(OB_SUCCESS, get_tenant_id(g_tenant_id));
|
||||
ASSERT_TRUE(is_valid_tenant_id(g_tenant_id));
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
|
||||
ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
ObMySQLProxy &inner_sql_proxy = get_curr_observer().get_mysql_proxy();
|
||||
ObSqlString sql;
|
||||
int64_t affected_rows = 0;
|
||||
|
||||
// create table and prepare basic info
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("create table t1(c1 int, c2 longtext)"));
|
||||
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
|
||||
ASSERT_EQ(OB_SUCCESS, prepare_tablet_ls_pairs(sql_proxy, "t1", g_tablet_ls_pairs));
|
||||
ASSERT_EQ(OB_SUCCESS, prepare_part_list(sql_proxy, "t1", g_part_list));
|
||||
ASSERT_EQ(1, g_part_list.count());
|
||||
ASSERT_EQ(3, g_tablet_ls_pairs.count());
|
||||
|
||||
// refresh tablet ls cache
|
||||
ObLocationService *location_service = GCTX.location_service_;
|
||||
ASSERT_TRUE(OB_NOT_NULL(location_service));
|
||||
ObTabletLSService *tablet_ls_service = &(location_service->tablet_ls_service_);
|
||||
ObLSLocationService *ls_location_service = &(location_service->ls_location_service_);
|
||||
bool is_cache_hit = false;
|
||||
ObArray<ObTabletLSCache> old_tablet_ls_cache;
|
||||
for (int64_t i = 0; i < g_tablet_ls_pairs.count(); i++) {
|
||||
const ObTabletID &tablet_id = g_tablet_ls_pairs.at(i).tablet_id_;
|
||||
ObLSID ls_id;
|
||||
ObTabletLSCache tablet_ls_cache;
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_ls_service->get(g_tenant_id, tablet_id, INT64_MAX, is_cache_hit, ls_id));
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_ls_service->get_from_cache_(g_tenant_id, tablet_id, tablet_ls_cache));
|
||||
ASSERT_EQ(OB_SUCCESS, old_tablet_ls_cache.push_back(tablet_ls_cache));
|
||||
}
|
||||
|
||||
// create other ls by cluster table
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("create table dup_table(c1 int) duplicate_scope = 'CLUSTER' partition by hash(c1) partitions 4"));
|
||||
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
|
||||
|
||||
share::ObTenantSwitchGuard tenant_guard;
|
||||
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(g_tenant_id));
|
||||
ObTenantTransferService *tenant_transfer = MTL(ObTenantTransferService*);
|
||||
ASSERT_TRUE(OB_NOT_NULL(tenant_transfer));
|
||||
|
||||
// transfer t1 to other ls
|
||||
ObTransferTaskID task_id;
|
||||
ObMySQLTransaction trans;
|
||||
const ObLSID src_ls_id(1001);
|
||||
const ObLSID dst_ls_id(1002);
|
||||
ASSERT_EQ(OB_SUCCESS, trans.start(&inner_sql_proxy, g_tenant_id));
|
||||
ASSERT_EQ(OB_SUCCESS, tenant_transfer->generate_transfer_task(trans, src_ls_id, dst_ls_id, g_part_list, ObBalanceTaskID(123), task_id));
|
||||
ASSERT_EQ(OB_SUCCESS, trans.end(true));
|
||||
ObTransferStatus expected_status(ObTransferStatus::COMPLETED);
|
||||
ObTransferTask task;
|
||||
ASSERT_EQ(OB_SUCCESS, wait_transfer_task(g_tenant_id, task_id, expected_status, true/*is_from_his*/, inner_sql_proxy, task));
|
||||
ASSERT_EQ(OB_SUCCESS, task.result_);
|
||||
|
||||
// restore old tablet ls cache
|
||||
for (int64_t i = 0; i < old_tablet_ls_cache.count(); i++) {
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_ls_service->update_cache_(old_tablet_ls_cache.at(i)));
|
||||
}
|
||||
|
||||
// remove source ls and clear src ls cache
|
||||
ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->remove_ls(src_ls_id, false));
|
||||
ASSERT_EQ(OB_SUCCESS, ls_location_service->erase_location_(GCONF.cluster_id, g_tenant_id, src_ls_id));
|
||||
|
||||
// insert lob
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("insert into t1 values (2, repeat('abcde0123456789', 1000));"));
|
||||
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
|
||||
}
|
||||
|
||||
} // namespace rootserver
|
||||
} // namespace oceanbase
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
oceanbase::unittest::init_log_and_gtest(argc, argv);
|
||||
OB_LOGGER.set_log_level("WDIAG");
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
@ -126,7 +126,6 @@ int ObTabletAutoincMgr::fetch_new_range(const ObTabletAutoincParam ¶m,
|
||||
obrpc::ObSrvRpcProxy *srv_rpc_proxy = nullptr;
|
||||
share::ObLocationService *location_service = nullptr;
|
||||
ObAddr leader_addr;
|
||||
ObLSID ls_id;
|
||||
bool is_cache_hit = false;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -135,52 +134,39 @@ int ObTabletAutoincMgr::fetch_new_range(const ObTabletAutoincParam ¶m,
|
||||
|| OB_ISNULL(location_service = GCTX.location_service_)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("root service or location_cache is null", K(ret), KP(srv_rpc_proxy), KP(location_service));
|
||||
} else if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, 0/*expire_renew_time*/, is_cache_hit, ls_id))) {
|
||||
LOG_WARN("fail to get log stream id", K(ret), K(tablet_id));
|
||||
// try to use location cache first, if the cache is wrong, try force renew.
|
||||
} else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id,
|
||||
param.tenant_id_,
|
||||
ls_id,
|
||||
false,/*force_renew*/
|
||||
leader_addr))) {
|
||||
LOG_WARN("get leader failed", K(ret), K(ls_id));
|
||||
} else {
|
||||
obrpc::ObFetchTabletSeqArg arg;
|
||||
obrpc::ObFetchTabletSeqRes res;
|
||||
arg.cache_size_ = MAX(cache_size_, param.auto_increment_cache_size_); // TODO(shuangcan): confirm this
|
||||
arg.tenant_id_ = param.tenant_id_;
|
||||
arg.tablet_id_ = tablet_id;
|
||||
arg.ls_id_ = ls_id;
|
||||
// arg.ls_id_ will be filled by location_service->get
|
||||
|
||||
bool finish = false;
|
||||
for (int64_t retry_times = 0; OB_SUCC(ret) && !finish; retry_times++) {
|
||||
if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) {
|
||||
if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, 0/*expire_renew_time*/, is_cache_hit, arg.ls_id_))) {
|
||||
LOG_WARN("fail to get log stream id", K(ret), K(tablet_id));
|
||||
} else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id,
|
||||
param.tenant_id_,
|
||||
arg.ls_id_,
|
||||
false,/*force_renew*/
|
||||
leader_addr))) {
|
||||
LOG_WARN("get leader failed", K(ret), K(arg.ls_id_));
|
||||
} else if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) {
|
||||
LOG_WARN("fail to fetch autoinc cache for tablets", K(ret), K(retry_times), K(arg));
|
||||
} else {
|
||||
finish = true;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
const bool force_refresh_leader = OB_NOT_MASTER == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret;
|
||||
(void)location_service->renew_tablet_location(param.tenant_id_, tablet_id, ret, !is_block_renew_location(ret)/*is_nonblock*/);
|
||||
if (is_retryable(ret)) {
|
||||
ob_usleep<common::ObWaitEventIds::STORAGE_AUTOINC_FETCH_RETRY_SLEEP>(RETRY_INTERVAL);
|
||||
res.reset();
|
||||
if (OB_FAIL(THIS_WORKER.check_status())) { // overwrite ret
|
||||
LOG_WARN("failed to check status", K(ret));
|
||||
} else {
|
||||
res.reset();
|
||||
ob_usleep<common::ObWaitEventIds::STORAGE_AUTOINC_FETCH_RETRY_SLEEP>(RETRY_INTERVAL);
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && force_refresh_leader) {
|
||||
if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, INT64_MAX/*expire_renew_time*/, is_cache_hit, arg.ls_id_))) {
|
||||
LOG_WARN("fail to get log stream id", K(ret), K(ret), K(tablet_id));
|
||||
} else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id,
|
||||
param.tenant_id_,
|
||||
arg.ls_id_,
|
||||
true/*force_renew*/,
|
||||
leader_addr))) {
|
||||
LOG_WARN("force get leader failed", K(ret), K(ret), K(arg.ls_id_));
|
||||
}
|
||||
} else {
|
||||
(void)location_service->renew_tablet_location(param.tenant_id_, tablet_id, ret, true/*is_nonblock*/);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -80,7 +80,16 @@ private:
|
||||
}
|
||||
bool is_retryable(int ret)
|
||||
{
|
||||
return OB_NOT_MASTER == ret || OB_NOT_INIT == ret || OB_TIMEOUT == ret || OB_EAGAIN == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret;
|
||||
return OB_NOT_MASTER == ret || OB_NOT_INIT == ret || OB_TIMEOUT == ret || OB_EAGAIN == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret || OB_LS_LOCATION_NOT_EXIST == ret;
|
||||
}
|
||||
bool is_block_renew_location(int ret)
|
||||
{
|
||||
return OB_LOCATION_LEADER_NOT_EXIST == ret || OB_LS_LOCATION_LEADER_NOT_EXIST == ret || OB_NO_READABLE_REPLICA == ret
|
||||
|| OB_NOT_MASTER == ret || OB_RS_NOT_MASTER == ret || OB_RS_SHUTDOWN == ret || OB_PARTITION_NOT_EXIST == ret || OB_LOCATION_NOT_EXIST == ret
|
||||
|| OB_PARTITION_IS_STOPPED == ret || OB_SERVER_IS_INIT == ret || OB_SERVER_IS_STOPPING == ret || OB_TENANT_NOT_IN_SERVER == ret
|
||||
|| OB_TRANS_RPC_TIMEOUT == ret || OB_USE_DUP_FOLLOW_AFTER_DML == ret || OB_TRANS_STMT_NEED_RETRY == ret
|
||||
|| OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_LS_LOCATION_NOT_EXIST == ret || OB_PARTITION_IS_BLOCKED == ret || OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST == ret
|
||||
|| OB_GET_LOCATION_TIME_OUT == ret;
|
||||
}
|
||||
private:
|
||||
static const int64_t PREFETCH_THRESHOLD = 4;
|
||||
|
||||
Reference in New Issue
Block a user