Files
oceanbase/unittest/obcdc/test_ob_log_all_svr_cache.cpp
2022-03-25 18:10:38 +08:00

529 lines
15 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG_FETCHER
#include <gtest/gtest.h>
#include "share/ob_define.h"
#define private public
#include "obcdc/src/ob_log_all_svr_cache.h"
#include "obcdc/src/ob_log_systable_helper.h"
#include "ob_log_utils.h"
#include "test_ob_log_fetcher_common_utils.h"
#include "lib/atomic/ob_atomic.h"
using namespace oceanbase;
using namespace common;
using namespace liboblog;
namespace oceanbase
{
namespace unittest
{
class TestObLogAllSvrCache: public ::testing::Test
{
public :
virtual void SetUp() {}
virtual void TearDown() {}
public :
static const int64_t ALLSVR_CACHE_UPDATE_INTERVAL = 10 * _MSEC_;
};
static const int64_t SERVER_COUNT = 120;
static const int64_t FIRST_QUERY_RECORD_COUNT = 60;
static const int64_t VARY_RECORD_COUNT = 6;
typedef IObLogSysTableHelper::AllServerRecordArray AllServerRecordArray;
typedef IObLogSysTableHelper::AllServerRecord AllServerRecord;
AllServerRecord all_server_records[SERVER_COUNT];
const char *zones[4] = {"z1", "z2", "z3", "z4"};
const char *regions[4] = {"hz", "sh", "sz", "sh"};
const char *zone_types[4] = {"ReadWrite", "ReadWrite", "ReadWrite", "ReadOnly"};
void generate_data()
{
int ret = OB_SUCCESS;
ObString ip_str = "127.0.0.1";
for(int64_t idx = 0; idx < SERVER_COUNT; idx++) {
AllServerRecord &record = all_server_records[idx];
int64_t pos = 0;
if (OB_FAIL(databuff_printf(record.svr_ip_, sizeof(record.svr_ip_), pos,
"%.*s", ip_str.length(), ip_str.ptr()))) {
LOG_ERROR("save ip address fail", K(ret), K(pos),
"buf_size", sizeof(record.svr_ip_), K(ip_str));
}
record.svr_port_ = static_cast<int32_t>(idx + 8000);
int64_t index = idx % 4;
switch (index) {
case 0:
record.status_ = share::ObServerStatus::DisplayStatus::OB_SERVER_INACTIVE;
break;
case 1:
record.status_ = share::ObServerStatus::DisplayStatus::OB_SERVER_ACTIVE;
break;
case 2:
record.status_ = share::ObServerStatus::DisplayStatus::OB_SERVER_DELETING;
break;
case 3:
record.status_ = share::ObServerStatus::DisplayStatus::OB_SERVER_ACTIVE;
break;
default:
break;
}
if (OB_FAIL(record.zone_.assign(zones[index]))) {
LOG_ERROR("record zone assign fail", K(ret), K(record));
}
}
}
// To test if the cached __all_server system table data is cached correctly, the following dynamic policy is used for the returned data.
// Assume FIRST_QUERY_RECORD_COUNT=60, VARY_RECORD_COUNT=6
// 1. First query returns: records from 0 to 59
// 2. Second query returns: 6 new rows, i.e. 0 to 65 rows, 60 to 65 rows added
// 3. Third query returns: Decrease the first 6 rows, i.e. return 6 to 65 rows, decrease 0 to 5 rows
// ...
// and so on, until the end, the final validation result 60~119
class MockSysTableHelper1 : public IObLogSysTableHelper
{
public:
MockSysTableHelper1() : query_time_(1),
start_index_(0),
end_index_(FIRST_QUERY_RECORD_COUNT - 1),
is_invariable_(false) {}
virtual ~MockSysTableHelper1() {}
public:
int query_with_multiple_statement(BatchSQLQuery &batch_query)
{
UNUSED(batch_query);
return 0;
}
/// Query __all_clog_history_info_v2 based on log_id to get all servers with service log IDs greater than or equal to log_id logs
virtual int query_clog_history_by_log_id(
const common::ObPartitionKey &pkey,
const uint64_t log_id,
ClogHistoryRecordArray &records)
{
UNUSED(pkey);
UNUSED(log_id);
UNUSED(records);
return 0;
}
/// Query __all_clog_history_info_v2 for all servers with timestamp greater than or equal to timestamp log based on timestamp
virtual int query_clog_history_by_tstamp(
const common::ObPartitionKey &pkey,
const int64_t timestamp,
ClogHistoryRecordArray &records)
{
UNUSED(pkey);
UNUSED(timestamp);
UNUSED(records);
return 0;
}
/// Query __all_meta_table / __all_root_table to get information about the servers that are serving the partition
virtual int query_meta_info(
const common::ObPartitionKey &pkey,
MetaRecordArray &records)
{
UNUSED(pkey);
UNUSED(records);
return 0;
}
// Query __all_meta_table / __all_root_table for leader information
virtual int query_leader_info(
const common::ObPartitionKey &pkey,
bool &has_leader,
common::ObAddr &leader)
{
UNUSED(pkey);
UNUSED(has_leader);
UNUSED(leader);
return 0;
}
/// Query __all_server table for all active server information
virtual int query_all_server_info(AllServerRecordArray &records)
{
int ret = OB_SUCCESS;
// The first query returns records from 0 to FIRST_QUERY_RECORD_COUNT-1
if (1 == query_time_) {
start_index_ = 0;
end_index_ = FIRST_QUERY_RECORD_COUNT - 1;
} else {
if (is_invariable_) { // Return records no longer change
// do nothing
} else if (0 == (query_time_ & 0x01)) { // ADD record
if (end_index_ + VARY_RECORD_COUNT >= SERVER_COUNT) {
ATOMIC_STORE(&is_invariable_, true);
} else {
end_index_ += VARY_RECORD_COUNT;
}
} else if (1 == (query_time_ & 0x01)) { // minus records
start_index_ += VARY_RECORD_COUNT;
}
}
// make records
for (int64_t idx = start_index_; OB_SUCC(ret) && idx <= end_index_; idx++) {
AllServerRecord &record = all_server_records[idx];
if (OB_FAIL(records.push_back(record))) {
LOG_ERROR("records push error", K(ret), K(record));
}
}
LOG_INFO("query all server info", K(query_time_), K(start_index_),
K(end_index_), K(is_invariable_));
query_time_++;
return ret;
}
virtual int query_all_zone_info(AllZoneRecordArray &records)
{
UNUSED(records);
return 0;
}
virtual int query_all_zone_type(AllZoneTypeRecordArray &records)
{
int ret = OB_SUCCESS;
for (int64_t idx = 0; idx < 4; ++idx) {
AllZoneTypeRecord record;
record.zone_type_ = str_to_zone_type(zone_types[idx]);
if (OB_FAIL(record.zone_.assign(zones[idx]))) {
LOG_ERROR("record assign zone error", K(ret), K(record));
} else if (OB_FAIL(records.push_back(record))) {
LOG_ERROR("records push error", K(ret), K(record));
}
}
return ret;
}
virtual int query_cluster_info(ClusterInfo &cluster_info)
{
UNUSED(cluster_info);
return 0;
}
virtual int query_cluster_min_observer_version(uint64_t &min_observer_version)
{
UNUSED(min_observer_version);
return 0;
}
virtual int reset_connection()
{
return 0;
}
virtual int query_timezone_info_version(const uint64_t tenant_id,
int64_t &timezone_info_version)
{
UNUSED(tenant_id);
UNUSED(timezone_info_version);
return 0;
}
public:
int64_t query_time_;
int64_t start_index_;
int64_t end_index_;
bool is_invariable_;
};
class MockSysTableHelper2 : public IObLogSysTableHelper
{
public:
MockSysTableHelper2() : query_time_(1),
start_index_(0),
end_index_(FIRST_QUERY_RECORD_COUNT - 1) {}
virtual ~MockSysTableHelper2() {}
public:
virtual int query_with_multiple_statement(BatchSQLQuery &batch_query)
{
UNUSED(batch_query);
return 0;
}
/// Query __all_clog_history_info_v2 based on log_id to get all servers with service log IDs greater than or equal to log_id logs
virtual int query_clog_history_by_log_id(
const common::ObPartitionKey &pkey,
const uint64_t log_id,
ClogHistoryRecordArray &records)
{
UNUSED(pkey);
UNUSED(log_id);
UNUSED(records);
return 0;
}
/// Query __all_clog_history_info_v2 for all servers with timestamp greater than or equal to timestamp log based on timestamp
virtual int query_clog_history_by_tstamp(
const common::ObPartitionKey &pkey,
const int64_t timestamp,
ClogHistoryRecordArray &records)
{
UNUSED(pkey);
UNUSED(timestamp);
UNUSED(records);
return 0;
}
/// Query __all_meta_table / __all_root_table to get information about the servers that are serving the partition
virtual int query_meta_info(
const common::ObPartitionKey &pkey,
MetaRecordArray &records)
{
UNUSED(pkey);
UNUSED(records);
return 0;
}
// Query __all_meta_table / __all_root_table for leader information
virtual int query_leader_info(
const common::ObPartitionKey &pkey,
bool &has_leader,
common::ObAddr &leader)
{
UNUSED(pkey);
UNUSED(has_leader);
UNUSED(leader);
return 0;
}
/// Query the __all_server table to get all active server information
// First query: return a batch of servers, 1/3 of which are ACTIVE
// Second query: return the servers returned in the first query, and the ACTIVE server status is changed to INACTIVE
virtual int query_all_server_info(AllServerRecordArray &records)
{
int ret = OB_SUCCESS;
// build records
for (int64_t idx = start_index_; OB_SUCC(ret) && idx <= end_index_; idx++) {
AllServerRecord &record = all_server_records[idx];
if (2 == query_time_) {
// ACTIVE->INACTIVE
if (1 == idx % 4) {
record.status_ = share::ObServerStatus::DisplayStatus::OB_SERVER_INACTIVE;
}
}
if (OB_FAIL(records.push_back(record))) {
LOG_ERROR("records push error", K(ret), K(record));
}
}
LOG_INFO("query all server info", K(query_time_), K(start_index_), K(end_index_));
query_time_++;
return ret;
}
virtual int query_all_zone_info(AllZoneRecordArray &records)
{
int ret = OB_SUCCESS;
for (int64_t idx = 0; idx < 4; ++idx) {
AllZoneRecord record;
if (OB_FAIL(record.zone_.assign(zones[idx]))) {
LOG_ERROR("record assign zone error", K(ret), K(record));
} else if (OB_FAIL(record.region_.assign(regions[idx]))) {
LOG_ERROR("record assign error", K(ret), K(record));
} else if (OB_FAIL(records.push_back(record))) {
LOG_ERROR("records push error", K(ret), K(record));
}
}
return ret;
}
virtual int query_all_zone_type(AllZoneTypeRecordArray &records)
{
int ret = OB_SUCCESS;
for (int64_t idx = 0; idx < 4; ++idx) {
AllZoneTypeRecord record;
record.zone_type_ = str_to_zone_type(zone_types[idx]);
if (OB_FAIL(record.zone_.assign(zones[idx]))) {
LOG_ERROR("record assign zone error", K(ret), K(record));
} else if (OB_FAIL(records.push_back(record))) {
LOG_ERROR("records push error", K(ret), K(record));
}
}
return ret;
}
virtual int query_cluster_info(ClusterInfo &cluster_info)
{
UNUSED(cluster_info);
return 0;
}
virtual int query_cluster_min_observer_version(uint64_t &min_observer_version)
{
UNUSED(min_observer_version);
return 0;
}
virtual int reset_connection()
{
return 0;
}
virtual int query_timezone_info_version(const uint64_t tenant_id,
int64_t &timezone_info_version)
{
UNUSED(tenant_id);
UNUSED(timezone_info_version);
return 0;
}
public:
int64_t query_time_;
int64_t start_index_;
int64_t end_index_;
};
////////////////////// Test of basic functions //////////////////////////////////////////
TEST_F(TestObLogAllSvrCache, init)
{
generate_data();
ObLogAllSvrCache all_svr_cache;
MockSysTableHelper1 mock_systable_helper;
MockFetcherErrHandler1 err_handler;
// set update interval
all_svr_cache.set_update_interval_(ALLSVR_CACHE_UPDATE_INTERVAL);
EXPECT_EQ(OB_SUCCESS, all_svr_cache.init(mock_systable_helper, err_handler));
while (false == ATOMIC_LOAD(&mock_systable_helper.is_invariable_)) {
// do nothing
}
LOG_INFO("exit", K(mock_systable_helper.start_index_), K(mock_systable_helper.end_index_));
/// verify result
EXPECT_EQ(FIRST_QUERY_RECORD_COUNT, all_svr_cache.svr_map_.count());
int64_t end_index = SERVER_COUNT - 1;
int64_t start_index = end_index - FIRST_QUERY_RECORD_COUNT + 1;
// Test servers in the __all_server table
// Servers in the ACTIVE and DELETING states are serviceable
// Servers in the INACTIVE state are not serviceable
for (int64_t idx = start_index; idx <= end_index; idx++) {
ObAddr svr(ObAddr::IPV4, all_server_records[idx].svr_ip_, all_server_records[idx].svr_port_);
if (0 == idx % 4) {
// INACTIVE/ENCRYPTION ZONE
EXPECT_FALSE(all_svr_cache.is_svr_avail(svr));
} else {
// ACTIVE/DELETEING
EXPECT_TRUE(all_svr_cache.is_svr_avail(svr));
}
}
// test server not in __all_server table
for (int64_t idx = 0; idx < start_index; idx++) {
ObAddr svr(ObAddr::IPV4, all_server_records[idx].svr_ip_, all_server_records[idx].svr_port_);
EXPECT_FALSE(all_svr_cache.is_svr_avail(svr));
}
all_svr_cache.destroy();
}
// state change from active to inactive
TEST_F(TestObLogAllSvrCache, all_svr_cache2)
{
ObLogAllSvrCache all_svr_cache;
MockSysTableHelper2 mock_systable_helper;
MockFetcherErrHandler1 err_handler;
// No threads open, manual assignment
int ret = OB_SUCCESS;
if (OB_FAIL(all_svr_cache.svr_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) {
LOG_ERROR("init svr map fail", K(ret));
}
if (OB_FAIL(all_svr_cache.zone_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) {
LOG_ERROR("init svr map fail", K(ret));
}
all_svr_cache.cur_version_ = 0;
all_svr_cache.cur_zone_version_ = 0;
all_svr_cache.err_handler_ = &err_handler;
all_svr_cache.systable_helper_ = &mock_systable_helper;
// update __all_zone
EXPECT_EQ(OB_SUCCESS, all_svr_cache.update_zone_cache_());
// manual update and clearance
EXPECT_EQ(OB_SUCCESS, all_svr_cache.update_server_cache_());
EXPECT_EQ(OB_SUCCESS, all_svr_cache.purge_stale_records_());
/// verify result
EXPECT_EQ(FIRST_QUERY_RECORD_COUNT, all_svr_cache.svr_map_.count());
int64_t start_index = 0;
int64_t end_index = FIRST_QUERY_RECORD_COUNT - 1;
for (int64_t idx = start_index; idx <= end_index; idx++) {
ObAddr svr(ObAddr::IPV4, all_server_records[idx].svr_ip_, all_server_records[idx].svr_port_);
if (1 == idx % 4) {
EXPECT_TRUE(all_svr_cache.is_svr_avail(svr));
}
}
// Second manual update and clearance
EXPECT_EQ(OB_SUCCESS, all_svr_cache.update_server_cache_());
EXPECT_EQ(OB_SUCCESS, all_svr_cache.purge_stale_records_());
// Verify that it is ACTIVE-INACTIVE
for (int64_t idx = start_index; idx <= end_index; idx++) {
ObAddr svr(ObAddr::IPV4, all_server_records[idx].svr_ip_, all_server_records[idx].svr_port_);
if (1 == idx % 4) {
EXPECT_FALSE(all_svr_cache.is_svr_avail(svr));
}
}
all_svr_cache.destroy();
}
}//end of unittest
}//end of oceanbase
int main(int argc, char **argv)
{
// ObLogger::get_logger().set_mod_log_levels("ALL.*:DEBUG, TLOG.*:DEBUG");
// testing::InitGoogleTest(&argc,argv);
// testing::FLAGS_gtest_filter = "DO_NOT_RUN";
int ret = 1;
ObLogger &logger = ObLogger::get_logger();
logger.set_file_name("test_ob_log_all_svr_cache.log", true);
logger.set_log_level(OB_LOG_LEVEL_INFO);
testing::InitGoogleTest(&argc, argv);
ret = RUN_ALL_TESTS();
return ret;
}