check dup tablet set in trans and add dup table restart case
This commit is contained in:
parent
ffbb6c62fe
commit
c812b85881
@ -22,6 +22,7 @@ endfunction()
|
||||
|
||||
ob_unittest_multi_replica(test_ob_multi_replica_basic)
|
||||
ob_unittest_multi_replica(test_ob_dup_table_basic)
|
||||
ob_unittest_multi_replica(test_ob_dup_table_restart)
|
||||
ob_unittest_multi_replica(test_ob_dup_table_leader_switch)
|
||||
ob_unittest_multi_replica(test_ob_dup_table_tablet_gc)
|
||||
ob_unittest_multi_replica(test_mds_replay_from_ctx_table)
|
||||
|
@ -207,7 +207,7 @@ int ObMultiReplicaTestBase::wait_all_test_completed()
|
||||
int ret = OB_SUCCESS;
|
||||
std::string zone_str = "ZONE" + std::to_string(cur_zone_id_);
|
||||
if (OB_FAIL(finish_event(TEST_CASE_FINSH_EVENT_PREFIX + zone_str, zone_str))) {
|
||||
|
||||
SERVER_LOG(WARN, "write test finish event failed", K(ret), K(zone_str.c_str()));
|
||||
} else {
|
||||
for (int i = 1; i <= MAX_ZONE_COUNT && OB_SUCC(ret); i++) {
|
||||
zone_str = "ZONE" + std::to_string(i);
|
||||
@ -276,6 +276,13 @@ void ObMultiReplicaTestBase::TearDownTestCase()
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] TearDownTestCase");
|
||||
|
||||
int ret = OB_SUCCESS;
|
||||
const bool enable_failed_sleep = false;
|
||||
|
||||
int fail_cnt = ::testing::UnitTest::GetInstance()->failed_test_case_count();
|
||||
if (fail_cnt > 0 && enable_failed_sleep) {
|
||||
fprintf(stdout, "[SLEEP] FAIL %d TEST CASE, WAIT TO KILL", fail_cnt);
|
||||
usleep(30 * 60 * 1000 * 1000);
|
||||
}
|
||||
|
||||
// fprintf(stdout, ">>>>>>> AFTER RUN TEST: pid = %d\n", getpid());
|
||||
if (OB_FAIL(oceanbase::unittest::ObMultiReplicaTestBase::wait_all_test_completed())) {
|
||||
@ -285,7 +292,6 @@ void ObMultiReplicaTestBase::TearDownTestCase()
|
||||
// ret = close();
|
||||
// ASSERT_EQ(ret, OB_SUCCESS);
|
||||
}
|
||||
int fail_cnt = ::testing::UnitTest::GetInstance()->failed_test_case_count();
|
||||
if (chdir(exec_dir_.c_str()) == 0) {
|
||||
bool to_delete = true;
|
||||
if (to_delete) {
|
||||
@ -674,6 +680,8 @@ int ObMultiReplicaTestBase::finish_event(const std::string &event_name,
|
||||
fclose(fp);
|
||||
}
|
||||
|
||||
fprintf(stdout, "[WAIT EVENT] write target event : EVENT_KEY = %s; EVENT_VAL = %s\n",
|
||||
event_name.c_str(), event_content.c_str());
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] [WAIT EVENT] write target event",
|
||||
K(event_name.c_str()), K(event_content.c_str()));
|
||||
return ret;
|
||||
|
109
mittest/multi_replica/env/ob_multi_replica_util.h
vendored
109
mittest/multi_replica/env/ob_multi_replica_util.h
vendored
@ -172,6 +172,23 @@ namespace unittest
|
||||
ASSERT_EQ(res_ret, OB_SUCCESS); \
|
||||
}
|
||||
|
||||
#define RETRY_WRITE_SQL_UNTL_SUCC(conn, sql_str, timeout_us) \
|
||||
{ \
|
||||
int res_ret = OB_EAGAIN; \
|
||||
int64_t start_time = ObTimeUtility::fast_current_time(); \
|
||||
int64_t retry_cnt = 0; \
|
||||
while (OB_FAIL(res_ret)) { \
|
||||
if (ObTimeUtility::fast_current_time() - start_time > timeout_us) { \
|
||||
ret = OB_TIMEOUT; \
|
||||
break; \
|
||||
} \
|
||||
WRITE_SQL_BY_CONN_INNER(conn, sql_str, res_ret); \
|
||||
retry_cnt++; \
|
||||
} \
|
||||
SERVER_LOG(INFO, "retry to execute sql", K(ret), K(sql_str), K(retry_cnt), \
|
||||
K(start_time), K(timeout_us), K(ObTimeUtility::fast_current_time() - start_time)); \
|
||||
}
|
||||
|
||||
#define READ_SQL_BY_CONN(conn, result, sql_str) \
|
||||
sqlclient::ObMySQLResult *result = nullptr; \
|
||||
ObISQLClient::ReadResult read_res_##result; \
|
||||
@ -184,6 +201,21 @@ namespace unittest
|
||||
ASSERT_EQ(true, OB_NOT_NULL(result)); \
|
||||
}
|
||||
|
||||
#define READ_SQL_BY_CONN_WITH_RET(conn, result, sql_str, ret) \
|
||||
sqlclient::ObMySQLResult *result = nullptr; \
|
||||
ObISQLClient::ReadResult read_res_##result; \
|
||||
{ \
|
||||
ObSqlString sql; \
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
|
||||
SERVER_LOG(INFO, "TEST READ SQL: ", K(sql)); \
|
||||
ret = conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), read_res_##result); \
|
||||
result = read_res_##result.get_result(); \
|
||||
if(OB_SUCC(ret)) {\
|
||||
ASSERT_EQ(true, OB_NOT_NULL(result)); \
|
||||
}\
|
||||
}
|
||||
|
||||
|
||||
#define GET_RUNNGING_TRX_ID(conn, tx_id) \
|
||||
{ \
|
||||
ASSERT_EQ(true, conn != nullptr); \
|
||||
@ -227,18 +259,71 @@ namespace unittest
|
||||
WRITE_SQL_BY_CONN(connection, "set ob_query_timeout = 3000000000"); \
|
||||
WRITE_SQL_BY_CONN(connection, "set autocommit=0");
|
||||
|
||||
#define RETRY_UNTIL_TIMEOUT(condition, timeout_us, retry_interval_us) \
|
||||
{ \
|
||||
int64_t start_time = ObTimeUtility::fast_current_time(); \
|
||||
while (!(condition)) { \
|
||||
if (ObTimeUtility::fast_current_time() - start_time > timeout_us) { \
|
||||
ret = OB_TIMEOUT; \
|
||||
break; \
|
||||
} \
|
||||
SERVER_LOG(INFO, "retry one time until timeout", K(condition), K(start_time), \
|
||||
K(timeout_us)); \
|
||||
ob_usleep(retry_interval_us); \
|
||||
} \
|
||||
#define RETRY_UNTIL_TIMEOUT(condition, timeout_us, retry_interval_us) \
|
||||
{ \
|
||||
int64_t start_time = ObTimeUtility::fast_current_time(); \
|
||||
while (OB_SUCC(ret) && !(condition)) { \
|
||||
if (ObTimeUtility::fast_current_time() - start_time > timeout_us) { \
|
||||
ret = OB_TIMEOUT; \
|
||||
break; \
|
||||
} \
|
||||
SERVER_LOG(INFO, "retry one time until timeout", K(condition), K(start_time), \
|
||||
K(timeout_us)); \
|
||||
ob_usleep(retry_interval_us); \
|
||||
} \
|
||||
SERVER_LOG(INFO, "retry to wait one condition successfully", K(condition), K(start_time), \
|
||||
K(timeout_us), K(ObTimeUtility::fast_current_time() - start_time)); \
|
||||
}
|
||||
|
||||
#define RETRY_OP_UNTIL_TIMEOUT(op, condition, timeout_us, retry_interval_us) \
|
||||
{ \
|
||||
int64_t start_time = ObTimeUtility::fast_current_time(); \
|
||||
op; \
|
||||
while (OB_SUCC(ret) && !(condition)) { \
|
||||
if (ObTimeUtility::fast_current_time() - start_time > timeout_us) { \
|
||||
ret = OB_TIMEOUT; \
|
||||
break; \
|
||||
} \
|
||||
SERVER_LOG(INFO, "retry opertion until timeout", K(condition), K(start_time), \
|
||||
K(timeout_us)); \
|
||||
ob_usleep(retry_interval_us); \
|
||||
op; \
|
||||
} \
|
||||
SERVER_LOG(INFO, "retry to opertion successfully", K(condition), K(start_time), K(timeout_us), \
|
||||
K(ObTimeUtility::fast_current_time() - start_time)); \
|
||||
}
|
||||
|
||||
#define WAIT_START_SERVICE_SUCCC(timeout_us, retry_interval_us) \
|
||||
{ \
|
||||
int64_t start_time = ObTimeUtility::fast_current_time(); \
|
||||
bool svr_started = false; \
|
||||
ASSERT_EQ(OB_SUCCESS, SVR_TRACER.check_in_service(replica_->get_addr(), svr_started)); \
|
||||
while (OB_SUCC(ret) && !(svr_started)) { \
|
||||
if (ObTimeUtility::fast_current_time() - start_time > timeout_us) { \
|
||||
ret = OB_TIMEOUT; \
|
||||
break; \
|
||||
} \
|
||||
SERVER_LOG(INFO, "retry to check service started until timeout", K(svr_started), \
|
||||
K(start_time), K(timeout_us)); \
|
||||
ob_usleep(retry_interval_us); \
|
||||
ASSERT_EQ(OB_SUCCESS, SVR_TRACER.refresh()); \
|
||||
ASSERT_EQ(OB_SUCCESS, SVR_TRACER.check_in_service(replica_->get_addr(), svr_started)); \
|
||||
} \
|
||||
SERVER_LOG(INFO, "start service successfully", K(svr_started), K(start_time), K(timeout_us), \
|
||||
K(ObTimeUtility::fast_current_time() - start_time)); \
|
||||
}
|
||||
|
||||
#define MINOR_FREEZE_LS(ls) \
|
||||
{ \
|
||||
TRANS_LOG(INFO, "minor freeze ls begin"); \
|
||||
share::ObTenantSwitchGuard tenant_guard; \
|
||||
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(ls->get_tenant_id())); \
|
||||
oceanbase::storage::checkpoint::ObCheckpointExecutor *checkpoint_executor = \
|
||||
ls->get_checkpoint_executor(); \
|
||||
ASSERT_NE(nullptr, checkpoint_executor); \
|
||||
ASSERT_EQ(OB_SUCCESS, \
|
||||
checkpoint_executor->advance_checkpoint_by_flush(share::SCN::max_scn())); \
|
||||
TRANS_LOG(INFO, "minor freeze ls end"); \
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
|
439
mittest/multi_replica/test_ob_dup_table_restart.cpp
Normal file
439
mittest/multi_replica/test_ob_dup_table_restart.cpp
Normal file
@ -0,0 +1,439 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#define USING_LOG_PREFIX SERVER
|
||||
#define protected public
|
||||
#define private public
|
||||
|
||||
#include "env/ob_fast_bootstrap.h"
|
||||
#include "env/ob_multi_replica_util.h"
|
||||
#include "lib/mysqlclient/ob_mysql_result.h"
|
||||
#include "storage/tx/ob_dup_table_tablets.h"
|
||||
#include "storage/tx/ob_trans_part_ctx.h"
|
||||
#include "storage/tx/ob_tx_replay_executor.h"
|
||||
|
||||
using namespace oceanbase::transaction;
|
||||
using namespace oceanbase::storage;
|
||||
|
||||
#define CUR_TEST_CASE_NAME ObDupTableRestartCase
|
||||
|
||||
DEFINE_MULTI_ZONE_TEST_CASE_CLASS
|
||||
|
||||
APPEND_RESTART_TEST_CASE_CLASS(2, 1);
|
||||
|
||||
MULTI_REPLICA_TEST_MAIN_FUNCTION(test_dup_table_restart_);
|
||||
|
||||
#define DEFAULT_LOAD_ROW_CNT 30
|
||||
|
||||
bool STOP_TABLET_LOG_SUBMIT = false;
|
||||
bool STOP_PREPARE_LOG_SUBMIT = false;
|
||||
bool STOP_TX_REPLAY = false;
|
||||
|
||||
OB_NOINLINE int oceanbase::transaction::ObLSDupTabletsMgr::process_prepare_ser_err_test_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (STOP_TABLET_LOG_SUBMIT) {
|
||||
ret = OB_LOG_TOO_LARGE;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
SERVER_LOG(INFO, "process prepare ser errsim test", K(ret), K(STOP_TABLET_LOG_SUBMIT));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_NOINLINE int oceanbase::transaction::ObPartTransCtx::errism_submit_prepare_log_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (STOP_PREPARE_LOG_SUBMIT) {
|
||||
ret = OB_EAGAIN;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
SERVER_LOG(INFO, "submit prepare log errsim test", K(ret), K(STOP_PREPARE_LOG_SUBMIT));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_NOINLINE int oceanbase::transaction::ObTxReplayExecutor::errsim_tx_replay_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (STOP_TX_REPLAY) {
|
||||
ret = OB_EAGAIN;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
SERVER_LOG(INFO, "stop tx replay errsim test", K(ret), K(STOP_TX_REPLAY));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace unittest
|
||||
{
|
||||
|
||||
struct DupTableBasicArg
|
||||
{
|
||||
uint64_t tenant_id_;
|
||||
int64_t ls_id_num_;
|
||||
int64_t table_id_;
|
||||
int64_t tablet_count_;
|
||||
ObSEArray<int64_t, 10> tablet_id_array_;
|
||||
|
||||
TO_STRING_KV(K(tenant_id_), K(ls_id_num_), K(table_id_), K(tablet_count_), K(tablet_id_array_));
|
||||
|
||||
OB_UNIS_VERSION(1);
|
||||
};
|
||||
|
||||
OB_SERIALIZE_MEMBER(DupTableBasicArg,
|
||||
tenant_id_,
|
||||
ls_id_num_,
|
||||
table_id_,
|
||||
tablet_count_,
|
||||
tablet_id_array_);
|
||||
|
||||
static DupTableBasicArg static_basic_arg_;
|
||||
sqlclient::ObISQLConnection *static_test_conn_ = nullptr;
|
||||
|
||||
TEST_F(GET_ZONE_TEST_CLASS_NAME(1), create_test_env)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
CREATE_TEST_TENANT(test_tenant_id);
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] create test tenant success", K(test_tenant_id));
|
||||
|
||||
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
|
||||
WRITE_SQL_BY_CONN(test_conn,
|
||||
"CREATE TABLE test_t1( "
|
||||
"id_x int, "
|
||||
"id_y int, "
|
||||
"id_z int, "
|
||||
"PRIMARY KEY(id_x)"
|
||||
") duplicate_scope='cluster' PARTITION BY hash(id_x) partitions 10;");
|
||||
|
||||
std::string primary_zone_sql = "ALTER TENANT " + std::string(DEFAULT_TEST_TENANT_NAME)
|
||||
+ " set primary_zone='zone1; zone3; zone2';";
|
||||
WRITE_SQL_BY_CONN(test_conn, primary_zone_sql.c_str());
|
||||
|
||||
READ_SQL_BY_CONN(test_conn, table_info_result,
|
||||
"select table_id, duplicate_scope from "
|
||||
"oceanbase.__all_table where table_name = 'test_t1' ");
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->next());
|
||||
int64_t table_id;
|
||||
int64_t dup_scope;
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->get_int("table_id", table_id));
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->get_int("duplicate_scope", dup_scope));
|
||||
ASSERT_EQ(true, table_id > 0);
|
||||
ASSERT_EQ(true, dup_scope != 0);
|
||||
|
||||
std::string tablet_count_sql =
|
||||
"select count(*), ls_id from oceanbase.__all_tablet_to_ls where table_id = "
|
||||
+ std::to_string(table_id) + " group by ls_id order by count(*)";
|
||||
READ_SQL_BY_CONN(test_conn, tablet_count_result, tablet_count_sql.c_str());
|
||||
int64_t tablet_count = 0;
|
||||
int64_t ls_id_num = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_count_result->next());
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_count_result->get_int("count(*)", tablet_count));
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_count_result->get_int("ls_id", ls_id_num));
|
||||
ASSERT_EQ(10, tablet_count);
|
||||
ASSERT_EQ(true, share::ObLSID(ls_id_num).is_valid());
|
||||
|
||||
std::string tablet_id_sql = "select tablet_id from oceanbase.__all_tablet_to_ls where table_id = "
|
||||
+ std::to_string(table_id)
|
||||
+ " and ls_id = " + std::to_string(ls_id_num);
|
||||
READ_SQL_BY_CONN(test_conn, tablet_id_reult, tablet_id_sql.c_str());
|
||||
while (OB_SUCC(tablet_id_reult->next())) {
|
||||
int64_t id = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_id_reult->get_int("tablet_id", id));
|
||||
ASSERT_EQ(true, ObTabletID(id).is_valid());
|
||||
ASSERT_EQ(OB_SUCCESS, static_basic_arg_.tablet_id_array_.push_back(id));
|
||||
}
|
||||
ASSERT_EQ(tablet_count, static_basic_arg_.tablet_id_array_.count());
|
||||
ASSERT_EQ(OB_ITER_END, ret);
|
||||
ret = OB_SUCCESS;
|
||||
|
||||
GET_LS(test_tenant_id, ls_id_num, ls_handle);
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] -------- before wait dup tablet discover", K(ret),
|
||||
K(ls_id_num), K(tablet_count));
|
||||
RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->dup_table_ls_handler_.get_dup_tablet_count()
|
||||
== tablet_count,
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
RETRY_UNTIL_TIMEOUT(
|
||||
ls_handle.get_ls()->dup_table_ls_handler_.tablets_mgr_ptr_->get_readable_tablet_set_count()
|
||||
>= 1,
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
RETRY_UNTIL_TIMEOUT(
|
||||
ls_handle.get_ls()
|
||||
->dup_table_ls_handler_.tablets_mgr_ptr_->get_need_confirm_tablet_set_count()
|
||||
== 0,
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] -------- after wait dup tablet discover", K(ret),
|
||||
K(ls_id_num), K(ls_handle.get_ls()->dup_table_ls_handler_.get_dup_tablet_count()));
|
||||
ASSERT_EQ(OB_SUCCESS, ret /*has_dup_tablet*/);
|
||||
|
||||
WRITE_SQL_BY_CONN(test_conn, "set autocommit = false;");
|
||||
WRITE_SQL_BY_CONN(test_conn, "begin;");
|
||||
|
||||
for (int i = 1; i <= DEFAULT_LOAD_ROW_CNT; i++) {
|
||||
std::string insert_sql_str = "INSERT INTO test_t1 VALUES(" + std::to_string(i) + ", 0 , 0)";
|
||||
WRITE_SQL_BY_CONN(test_conn, insert_sql_str.c_str());
|
||||
}
|
||||
|
||||
// int64_t tx_id_num;
|
||||
// GET_RUNNGING_TRX_ID(test_conn, tx_id_num);
|
||||
|
||||
WRITE_SQL_BY_CONN(test_conn, "commit;");
|
||||
|
||||
int64_t row_cnt = 0;
|
||||
READ_SQL_BY_CONN(test_conn, read_insert_result, "select count(*) from test_t1;");
|
||||
ASSERT_EQ(OB_SUCCESS, read_insert_result->next());
|
||||
ASSERT_EQ(OB_SUCCESS, read_insert_result->get_int("count(*)", row_cnt));
|
||||
ASSERT_EQ(row_cnt, DEFAULT_LOAD_ROW_CNT);
|
||||
|
||||
ASSERT_EQ(STOP_TABLET_LOG_SUBMIT, false);
|
||||
STOP_TABLET_LOG_SUBMIT = true;
|
||||
|
||||
// DupTableBasicArg static_basic_arg_;
|
||||
static_basic_arg_.tenant_id_ = test_tenant_id;
|
||||
static_basic_arg_.ls_id_num_ = ls_id_num;
|
||||
static_basic_arg_.table_id_ = table_id;
|
||||
static_basic_arg_.tablet_count_ = tablet_count;
|
||||
|
||||
std::string tmp_str;
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
EventArgSerTool<DupTableBasicArg>::serialize_arg(static_basic_arg_, tmp_str));
|
||||
finish_event("CREATE_DUP_TABLE", tmp_str);
|
||||
}
|
||||
|
||||
TEST_F(GET_ZONE_TEST_CLASS_NAME(2), minor_ls_for_ckpt)
|
||||
{
|
||||
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
std::string tmp_event_val;
|
||||
ASSERT_EQ(OB_SUCCESS, wait_event_finish("CREATE_DUP_TABLE", tmp_event_val, 30 * 60 * 1000));
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
EventArgSerTool<DupTableBasicArg>::deserialize_arg(static_basic_arg_, tmp_event_val));
|
||||
|
||||
GET_LS(static_basic_arg_.tenant_id_, static_basic_arg_.ls_id_num_, ls_handle);
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] -------- before wait dup tablet discover", K(ret),
|
||||
K(static_basic_arg_));
|
||||
RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->dup_table_ls_handler_.get_dup_tablet_count()
|
||||
== static_basic_arg_.tablet_count_,
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
RETRY_UNTIL_TIMEOUT(
|
||||
ls_handle.get_ls()->dup_table_ls_handler_.tablets_mgr_ptr_->get_readable_tablet_set_count()
|
||||
>= 1,
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
RETRY_UNTIL_TIMEOUT(
|
||||
ls_handle.get_ls()
|
||||
->dup_table_ls_handler_.tablets_mgr_ptr_->get_need_confirm_tablet_set_count()
|
||||
== 0,
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] -------- after wait dup tablet discover", K(ret),
|
||||
K(static_basic_arg_));
|
||||
ASSERT_EQ(OB_SUCCESS, ret /*has_dup_tablet*/);
|
||||
|
||||
share::SCN dup_table_max_applying_scn =
|
||||
ls_handle.get_ls()->dup_table_ls_handler_.interface_stat_.dup_table_max_applying_scn_;
|
||||
|
||||
ASSERT_EQ(true, dup_table_max_applying_scn.is_valid_and_not_min());
|
||||
|
||||
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] before minor freeze ls", K(ret),
|
||||
K(dup_table_max_applying_scn), K(ls_handle.get_ls()->get_clog_checkpoint_scn()));
|
||||
|
||||
MINOR_FREEZE_LS(ls_handle.get_ls());
|
||||
RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->get_clog_checkpoint_scn() > dup_table_max_applying_scn,
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
|
||||
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] after minor freeze ls", K(ret),
|
||||
K(dup_table_max_applying_scn), K(ls_handle.get_ls()->get_clog_checkpoint_scn()));
|
||||
|
||||
ASSERT_EQ(restart_zone(2, 1), OB_SUCCESS);
|
||||
}
|
||||
|
||||
bool refresh_and_check_ls_ts_info(ObLS *ls, const share::SCN &applied_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObAddr target_addr;
|
||||
target_addr.set_ip_addr(ObMultiReplicaTestBase::local_ip_.c_str(),
|
||||
ObMultiReplicaTestBase::rpc_ports_[2]);
|
||||
// ls->dup_table_ls_handler_.ts_sync_mgr_ptr_->update_all_ts_info_cache();
|
||||
oceanbase::transaction::DupTableTsInfo ts_info;
|
||||
ret = ls->dup_table_ls_handler_.ts_sync_mgr_ptr_->get_cache_ts_info(target_addr, ts_info);
|
||||
TRANS_LOG(INFO, "refresh and check ts info", K(ret), K(applied_scn), K(target_addr), K(ts_info));
|
||||
return ts_info.max_replayed_scn_ >= applied_scn;
|
||||
}
|
||||
|
||||
TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(2, 1), become_leader_after_restart)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
std::string tmp_event_val;
|
||||
ASSERT_EQ(OB_SUCCESS, wait_event_finish("CREATE_DUP_TABLE", tmp_event_val, 30 * 60 * 1000));
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
EventArgSerTool<DupTableBasicArg>::deserialize_arg(static_basic_arg_, tmp_event_val));
|
||||
|
||||
// ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
|
||||
// common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
// ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
|
||||
WAIT_START_SERVICE_SUCCC(30 * 1000 * 1000, 100 * 1000);
|
||||
|
||||
share::ObTenantSwitchGuard tenant_guard;
|
||||
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(static_basic_arg_.tenant_id_));
|
||||
GET_LS(static_basic_arg_.tenant_id_, static_basic_arg_.ls_id_num_, ls_handle);
|
||||
RETRY_UNTIL_TIMEOUT(!ls_handle.get_ls()->dup_table_ls_handler_.is_master(), 20 * 1000 * 1000,
|
||||
100 * 1000);
|
||||
|
||||
std::string ls_id_str = std::to_string(static_basic_arg_.ls_id_num_);
|
||||
std::string target_ip = local_ip_ + ":" + std::to_string(rpc_ports_[1]);
|
||||
|
||||
std::string switch_leader_sql = "alter system switch replica leader ls=" + ls_id_str + " server='"
|
||||
+ target_ip + "' tenant='tt1';";
|
||||
|
||||
common::ObMySQLProxy &sys_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy();
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(sys_conn, sys_tenant_sql_proxy);
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
|
||||
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
|
||||
ASSERT_EQ(STOP_TABLET_LOG_SUBMIT, false);
|
||||
STOP_TABLET_LOG_SUBMIT = true;
|
||||
|
||||
// WRITE_SQL_BY_CONN(sys_conn, switch_leader_sql.c_str());
|
||||
RETRY_WRITE_SQL_UNTL_SUCC(sys_conn, switch_leader_sql.c_str(), 30 * 1000 * 1000);
|
||||
RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->dup_table_ls_handler_.is_master(), 20 * 1000 * 1000,
|
||||
100 * 1000);
|
||||
|
||||
int64_t update_tx_id = 0;
|
||||
WRITE_SQL_BY_CONN(test_conn, "begin;");
|
||||
std::string update_sql_str = "UPDATE test_t1 set id_z = 999 where id_x = 1";
|
||||
WRITE_SQL_BY_CONN(test_conn, update_sql_str.c_str());
|
||||
|
||||
GET_TX_ID_FROM_SQL_AUDIT(test_conn, update_sql_str, update_tx_id);
|
||||
|
||||
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] execute update sql after zone2 restarted", K(ret),
|
||||
K(update_tx_id), K(update_sql_str.c_str()));
|
||||
|
||||
ASSERT_EQ(STOP_PREPARE_LOG_SUBMIT, false);
|
||||
STOP_PREPARE_LOG_SUBMIT = true;
|
||||
|
||||
transaction::ObPartTransCtx *tx_ctx = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
ls_handle.get_ls()->get_tx_ctx(transaction::ObTransID(update_tx_id), false, tx_ctx));
|
||||
share::ObLSArray fake_parts;
|
||||
ASSERT_EQ(OB_SUCCESS, fake_parts.push_back(share::ObLSID(static_basic_arg_.ls_id_num_)));
|
||||
tx_ctx->set_2pc_participants_(fake_parts);
|
||||
tx_ctx->submit_redo_commit_info_log_();
|
||||
RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 20 * 1000 * 1000, 100 * 1000);
|
||||
const share::SCN max_applied_scn = tx_ctx->exec_info_.max_applying_log_ts_;
|
||||
|
||||
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] commit info log has been synced", K(max_applied_scn),
|
||||
KPC(tx_ctx));
|
||||
|
||||
bool res_bool = false;
|
||||
RETRY_OP_UNTIL_TIMEOUT(res_bool =
|
||||
refresh_and_check_ls_ts_info(ls_handle.get_ls(), max_applied_scn),
|
||||
res_bool, 20 * 1000 * 1000, 100 * 1000);
|
||||
|
||||
finish_event("RESTART_ZONE2_TO_BE_LEADER", "");
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, wait_event_finish("DISABLE_ZONE3_REPLAY", tmp_event_val, 30 * 60 * 1000));
|
||||
|
||||
// WRITE_SQL_BY_CONN(test_conn, "commit;");
|
||||
ASSERT_EQ(STOP_PREPARE_LOG_SUBMIT, true);
|
||||
STOP_PREPARE_LOG_SUBMIT = false;
|
||||
|
||||
RETRY_UNTIL_TIMEOUT(tx_ctx->ctx_tx_data_.get_state() == ObTxCommitData::COMMIT, 20 * 1000 * 1000,
|
||||
100 * 1000);
|
||||
RETRY_UNTIL_TIMEOUT(tx_ctx->is_exiting(), 20 * 1000 * 1000, 100 * 1000);
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx));
|
||||
finish_event("UPDATE_ON_ZONE2", "");
|
||||
}
|
||||
|
||||
TEST_F(GET_ZONE_TEST_CLASS_NAME(3), select_after_dup_table_update)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
ObDupTableLSLeaseMgr::DEFAULT_LEASE_INTERVAL = ObDupTableLSLeaseMgr::LEASE_UNIT * 600 * 20;
|
||||
std::string tmp_event_val;
|
||||
ASSERT_EQ(OB_SUCCESS, wait_event_finish("CREATE_DUP_TABLE", tmp_event_val, 30 * 60 * 1000));
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
EventArgSerTool<DupTableBasicArg>::deserialize_arg(static_basic_arg_, tmp_event_val));
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
wait_event_finish("RESTART_ZONE2_TO_BE_LEADER", tmp_event_val, 30 * 60 * 1000));
|
||||
|
||||
GET_LS(static_basic_arg_.tenant_id_, static_basic_arg_.ls_id_num_, ls_handle);
|
||||
|
||||
// ASSERT_EQ(ls_handle.get_ls()->disable_replay(), OB_SUCCESS);
|
||||
ASSERT_EQ(STOP_TX_REPLAY, false);
|
||||
STOP_TX_REPLAY = true;
|
||||
|
||||
finish_event("DISABLE_ZONE3_REPLAY", "");
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
|
||||
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
WRITE_SQL_BY_CONN(test_conn, "set ob_query_timeout = 60*1000*1000");
|
||||
|
||||
int64_t select_tx_id = 0;
|
||||
int64_t id_Z = 0;
|
||||
std::string select_sql_str = "SELECT id_z from test_t1 where id_x = 1";
|
||||
|
||||
// RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->dup_table_ls_handler_.get_dup_tablet_count()
|
||||
// == static_basic_arg_.tablet_count_,
|
||||
// 20 * 1000 * 1000, 100 * 1000);
|
||||
ASSERT_EQ(ls_handle.get_ls()->dup_table_ls_handler_.is_dup_table_lease_valid(), true);
|
||||
READ_SQL_BY_CONN_WITH_RET(test_conn, tmp_select_result, select_sql_str.c_str(), ret);
|
||||
GET_TX_ID_FROM_SQL_AUDIT(test_conn, select_sql_str, select_tx_id);
|
||||
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] execute select sql after zone2 restarted", K(ret),
|
||||
K(select_tx_id), K(select_sql_str.c_str()));
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
ASSERT_EQ(OB_TIMEOUT, ret);
|
||||
} else {
|
||||
ASSERT_EQ(OB_SUCCESS, tmp_select_result->next());
|
||||
ASSERT_EQ(OB_SUCCESS, tmp_select_result->get_int("id_z", id_Z));
|
||||
ASSERT_EQ(id_Z, 999);
|
||||
ASSERT_EQ(ret != OB_SUCCESS, true);
|
||||
}
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, wait_event_finish("UPDATE_ON_ZONE2", tmp_event_val, 30 * 60 * 1000));
|
||||
|
||||
// ASSERT_EQ(ls_handle.get_ls()->enable_replay(), OB_SUCCESS);
|
||||
ASSERT_EQ(STOP_TX_REPLAY, true);
|
||||
STOP_TX_REPLAY = false;
|
||||
|
||||
READ_SQL_BY_CONN_WITH_RET(test_conn, select_result, select_sql_str.c_str(), ret);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(ls_handle.get_ls()->dup_table_ls_handler_.is_dup_table_lease_valid(), true);
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, select_result->next());
|
||||
ASSERT_EQ(OB_SUCCESS, select_result->get_int("id_z", id_Z));
|
||||
|
||||
ASSERT_EQ(id_Z, 999);
|
||||
}
|
||||
|
||||
} // namespace unittest
|
||||
} // namespace oceanbase
|
@ -742,6 +742,7 @@ int ObDupTableLogOperator::sync_log_succ_(const bool for_replay)
|
||||
bool contain_all_readable = false;
|
||||
int64_t logging_readable_cnt = 0;
|
||||
const int64_t all_readable_set_cnt = tablet_mgr_ptr_->get_readable_tablet_set_count();
|
||||
const share::SCN cur_sync_succ_scn = logging_scn_;
|
||||
ObDupTableLSCheckpoint::ObLSDupTableMeta tmp_dup_ls_meta;
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -800,13 +801,14 @@ int ObDupTableLogOperator::sync_log_succ_(const bool for_replay)
|
||||
}
|
||||
|
||||
if (lease_log_sync_cost_time + tablet_log_sync_cost_time + ckpt_update_cost_time > 500 * 1000) {
|
||||
DUP_TABLE_LOG(INFO, "sync log succ cost too much time", K(ret), K(logging_scn_),
|
||||
DUP_TABLE_LOG(INFO, "sync log succ cost too much time", K(ret), K(cur_sync_succ_scn),
|
||||
K(logging_lease_addrs_.count()), K(logging_tablet_set_ids_.count()), K(stat_log_),
|
||||
K(start_sync_time), K(lease_log_sync_cost_time), K(tablet_log_sync_cost_time),
|
||||
K(ckpt_update_cost_time));
|
||||
}
|
||||
|
||||
if (OB_NOT_NULL(interface_stat_ptr_)) {
|
||||
interface_stat_ptr_->dup_table_max_applying_scn_.inc_update(cur_sync_succ_scn);
|
||||
interface_stat_ptr_->dup_table_lease_log_sync_total_time_ += lease_log_sync_cost_time;
|
||||
interface_stat_ptr_->dup_table_tablet_log_sync_total_time_ += tablet_log_sync_cost_time;
|
||||
}
|
||||
|
@ -72,6 +72,8 @@ struct DupTableInterfaceStat
|
||||
int64_t dup_table_redo_sync_succ_cnt_;
|
||||
int64_t dup_table_redo_sync_fail_cnt_;
|
||||
|
||||
share::SCN dup_table_max_applying_scn_;
|
||||
|
||||
int64_t dup_table_log_entry_cnt_;
|
||||
int64_t dup_table_log_entry_total_size_;
|
||||
|
||||
@ -87,6 +89,8 @@ struct DupTableInterfaceStat
|
||||
dup_table_follower_read_tablet_not_ready_cnt_ = 0;
|
||||
dup_table_follower_read_lease_expired_cnt_ = 0;
|
||||
|
||||
dup_table_max_applying_scn_.set_min();
|
||||
|
||||
dup_table_redo_sync_succ_cnt_ = 0;
|
||||
dup_table_redo_sync_fail_cnt_ = 0;
|
||||
|
||||
@ -105,6 +109,7 @@ struct DupTableInterfaceStat
|
||||
K(dup_table_follower_read_lease_expired_cnt_),
|
||||
K(dup_table_redo_sync_succ_cnt_),
|
||||
K(dup_table_redo_sync_fail_cnt_),
|
||||
K(dup_table_max_applying_scn_),
|
||||
K(dup_table_log_entry_cnt_),
|
||||
K(dup_table_log_entry_total_size_),
|
||||
K(dup_table_log_replay_total_time_),
|
||||
|
@ -24,7 +24,7 @@ namespace transaction
|
||||
{
|
||||
|
||||
const int64_t ObDupTableLSLeaseMgr::LEASE_UNIT = ObDupTableLoopWorker::LOOP_INTERVAL;
|
||||
const int64_t ObDupTableLSLeaseMgr::DEFAULT_LEASE_INTERVAL = ObDupTableLSLeaseMgr::LEASE_UNIT * 60;
|
||||
int64_t ObDupTableLSLeaseMgr::DEFAULT_LEASE_INTERVAL = ObDupTableLSLeaseMgr::LEASE_UNIT * 60;
|
||||
const int64_t ObDupTableLSLeaseMgr::MIN_LEASE_INTERVAL = ObDupTableLSLeaseMgr::LEASE_UNIT * 60;
|
||||
|
||||
int ObDupTableLSLeaseMgr::init(ObDupTableLSHandler *dup_ls_handle)
|
||||
@ -570,6 +570,9 @@ bool ObDupTableLSLeaseMgr::is_follower_lease_valid()
|
||||
|
||||
SpinRLockGuard guard(lease_lock_);
|
||||
is_follower_lease = follower_lease_info_.lease_expired_ts_ > ObTimeUtility::current_time();
|
||||
if (!is_follower_lease) {
|
||||
DUP_TABLE_LOG(INFO, DUP_TABLET_LIFE_PREFIX "lease is expired", K(follower_lease_info_));
|
||||
}
|
||||
|
||||
return is_follower_lease;
|
||||
}
|
||||
@ -822,11 +825,15 @@ int ObDupTableLSLeaseMgr::GetLeaseValidAddrFunctor::operator()(
|
||||
+----------------------------------------------------+
|
||||
*/
|
||||
|
||||
if (hash_pair.second.lease_expired_ts_ > cur_time_
|
||||
// include a granted logging lease
|
||||
|| hash_pair.second.cache_lease_req_.is_ready()) {
|
||||
if (MTL(ObTransService *)->get_server() == hash_pair.first) {
|
||||
DUP_TABLE_LOG(INFO, "we need not push back self into lease valid array", K(ret),
|
||||
K(hash_pair.first), K(hash_pair.second));
|
||||
} else if (hash_pair.second.lease_expired_ts_ > cur_time_
|
||||
// include a granted logging lease
|
||||
|| hash_pair.second.cache_lease_req_.is_ready()) {
|
||||
if (OB_FAIL(addr_arr_.push_back(hash_pair.first))) {
|
||||
DUP_TABLE_LOG(WARN, "push back lease valid array failed", K(ret));
|
||||
DUP_TABLE_LOG(WARN, "push back lease valid array failed", K(ret), K(hash_pair.first),
|
||||
K(hash_pair.second));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,7 +39,7 @@ public:
|
||||
|
||||
public:
|
||||
static const int64_t LEASE_UNIT;
|
||||
static const int64_t DEFAULT_LEASE_INTERVAL;
|
||||
static int64_t DEFAULT_LEASE_INTERVAL;
|
||||
static const int64_t MIN_LEASE_INTERVAL;
|
||||
|
||||
TO_STRING_KV(K(leader_lease_map_.size()), K(follower_lease_info_));
|
||||
|
@ -1112,6 +1112,23 @@ int ObLSDupTabletsMgr::prepare_serialize_src_set_with_related_set_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
#ifdef ERRSIM
|
||||
ERRSIM_POINT_DEF(EN_DUP_TABLE_LOG_PREPARE_SERIALIZE)
|
||||
#endif
|
||||
|
||||
OB_NOINLINE int ObLSDupTabletsMgr::process_prepare_ser_err_test_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
#ifdef ERRSIM
|
||||
ret = EN_DUP_TABLE_LOG_PREPARE_SERIALIZE;
|
||||
#endif
|
||||
|
||||
DUP_TABLE_LOG(INFO, "errsim prepare serialize err test", K(ret),K(ls_id_));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSDupTabletsMgr::prepare_serialize(int64_t &max_ser_size,
|
||||
DupTabletSetIDArray &unique_id_array,
|
||||
const int64_t max_log_buf_len)
|
||||
@ -1123,6 +1140,12 @@ int ObLSDupTabletsMgr::prepare_serialize(int64_t &max_ser_size,
|
||||
|
||||
unique_id_array.reuse();
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(process_prepare_ser_err_test_())) {
|
||||
DUP_TABLE_LOG(WARN, "errsim for dup tablet log prepare serialize", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
bool can_be_confirmed = true;
|
||||
DLIST_FOREACH(cur_map, need_confirm_new_queue_)
|
||||
|
@ -798,6 +798,8 @@ private:
|
||||
};
|
||||
|
||||
private:
|
||||
int process_prepare_ser_err_test_();
|
||||
|
||||
int lose_dup_tablet_(const common::ObTabletID &tablet_id);
|
||||
int discover_dup_tablet_(const common::ObTabletID &tablet_id, const int64_t refresh_time);
|
||||
int collect_confirmed_dup_tablet_(const share::SCN &max_replayed_scn);
|
||||
|
@ -321,7 +321,7 @@ int ObDupTableLSTsSyncMgr::get_ts_info_cache_(const common::ObAddr &addr, DupTab
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(ts_info_cache_.get_refactored(addr, ts_info))) {
|
||||
DUP_TABLE_LOG(WARN, "get ts info cache failed");
|
||||
DUP_TABLE_LOG(WARN, "get ts info cache failed", K(addr), K(ts_info));
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -119,135 +119,6 @@ int ObDupTabletScanTask::refresh_dup_tablet_schema_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDupTabletScanTask::execute_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int iter_ret = OB_SUCCESS;
|
||||
|
||||
ObSharedGuard<ObLSIterator> ls_iter_guard;
|
||||
ObLSIterator *ls_iter_ptr = nullptr;
|
||||
ObLS *cur_ls_ptr = nullptr;
|
||||
TabletIDArray tablet_id_array;
|
||||
ObTenantDupTabletSchemaHelper::TabletIDSet tenant_dup_tablet_set;
|
||||
bool need_refreh_dup_schema = true;
|
||||
share::ObLSStatusInfo dup_ls_status_info;
|
||||
|
||||
// compute scan task max execute interval
|
||||
const int64_t cur_time = ObTimeUtility::fast_current_time();
|
||||
if (cur_time - last_execute_time_ > 0) {
|
||||
if (0 != last_execute_time_) {
|
||||
max_execute_interval_ = max(max_execute_interval_, cur_time - last_execute_time_);
|
||||
last_execute_time_ = cur_time;
|
||||
} else {
|
||||
last_execute_time_ = ObTimeUtility::fast_current_time();
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_ISNULL(MTL(ObLSService *)) || OB_ISNULL(dup_loop_worker_)
|
||||
|| (OB_FAIL(MTL(ObLSService *)->get_ls_iter(ls_iter_guard, ObLSGetMod::TRANS_MOD))
|
||||
|| !ls_iter_guard.is_valid())) {
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
}
|
||||
DUP_TABLE_LOG(WARN, "invalid arguments", K(ret));
|
||||
} else if (OB_ISNULL(ls_iter_ptr = ls_iter_guard.get_ptr())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
DUP_TABLE_LOG(WARN, "invalid arguments", K(ret));
|
||||
} else {
|
||||
iter_ret = OB_SUCCESS;
|
||||
cur_ls_ptr = nullptr;
|
||||
// const int64_t gc_time = ObTimeUtility::fast_current_time();
|
||||
while (OB_SUCCESS == (iter_ret = ls_iter_ptr->get_next(cur_ls_ptr))) {
|
||||
tablet_id_array.reset();
|
||||
|
||||
ObRole ls_role;
|
||||
int64_t unused_proposal_id;
|
||||
|
||||
if (OB_ISNULL(cur_ls_ptr)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
DUP_TABLE_LOG(WARN, "invalid ls ptr", K(ret), KP(cur_ls_ptr));
|
||||
} else if (!cur_ls_ptr->get_dup_table_ls_handler()->is_master()) {
|
||||
// do nothing
|
||||
DUP_TABLE_LOG(DEBUG, "ls not leader", K(cur_ls_ptr->get_ls_id()));
|
||||
} else if (OB_FAIL(refresh_dup_tablet_schema_(need_refreh_dup_schema, tenant_dup_tablet_set, dup_ls_status_info))) {
|
||||
DUP_TABLE_LOG(INFO, "refresh dup table schema failed", K(ret));
|
||||
} else if (OB_FALSE_IT(need_refreh_dup_schema = false)) {
|
||||
// do nothing
|
||||
} else {
|
||||
// TODO
|
||||
// Only need all tablet_ids in LS.
|
||||
// No need to get tx data from tablet_meta
|
||||
storage::ObHALSTabletIDIterator ls_tablet_id_iter(cur_ls_ptr->get_ls_id(), true);
|
||||
if (OB_FAIL(cur_ls_ptr->build_tablet_iter(ls_tablet_id_iter))) {
|
||||
DUP_TABLE_LOG(WARN, "build ls tablet iter failed", K(cur_ls_ptr->get_ls_id()));
|
||||
} else if (!ls_tablet_id_iter.is_valid()) {
|
||||
DUP_TABLE_LOG(WARN, "invalid tablet id iterator", K(cur_ls_ptr->get_ls_id()));
|
||||
} else {
|
||||
ObTabletID tmp_tablet_id;
|
||||
bool is_dup_tablet = false;
|
||||
int64_t refresh_time = ObTimeUtility::fast_current_time();
|
||||
while (OB_SUCC(ls_tablet_id_iter.get_next_tablet_id(tmp_tablet_id))) {
|
||||
is_dup_tablet = false;
|
||||
ret = tenant_dup_tablet_set.exist_refactored(tmp_tablet_id);
|
||||
if (OB_HASH_EXIST == ret) {
|
||||
is_dup_tablet = true;
|
||||
ret = OB_SUCCESS;
|
||||
} else if (OB_HASH_NOT_EXIST == ret) {
|
||||
is_dup_tablet = false;
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
DUP_TABLE_LOG(
|
||||
WARN, "Failed to check whether the tablet exists in the tenant_dup_tablet_set",
|
||||
K(ret), K(cur_ls_ptr->get_ls_id()), K(tmp_tablet_id));
|
||||
}
|
||||
|
||||
if (!cur_ls_ptr->get_dup_table_ls_handler()->is_inited() && !is_dup_tablet) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(cur_ls_ptr->get_dup_table_ls_handler()->init(is_dup_tablet))
|
||||
&& OB_INIT_TWICE != ret) {
|
||||
DUP_TABLE_LOG(WARN, "init dup tablet ls handler", K(ret));
|
||||
} else if (OB_FAIL(cur_ls_ptr->get_dup_table_ls_handler()->refresh_dup_table_tablet(
|
||||
tmp_tablet_id, is_dup_tablet, refresh_time))) {
|
||||
if (is_dup_tablet || OB_NOT_INIT != ret) {
|
||||
DUP_TABLE_LOG(WARN, "refresh ls dup table tablets failed", K(ret), K(tmp_tablet_id),
|
||||
K(is_dup_tablet));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_ITER_END == ret) {
|
||||
// ret = OB_SUCCESS;
|
||||
if (OB_FAIL(cur_ls_ptr->get_dup_table_ls_handler()->gc_temporary_dup_tablets(
|
||||
refresh_time, max_execute_interval_))) {
|
||||
DUP_TABLE_LOG(WARN, "ls gc dup_tablet failed", KR(ret), K(refresh_time),
|
||||
K(max_execute_interval_));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// refresh dup_table_ls on leader and follower
|
||||
|
||||
if (!cur_ls_ptr->get_dup_table_ls_handler()->has_dup_tablet()) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(dup_loop_worker_->append_dup_table_ls(cur_ls_ptr->get_ls_id()))) {
|
||||
DUP_TABLE_LOG(WARN, "refresh dup_table ls failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DUP_TABLE_LOG(INFO, "scan all ls to find dup_tablet", KR(ret), K(tenant_dup_tablet_set.size()));
|
||||
if (tenant_dup_tablet_set.created()) {
|
||||
tenant_dup_tablet_set.destroy();
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
DUP_TABLE_LOG(WARN, "scan all ls to find dup_tablet failed", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDupTabletScanTask::execute_for_dup_ls_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1140,6 +1011,13 @@ bool ObDupTableLSHandler::is_dup_table_lease_valid()
|
||||
is_dup_lease_ls = false;
|
||||
}
|
||||
|
||||
DUP_TABLE_LOG(DEBUG,
|
||||
"is dup table lease valid",
|
||||
KP(lease_mgr_ptr_),
|
||||
K(ls_state_helper_.is_leader()),
|
||||
K(has_dup_tablet()),
|
||||
K(is_dup_lease_ls));
|
||||
|
||||
return is_dup_lease_ls;
|
||||
}
|
||||
|
||||
|
@ -71,7 +71,6 @@ public:
|
||||
uint64_t hash() const { return tenant_id_; }
|
||||
|
||||
private:
|
||||
int execute_();
|
||||
int execute_for_dup_ls_();
|
||||
int refresh_dup_tablet_schema_(bool need_refresh,
|
||||
ObTenantDupTabletSchemaHelper::TabletIDSet &tenant_dup_tablet_set,
|
||||
|
@ -6439,7 +6439,7 @@ int ObPartTransCtx::errism_dup_table_redo_sync_()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::errism_submit_prepare_log_()
|
||||
OB_NOINLINE int ObPartTransCtx::errism_submit_prepare_log_()
|
||||
{
|
||||
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -288,7 +288,7 @@ bool ObLSTxLogAdapter::has_dup_tablet()
|
||||
if (OB_ISNULL(dup_table_ls_handler_)) {
|
||||
has_dup = false;
|
||||
} else {
|
||||
has_dup = dup_table_ls_handler_->has_dup_tablet();
|
||||
has_dup = dup_table_ls_handler_->check_tablet_set_exist();
|
||||
}
|
||||
return has_dup;
|
||||
}
|
||||
|
@ -224,10 +224,30 @@ int ObTxReplayExecutor::do_replay_(const char *buf,
|
||||
return ret;
|
||||
}
|
||||
|
||||
#ifdef ERRSIM
|
||||
ERRSIM_POINT_DEF(EN_TX_REPLAY)
|
||||
#endif
|
||||
|
||||
OB_NOINLINE int ObTxReplayExecutor::errsim_tx_replay_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
#ifdef ERRSIM
|
||||
ret = EN_TX_REPLAY;
|
||||
#endif
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
TRANS_LOG(INFO, "errsim tx replay in observer", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxReplayExecutor::prepare_replay_(const char *buf, const int64_t &size, const int skip_pos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(log_block_.init(buf, size, skip_pos, log_block_header_))) {
|
||||
if (OB_FAIL(errsim_tx_replay_())) {
|
||||
TRANS_LOG(WARN, "errsim for tx replay", K(ret), K(log_ts_ns_), K(lsn_));
|
||||
} else if (OB_FAIL(log_block_.init(buf, size, skip_pos, log_block_header_))) {
|
||||
TRANS_LOG(ERROR, "TxLogBlock init error", K(log_block_), K(log_block_header_));
|
||||
}
|
||||
return ret;
|
||||
|
@ -94,6 +94,8 @@ private:
|
||||
int before_replay_redo_();
|
||||
void finish_replay_(const int retcode);
|
||||
|
||||
int errsim_tx_replay_();
|
||||
|
||||
int replay_redo_();
|
||||
int replay_rollback_to_();
|
||||
int replay_active_info_();
|
||||
|
Loading…
x
Reference in New Issue
Block a user