Files
oceanbase/mittest/simple_server/freeze/test_frequently_freeze.cpp
2024-09-20 09:08:56 +00:00

347 lines
12 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define USING_LOG_PREFIX STORAGE
#define protected public
#define private public
#include "env/ob_simple_cluster_test_base.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "storage/tx_storage/ob_ls_service.h"
#undef private
#undef protected
using namespace oceanbase::transaction;
using namespace oceanbase::storage;
using namespace oceanbase::unittest;
class TestRunCtx
{
public:
uint64_t tenant_id_ = 0;
int64_t time_sec_ = 0;
};
TestRunCtx RunCtx;
namespace oceanbase
{
namespace storage
{
class TestFrequentlyFreeze : public ObSimpleClusterTestBase
{
public:
TestFrequentlyFreeze() : ObSimpleClusterTestBase("test_frequently_freeze_") {}
void fill_data(const int64_t idx);
void async_tablet_freeze(const int64_t idx);
void sync_tablet_freeze(const int64_t idx);
void ls_freeze(const bool is_sync);
void check_async_freeze_tablets_empty();
void get_ls(const share::ObLSID &ls_id, ObLS *&ls);
};
#define EXE_SQL(sql_str) \
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
#define EXE_SQL_FMT(...) \
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
#define WRITE_SQL_BY_CONN(conn, sql_str) \
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
#define WRITE_SQL_FMT_BY_CONN(conn, ...) \
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
#define DEF_VAL_FOR_SQL \
int ret = OB_SUCCESS; \
ObSqlString sql; \
int64_t affected_rows = 0;
const int64_t SINGLE_TABLE_PARTITION_COUNT = 4000;
const int64_t TABLET_FREEZE_THREAD_COUNT = 10;
int64_t FIRST_TABLET_ID = 0;
int64_t LAST_TABLET_ID = 0;
void TestFrequentlyFreeze::get_ls(const share::ObLSID &ls_id, ObLS *&ls)
{
ls = nullptr;
ObLSIterator *ls_iter = nullptr;
ObLSHandle ls_handle;
ObLSService *ls_svr = MTL(ObLSService *);
ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
ASSERT_NE(nullptr, ls = ls_handle.get_ls());
}
void TestFrequentlyFreeze::fill_data(const int64_t idx)
{
DEF_VAL_FOR_SQL
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
sqlclient::ObISQLConnection *connection = nullptr;
ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
ASSERT_NE(nullptr, connection);
WRITE_SQL_BY_CONN(connection, "set ob_trx_timeout = 3000000000");
WRITE_SQL_BY_CONN(connection, "set ob_trx_idle_timeout = 3000000000");
WRITE_SQL_BY_CONN(connection, "set ob_query_timeout = 3000000000");
WRITE_SQL_FMT_BY_CONN(connection, "create sequence seq_%ld cache 1000000 noorder;", idx);
STORAGE_LOG(INFO, "start fill data", K(idx));
for (int i = 0; i < 5; i++) {
fprintf(stdout, "start fill data. thread_idx = %ld, round = %d\n", idx, i);
WRITE_SQL_FMT_BY_CONN(
connection,
"Insert /*+ parallel(4) enable_parallel_dml */ into gl_t_%ld select seq_%ld.nextval, 'ob' from "
"table(generator(20000));",
idx,
idx);
fprintf(stdout, "finish fill data. thread_idx = %ld, round = %d\n", idx, i);
}
}
void TestFrequentlyFreeze::async_tablet_freeze(const int64_t idx)
{
ObLS *ls = nullptr;
(void)get_ls(share::ObLSID(1001), ls);
int64_t int_tablet_id_to_freeze = FIRST_TABLET_ID + idx;
fprintf(stdout, "async tablet freeze start. thread = %ld\n", idx);
STORAGE_LOG(INFO, "start async tablet freeze", K(idx));
while (int_tablet_id_to_freeze <= LAST_TABLET_ID) {
const bool is_sync = false;
ObTabletID tablet_to_freeze(int_tablet_id_to_freeze);
STORAGE_LOG(INFO, "start tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(tablet_to_freeze,
is_sync,
0,
false,
ObFreezeSourceFlag::TEST_MODE));
usleep(100 * 1000);
STORAGE_LOG(INFO, "finish tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
int_tablet_id_to_freeze += TABLET_FREEZE_THREAD_COUNT;
}
fprintf(stdout, "async tablet freeze finish. thread = %ld\n", idx);
STORAGE_LOG(INFO, "one async tablet freeze task finish", K(idx));
}
void TestFrequentlyFreeze::sync_tablet_freeze(const int64_t idx)
{
ObLS *ls = nullptr;
(void)get_ls(share::ObLSID(1001), ls);
ObTabletID tablet_to_freeze(FIRST_TABLET_ID + idx);
fprintf(stdout, "sync tablet freeze start. thread = %ld\n", idx);
STORAGE_LOG(INFO, "start sync tablet freeze", K(idx));
while (tablet_to_freeze.id() <= LAST_TABLET_ID) {
const bool is_sync = true;
const int64_t abs_timeout_ts = ObClockGenerator::getClock() + 600LL * 1000LL * 1000LL;
STORAGE_LOG(INFO, "start tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(tablet_to_freeze,
is_sync,
abs_timeout_ts,
false,
ObFreezeSourceFlag::TEST_MODE));
::sleep(2);
tablet_to_freeze = ObTabletID(tablet_to_freeze.id() + TABLET_FREEZE_THREAD_COUNT);
STORAGE_LOG(INFO, "finish tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
}
fprintf(stdout, "sync tablet freeze finish. thread = %ld\n", idx);
STORAGE_LOG(INFO, "one async tablet freeze task finish", K(idx));
}
void TestFrequentlyFreeze::ls_freeze(const bool is_sync)
{
ObLS *ls = nullptr;
(void)get_ls(share::ObLSID(1001), ls);
fprintf(stdout, "ls freeze start. is_sync = %d\n", is_sync);
STORAGE_LOG(INFO, "start ls freeze", K(is_sync));
const int64_t abs_timeout_ts = ObClockGenerator::getClock() + 600LL * 1000LL * 1000LL;
for (int i = 0; i < 4; i++) {
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(-1,
is_sync,
abs_timeout_ts,
ObFreezeSourceFlag::TEST_MODE));
sleep(200);
}
fprintf(stdout, "ls freeze finish. is_sync = %d\n", is_sync);
}
void TestFrequentlyFreeze::check_async_freeze_tablets_empty()
{
ObLS *ls = nullptr;
(void)get_ls(share::ObLSID(1001), ls);
bool async_freeze_finished = false;
int64_t max_retry_times = 100;
while (max_retry_times-- > 0) {
if (ls->get_freezer()->get_async_freeze_tablets().empty()) {
async_freeze_finished = true;
break;
}
sleep(1);
}
ASSERT_EQ(true, async_freeze_finished);
fprintf(stdout, "check async freeze tablets finish. \n");
}
TEST_F(TestFrequentlyFreeze, observer_start)
{
SERVER_LOG(INFO, "observer_start succ");
}
TEST_F(TestFrequentlyFreeze, add_tenant)
{
ASSERT_EQ(OB_SUCCESS, create_tenant());
ASSERT_EQ(OB_SUCCESS, get_tenant_id(RunCtx.tenant_id_));
ASSERT_NE(0, RunCtx.tenant_id_);
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
}
TEST_F(TestFrequentlyFreeze, create_table)
{
DEF_VAL_FOR_SQL
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
sqlclient::ObISQLConnection *connection = nullptr;
ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
ASSERT_NE(nullptr, connection);
WRITE_SQL_BY_CONN(connection, "set ob_trx_timeout = 3000000000");
WRITE_SQL_BY_CONN(connection, "set ob_trx_idle_timeout = 3000000000");
WRITE_SQL_BY_CONN(connection, "set ob_query_timeout = 3000000000");
FIRST_TABLET_ID = 200000 + 1;
WRITE_SQL_FMT_BY_CONN(
connection,
"create table gl_t_%d (sid int primary key, sname varchar(10)) partition by hash(sid) partitions %ld",
0,
SINGLE_TABLE_PARTITION_COUNT);
WRITE_SQL_FMT_BY_CONN(
connection,
"create table gl_t_%d (sid int primary key, sname varchar(10)) partition by hash(sid) partitions %ld",
1,
SINGLE_TABLE_PARTITION_COUNT);
WRITE_SQL_FMT_BY_CONN(
connection,
"create table gl_t_%d (sid int primary key, sname varchar(10)) partition by hash(sid) partitions %ld",
2,
SINGLE_TABLE_PARTITION_COUNT);
LAST_TABLET_ID = 200000 + SINGLE_TABLE_PARTITION_COUNT * 3;
}
/**
* 本case主要用于验证在超多Tablet且都存在少量写入的场景下触发频繁的冻结是否会有问题。
* 主要的流程为:
* 1. 创建了三个具有4000个分区的表,分区方式是主键的hash
* 2. 每个表都插入了少量数据,按主键递增的方式插入,插入行数为分区数的5倍,尽量保证每个分区都有数据
* 3. 启动了20个线程在插入数据的过程中执行tablet冻结,其中10个异步冻结,10个同步冻结。异步冻结会重复做10次,同步冻结只做了1次
* 总共预计是会执行13万次Tablet级冻结。
* 4. 启动了1个线程执行同步日志流冻结,每隔50秒一次,总计执行20次
* 5. 启动了1个线程执行异步日志流冻结,每隔50秒一次,总结执行20次
* 6. 最后确认ObFreezer中没有残留的待异步的冻结的分区
*/
TEST_F(TestFrequentlyFreeze, frequently_freeze)
{
int ret = OB_SUCCESS;
std::vector<std::thread> worker_threads;
for (int i = 0; i < 3; i++) {
worker_threads.push_back(std::thread([i, this]() {
int ret = OB_SUCCESS;
MTL_SWITCH(RunCtx.tenant_id_) { this->fill_data(i); };
}));
}
::sleep(30);
for (int i = 0; i < TABLET_FREEZE_THREAD_COUNT; i++) {
worker_threads.push_back(std::thread([i, this]() {
int ret = OB_SUCCESS;
MTL_SWITCH(RunCtx.tenant_id_)
{
for (int k = 0; k < 10; k++) {
this->async_tablet_freeze(i);
sleep(10);
}
};
}));
}
for (int i = 0; i < TABLET_FREEZE_THREAD_COUNT; i++) {
worker_threads.push_back(std::thread([i, this]() {
int ret = OB_SUCCESS;
MTL_SWITCH(RunCtx.tenant_id_)
{
this->sync_tablet_freeze(i);
};
}));
}
worker_threads.push_back(std::thread([this]() {
int ret = OB_SUCCESS;
MTL_SWITCH(RunCtx.tenant_id_) { this->ls_freeze(true); };
}));
sleep(100);
worker_threads.push_back(std::thread([this]() {
int ret = OB_SUCCESS;
MTL_SWITCH(RunCtx.tenant_id_) { this->ls_freeze(false); };
}));
for (auto &my_thread : worker_threads) {
my_thread.join();
}
MTL_SWITCH(RunCtx.tenant_id_) { check_async_freeze_tablets_empty(); }
}
} // namespace storage
} // namespace oceanbase
int main(int argc, char **argv)
{
int64_t c = 0;
int64_t time_sec = 0;
char *log_level = (char*)"WDIAG";
while(EOF != (c = getopt(argc,argv,"t:l:"))) {
switch(c) {
case 't':
time_sec = atoi(optarg);
break;
case 'l':
log_level = optarg;
oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false;
break;
default:
break;
}
}
oceanbase::unittest::init_log_and_gtest(argc, argv);
OB_LOGGER.set_log_level(log_level);
LOG_INFO("main>>>");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}