Files
oceanbase/unittest/obcdc/test_ob_log_svr_finder.cpp
2022-03-25 18:10:38 +08:00

297 lines
9.4 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG_FETCHER
#include <gtest/gtest.h>
#include "share/ob_define.h"
#define private public
#include "test_ob_log_fetcher_common_utils.h"
#include "ob_log_utils.h"
#include "ob_log_svr_finder.h"
#include "ob_log_all_svr_cache.h"
#include "lib/atomic/ob_atomic.h"
using namespace oceanbase;
using namespace common;
using namespace liboblog;
namespace oceanbase
{
namespace unittest
{
class TestObLogSvrFinder: public ::testing::Test
{
public :
virtual void SetUp() {}
virtual void TearDown() {}
public :
static const int64_t SVR_FINDER_THREAD_NUM = 1;
};
static const int64_t TEST_TIME_LIMIT = 10 * _MIN_;
void generate_part_svr_list(const int64_t count, PartSvrList *&part_svr_list)
{
part_svr_list = static_cast<PartSvrList *>(
ob_malloc(sizeof(PartSvrList) * count));
for (int64_t idx = 0; idx < count; idx++) {
new (part_svr_list + idx) PartSvrList();
}
}
// Constructing SvrFindReq, two types of requests
// 1. logid request
// 2. timestamp request
void generate_svr_finder_requset(const int64_t count,
PartSvrList *part_svr_list,
SvrFindReq *&svr_req_array)
{
svr_req_array = static_cast<SvrFindReq *>(
ob_malloc(sizeof(SvrFindReq) * count));
for (int64_t idx = 0; idx < count; idx++) {
new (svr_req_array + idx) SvrFindReq();
ObPartitionKey pkey = ObPartitionKey((uint64_t)(1000 + idx), 0, 1);
const int64_t seed = get_timestamp();
if ((seed % 100) < 50) {
svr_req_array[idx].reset_for_req_by_log_id(part_svr_list[idx], pkey, idx);
EXPECT_TRUE(svr_req_array[idx].is_state_idle());
} else {
svr_req_array[idx].reset_for_req_by_tstamp(part_svr_list[idx], pkey, seed);
EXPECT_TRUE(svr_req_array[idx].is_state_idle());
}
}
}
// build LeaderFindReq
void generate_leader_finder_request(const int64_t count, LeaderFindReq *&leader_req_array)
{
leader_req_array = static_cast<LeaderFindReq *>(
ob_malloc(sizeof(LeaderFindReq) * count));
for (int64_t idx = 0; idx < count; idx++) {
new (leader_req_array + idx) LeaderFindReq();
ObPartitionKey pkey = ObPartitionKey((uint64_t)(1000 + idx), 0, 1);
leader_req_array[idx].reset(pkey);
EXPECT_TRUE(leader_req_array[idx].is_state_idle());
}
}
void wait_svr_finer_req_end(SvrFindReq *svr_req_array,
const int64_t count,
int64_t &end_request_cnt)
{
end_request_cnt = 0;
const int64_t start_test_tstamp = get_timestamp();
while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT)
&& (end_request_cnt < count)) {
for (int64_t idx = 0, cnt = count; idx < cnt; ++idx) {
SvrFindReq &r = svr_req_array[idx];
if (SvrFindReq::DONE == r.get_state()) {
end_request_cnt += 1;
r.set_state_idle();
}
}
usec_sleep(100 * _MSEC_);
}
}
void wait_leader_finer_req_end(LeaderFindReq *leader_req_array,
const int64_t count,
int64_t &end_request_cnt)
{
end_request_cnt = 0;
const int64_t start_test_tstamp = get_timestamp();
while (((get_timestamp() - start_test_tstamp) < TEST_TIME_LIMIT)
&& (end_request_cnt < count)) {
for (int64_t idx = 0, cnt = count; idx < cnt; ++idx) {
LeaderFindReq &r = leader_req_array[idx];
if (LeaderFindReq::DONE == r.get_state()) {
end_request_cnt += 1;
r.set_state_idle();
}
}
usec_sleep(100 * _MSEC_);
}
}
//////////////////////Basic function tests//////////////////////////////////////////
TEST_F(TestObLogSvrFinder, init)
{
MockFetcherErrHandler1 err_handler;
MockSysTableHelperDerive1 mock_systable_helper;
// AllSvrCache init
ObLogAllSvrCache all_svr_cache;
EXPECT_EQ(OB_SUCCESS, all_svr_cache.init(mock_systable_helper, err_handler));
// SvrFinder init
ObLogSvrFinder svr_finder;
EXPECT_EQ(OB_SUCCESS, svr_finder.init(SVR_FINDER_THREAD_NUM, err_handler,
all_svr_cache, mock_systable_helper));
// sever list for partition
PartSvrList *part_svr_list = NULL;
generate_part_svr_list(SVR_FINDER_REQ_NUM, part_svr_list);
// Constructing SvrFindReq, two types of requests
// 1. logid request
// 2. timestamp request
SvrFindReq *svr_req_array = NULL;
generate_svr_finder_requset(SVR_FINDER_REQ_NUM, part_svr_list, svr_req_array);
// build LeaderFindReq
LeaderFindReq *leader_req_array = NULL;
generate_leader_finder_request(LEADER_FINDER_REQ_NUM, leader_req_array);
// push request to svr_finder
for (int64_t idx = 0; idx < SVR_FINDER_REQ_NUM; idx++) {
EXPECT_EQ(OB_SUCCESS, svr_finder.async_svr_find_req(svr_req_array + idx));
}
for (int64_t idx = 0; idx < LEADER_FINDER_REQ_NUM; idx++) {
EXPECT_EQ(OB_SUCCESS, svr_finder.async_leader_find_req(leader_req_array + idx));
}
// SvrFinder start
EXPECT_EQ(OB_SUCCESS, svr_finder.start());
// Wait for asynchronous SvrFinderReq to finish
int64_t end_svr_finder_req_cnt = 0;
wait_svr_finer_req_end(svr_req_array, SVR_FINDER_REQ_NUM, end_svr_finder_req_cnt);
// Assert
EXPECT_EQ(SVR_FINDER_REQ_NUM, end_svr_finder_req_cnt);
// Waiting for the end of the asynchronous LeaderFinderReq
int64_t end_leader_finder_req_cnt = 0;
wait_leader_finer_req_end(leader_req_array, LEADER_FINDER_REQ_NUM, end_leader_finder_req_cnt);
// Assert
EXPECT_EQ(LEADER_FINDER_REQ_NUM, end_leader_finder_req_cnt);
// Validate SvrFinderReq results
for (int64_t idx = 0; idx < SVR_FINDER_REQ_NUM; idx++) {
PartSvrList &svr_list = part_svr_list[idx];
PartSvrList::SvrItemArray svr_items = svr_list.svr_items_;
int64_t EXPECT_START_LOG_ID = 0;
int64_t EXPECT_END_LOG_ID = 0;
if (svr_req_array[idx].req_by_next_log_id_) {
EXPECT_START_LOG_ID = svr_req_array[idx].next_log_id_;
EXPECT_END_LOG_ID = EXPECT_START_LOG_ID + 10000;
} else if (svr_req_array[idx].req_by_start_tstamp_) {
EXPECT_START_LOG_ID = 0;
EXPECT_END_LOG_ID = 65536;
}
int cnt = QUERY_CLOG_HISTORY_VALID_COUNT + QUERY_META_INFO_ADD_COUNT;
EXPECT_EQ(cnt, svr_list.count());
// Validate log range
for (int64_t svr_idx = 0; svr_idx < cnt; svr_idx++) {
const PartSvrList::LogIdRange &range = svr_items[svr_idx].log_ranges_[0];
if (svr_idx < QUERY_CLOG_HISTORY_VALID_COUNT) {
// clog history record
EXPECT_EQ(EXPECT_START_LOG_ID, range.start_log_id_);
EXPECT_EQ(EXPECT_END_LOG_ID, range.end_log_id_);
} else {
// Additional records
EXPECT_EQ(0, range.start_log_id_);
EXPECT_EQ(OB_INVALID_ID, range.end_log_id_);
}
}
}
// Validate LeaderFinderReq results
ObAddr EXPECT_ADDR;
EXPECT_ADDR.set_ip_addr("127.0.0.1", 8888);
for (int64_t idx = 0; idx < LEADER_FINDER_REQ_NUM; idx++) {
LeaderFindReq &req = leader_req_array[idx];
EXPECT_TRUE(req.has_leader_);
EXPECT_EQ(EXPECT_ADDR, req.leader_);
}
// destroy
ob_free(part_svr_list);
ob_free(svr_req_array);
svr_finder.destroy();
all_svr_cache.destroy();
}
// Used to test if SvrFinder can filter INACTIVE records
TEST_F(TestObLogSvrFinder, inactive_test)
{
MockFetcherErrHandler1 err_handler;
MockSysTableHelperDerive2 mock_systable_helper;
// AllSvrCache init
ObLogAllSvrCache all_svr_cache;
EXPECT_EQ(OB_SUCCESS, all_svr_cache.init(mock_systable_helper, err_handler));
// SvrFinder init
ObLogSvrFinder svr_finder;
EXPECT_EQ(OB_SUCCESS, svr_finder.init(SVR_FINDER_THREAD_NUM, err_handler,
all_svr_cache, mock_systable_helper));
// Declaration of partition sever list
PartSvrList *part_svr_list = NULL;
generate_part_svr_list(SVR_FINDER_REQ_NUM, part_svr_list);
// Constructing SvrFindReq, two types of requests
// 1. logid request
// 2. timestamp request
SvrFindReq *svr_req_array = NULL;
generate_svr_finder_requset(SVR_FINDER_REQ_NUM, part_svr_list, svr_req_array);
// push request to svr_finder
for (int64_t idx = 0; idx < SVR_FINDER_REQ_NUM; idx++) {
EXPECT_EQ(OB_SUCCESS, svr_finder.async_svr_find_req(svr_req_array + idx));
}
// SvrFinder start
EXPECT_EQ(OB_SUCCESS, svr_finder.start());
// Wait for asynchronous SvrFinderReq to finish
int64_t end_svr_finder_req_cnt = 0;
wait_svr_finer_req_end(svr_req_array, SVR_FINDER_REQ_NUM, end_svr_finder_req_cnt);
// Assert
EXPECT_EQ(SVR_FINDER_REQ_NUM, end_svr_finder_req_cnt);
// Validate SvrFinderReq results
int cnt = (QUERY_CLOG_HISTORY_VALID_COUNT + QUERY_META_INFO_ADD_COUNT) / 2;
for (int64_t idx = 0; idx < 1; idx++) {
PartSvrList &svr_list = part_svr_list[idx];
PartSvrList::SvrItemArray svr_items = svr_list.svr_items_;
EXPECT_EQ(cnt, svr_list.count());
}
ob_free(part_svr_list);
ob_free(svr_req_array);
svr_finder.destroy();
all_svr_cache.destroy();
}
}//end of unittest
}//end of oceanbase
int main(int argc, char **argv)
{
// ObLogger::get_logger().set_mod_log_levels("ALL.*:DEBUG, TLOG.*:DEBUG");
// testing::InitGoogleTest(&argc,argv);
// testing::FLAGS_gtest_filter = "DO_NOT_RUN";
int ret = 1;
ObLogger &logger = ObLogger::get_logger();
logger.set_file_name("test_ob_log_svr_finder.log", true);
logger.set_log_level(OB_LOG_LEVEL_INFO);
testing::InitGoogleTest(&argc, argv);
ret = RUN_ALL_TESTS();
return ret;
}