[CP] Prohibit dup table local read on a election expired leader
This commit is contained in:
parent
a4096506d4
commit
552307f1e1
@ -35,6 +35,7 @@ ob_unittest_multi_replica(test_ob_dup_table_restart)
|
||||
ob_unittest_multi_replica(test_ob_dup_table_leader_switch)
|
||||
ob_unittest_multi_replica(test_ob_dup_table_tablet_gc)
|
||||
ob_unittest_multi_replica(test_ob_dup_table_new_gc)
|
||||
ob_unittest_multi_replica(test_max_commit_ts_read_from_dup_table)
|
||||
ob_unittest_multi_replica(test_mds_replay_from_ctx_table)
|
||||
ob_unittest_multi_replica_longer_timeout(test_multi_transfer_tx)
|
||||
ob_unittest_multi_replica(test_ob_direct_load_inc_log)
|
||||
|
@ -580,7 +580,7 @@ int ObMultiReplicaTestBase::init_test_replica_(const int zone_id)
|
||||
int ObMultiReplicaTestBase::read_cur_json_document_(rapidjson::Document &json_doc)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
FILE *fp = fopen(event_file_path_.c_str(), "r");
|
||||
FILE *fp = fopen(event_file_path_.c_str(), "rb");
|
||||
if (fp == NULL) {
|
||||
if (json_doc.IsObject()) {
|
||||
fprintf(stdout, "Fail to open file! file_path = %s\n", event_file_path_.c_str());
|
||||
@ -589,11 +589,18 @@ int ObMultiReplicaTestBase::read_cur_json_document_(rapidjson::Document &json_do
|
||||
return ret;
|
||||
}
|
||||
|
||||
char read_buffer[2 * 1024 * 1024];
|
||||
char read_buffer[4 * 1024];
|
||||
rapidjson::FileReadStream rs(fp, read_buffer, sizeof(read_buffer));
|
||||
|
||||
json_doc.ParseStream(rs);
|
||||
|
||||
if (json_doc.HasParseError()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SERVER_LOG(WARN, "[ObMultiReplicaTestBase] Parse EVENT JSON ERROR", K(ret),
|
||||
K(json_doc.GetParseError()));
|
||||
fprintf(stdout, "Parse Event Json Error\n");
|
||||
}
|
||||
|
||||
fclose(fp);
|
||||
|
||||
return OB_SUCCESS;
|
||||
@ -641,7 +648,7 @@ int ObMultiReplicaTestBase::wait_event_finish(const std::string &event_name,
|
||||
ret = OB_TIMEOUT;
|
||||
break;
|
||||
} else {
|
||||
ob_usleep(retry_interval_ms * 1000);
|
||||
usleep(retry_interval_ms * 1000);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
@ -668,7 +675,7 @@ int ObMultiReplicaTestBase::finish_event(const std::string &event_name,
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
FILE *fp = fopen(event_file_path_.c_str(), "w");
|
||||
char write_buffer[2 * 1024 * 1024];
|
||||
char write_buffer[4 * 1024];
|
||||
rapidjson::FileWriteStream file_w_stream(fp, write_buffer, sizeof(write_buffer));
|
||||
rapidjson::PrettyWriter<rapidjson::FileWriteStream> prettywriter(file_w_stream);
|
||||
json_doc.AddMember(rapidjson::StringRef(event_name.c_str(), event_name.size()),
|
||||
@ -678,8 +685,8 @@ int ObMultiReplicaTestBase::finish_event(const std::string &event_name,
|
||||
fclose(fp);
|
||||
}
|
||||
|
||||
fprintf(stdout, "[WAIT EVENT] write target event : EVENT_KEY = %s; EVENT_VAL = %s\n",
|
||||
event_name.c_str(), event_content.c_str());
|
||||
fprintf(stdout, "[WAIT EVENT] write target event : EVENT_KEY = %s; EVENT_VAL = %s\n",
|
||||
event_name.c_str(), event_content.c_str());
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] [WAIT EVENT] write target event",
|
||||
K(event_name.c_str()), K(event_content.c_str()));
|
||||
return ret;
|
||||
@ -897,7 +904,7 @@ int ::oceanbase::omt::ObWorkerProcessor::process_err_test()
|
||||
|
||||
if (ATOMIC_LOAD(&::oceanbase::unittest::ObMultiReplicaTestBase::block_msg_)) {
|
||||
ret = OB_EAGAIN;
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] block msg process", K(ret));
|
||||
SERVER_LOG(INFO, "[ERRSIM] block msg process", K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -72,6 +72,7 @@ public:
|
||||
observer::ObSimpleServerReplica &get_curr_simple_server() { return *replica_; }
|
||||
|
||||
static int read_cur_json_document_(rapidjson::Document & json_doc);
|
||||
|
||||
static int wait_event_finish(const std::string &event_name,
|
||||
std::string &event_content,
|
||||
int64_t wait_timeout_ms,
|
||||
|
@ -234,23 +234,26 @@ namespace unittest
|
||||
common::ObString trace_id; \
|
||||
common::ObString query_sql; \
|
||||
int64_t request_time = 0; \
|
||||
int64_t snapshot = 0; \
|
||||
int64_t ret_code = OB_SUCCESS; \
|
||||
int64_t retry_cnt = 0; \
|
||||
ASSERT_EQ(true, conn != nullptr); \
|
||||
std::string sql_str = \
|
||||
"select TX_ID, TRACE_ID, REQUEST_TIME, RET_CODE, RETRY_CNT, QUERY_SQL from " \
|
||||
"oceanbase.V$OB_SQL_AUDIT where QUERY_SQL like " \
|
||||
+ std::string(" \"") + std::string(sql) + std::string("\" order by REQUEST_TIME DESC"); \
|
||||
std::string sql_str = "select TX_ID, SNAPSHOT_VERSION, TRACE_ID, REQUEST_TIME, RET_CODE, " \
|
||||
"RETRY_CNT, QUERY_SQL from " \
|
||||
"oceanbase.V$OB_SQL_AUDIT where QUERY_SQL like " \
|
||||
+ std::string(" \"") + std::string(sql) \
|
||||
+ std::string("\" order by REQUEST_TIME DESC"); \
|
||||
READ_SQL_BY_CONN(conn, process_result, sql_str.c_str()); \
|
||||
ASSERT_EQ(OB_SUCCESS, process_result->next()); \
|
||||
ASSERT_EQ(OB_SUCCESS, process_result->get_int("TX_ID", tx_id)); \
|
||||
ASSERT_EQ(OB_SUCCESS, process_result->get_int("SNAPSHOT_VERSION", snapshot)); \
|
||||
ASSERT_EQ(OB_SUCCESS, process_result->get_varchar("TRACE_ID", trace_id)); \
|
||||
ASSERT_EQ(OB_SUCCESS, process_result->get_int("REQUEST_TIME", request_time)); \
|
||||
ASSERT_EQ(OB_SUCCESS, process_result->get_int("RET_CODE", ret_code)); \
|
||||
ASSERT_EQ(OB_SUCCESS, process_result->get_int("RETRY_CNT", retry_cnt)); \
|
||||
ASSERT_EQ(OB_SUCCESS, process_result->get_varchar("QUERY_SQL", query_sql)); \
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] query sql_audit for tx_id", K(trace_id), K(tx_id), \
|
||||
K(request_time), K(ret_code), K(retry_cnt), K(query_sql)); \
|
||||
K(snapshot), K(request_time), K(ret_code), K(retry_cnt), K(query_sql)); \
|
||||
}
|
||||
|
||||
#define PREPARE_CONN_ENV(conn) \
|
||||
@ -408,6 +411,76 @@ public:
|
||||
storage::ObLS *ls_;
|
||||
};
|
||||
|
||||
class TestEnvTool
|
||||
{
|
||||
public:
|
||||
static void create_table_for_test_env(sqlclient::ObISQLConnection *test_conn,
|
||||
const std::string table_name,
|
||||
const int64_t part_num,
|
||||
bool is_dup_table,
|
||||
int64_t &table_ls_id_num,
|
||||
int64_t &table_id,
|
||||
ObSEArray<int64_t, 10> &tablet_id_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
const std::string dup_scope_arg = "duplicate_scope='cluster'";
|
||||
|
||||
const std::string part_str = " PARTITION BY hash(id_x) partitions " + std::to_string(part_num);
|
||||
|
||||
std::string create_table_sql =
|
||||
"CREATE TABLE " + table_name + " (" + "id_x int , id_y int, id_z int, PRIMARY KEY(id_x))";
|
||||
|
||||
if (is_dup_table) {
|
||||
create_table_sql += dup_scope_arg;
|
||||
}
|
||||
|
||||
if (part_num > 0) {
|
||||
create_table_sql += part_str;
|
||||
}
|
||||
|
||||
WRITE_SQL_BY_CONN(test_conn, create_table_sql.c_str());
|
||||
|
||||
const std::string select_table_id_str = "select table_id, duplicate_scope from "
|
||||
"oceanbase.__all_table where table_name = '"
|
||||
+ table_name + "' ";
|
||||
READ_SQL_BY_CONN(test_conn, table_info_result, select_table_id_str.c_str());
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->next());
|
||||
// int64_t table_id;
|
||||
int64_t dup_scope;
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->get_int("table_id", table_id));
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->get_int("duplicate_scope", dup_scope));
|
||||
ASSERT_EQ(true, table_id > 0);
|
||||
ASSERT_EQ(is_dup_table, dup_scope != 0);
|
||||
|
||||
std::string tablet_count_sql =
|
||||
"select count(*), ls_id from oceanbase.__all_tablet_to_ls where table_id = "
|
||||
+ std::to_string(table_id) + " group by ls_id order by count(*)";
|
||||
READ_SQL_BY_CONN(test_conn, tablet_count_result, tablet_count_sql.c_str());
|
||||
int64_t tablet_count = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_count_result->next());
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_count_result->get_int("count(*)", tablet_count));
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_count_result->get_int("ls_id", table_ls_id_num));
|
||||
ASSERT_EQ(part_num, tablet_count);
|
||||
ASSERT_EQ(true, share::ObLSID(table_ls_id_num).is_valid());
|
||||
|
||||
std::string tablet_id_sql =
|
||||
"select tablet_id from oceanbase.__all_tablet_to_ls where table_id = "
|
||||
+ std::to_string(table_id) + " and ls_id = " + std::to_string(table_ls_id_num);
|
||||
READ_SQL_BY_CONN(test_conn, tablet_id_reult, tablet_id_sql.c_str());
|
||||
while (OB_SUCC(tablet_id_reult->next())) {
|
||||
int64_t id = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_id_reult->get_int("tablet_id", id));
|
||||
ASSERT_EQ(true, ObTabletID(id).is_valid());
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_id_array.push_back(id));
|
||||
}
|
||||
ASSERT_EQ(tablet_count, tablet_id_array.count());
|
||||
ASSERT_EQ(OB_ITER_END, ret);
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace unittest
|
||||
} // namespace oceanbase
|
||||
|
||||
|
432
mittest/multi_replica/test_max_commit_ts_read_from_dup_table.cpp
Normal file
432
mittest/multi_replica/test_max_commit_ts_read_from_dup_table.cpp
Normal file
@ -0,0 +1,432 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <thread>
|
||||
#define USING_LOG_PREFIX SERVER
|
||||
#define protected public
|
||||
#define private public
|
||||
|
||||
#include "env/ob_fast_bootstrap.h"
|
||||
#include "env/ob_multi_replica_util.h"
|
||||
#include "lib/mysqlclient/ob_mysql_result.h"
|
||||
#include "storage/tx/ob_dup_table_lease.h"
|
||||
#include "storage/tx/ob_tx_loop_worker.h"
|
||||
#include "storage/tx/ob_tx_replay_executor.h"
|
||||
|
||||
using namespace oceanbase::transaction;
|
||||
using namespace oceanbase::storage;
|
||||
|
||||
#define CUR_TEST_CASE_NAME ObDupTableMaxCommitTsRead
|
||||
|
||||
DEFINE_MULTI_ZONE_TEST_CASE_CLASS
|
||||
|
||||
MULTI_REPLICA_TEST_MAIN_FUNCTION(test_max_commit_ts_read_from_dup_table);
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace transaction
|
||||
{
|
||||
|
||||
static bool STOP_TX_REPLAY = false;
|
||||
static bool BLOCK_DUP_TABLE_LEADER_REVOKE = false;
|
||||
static bool RETURN_NULL_GTS_CACHE = false;
|
||||
static sqlclient::ObISQLConnection *static_conn = nullptr;
|
||||
static sqlclient::ObMySQLResult *static_result = nullptr;
|
||||
static int64_t final_row_count = 0;
|
||||
|
||||
int ObTxReplayExecutor::errsim_tx_replay_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (STOP_TX_REPLAY) {
|
||||
ret = OB_EAGAIN;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
TRANS_LOG(INFO, "[ERRSIM] errsim tx replay in test", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDupTableLSHandler::errsim_leader_revoke_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
while (BLOCK_DUP_TABLE_LEADER_REVOKE) {
|
||||
usleep(1000 * 1000);
|
||||
TRANS_LOG(INFO, "[ERRSIM] errsim wait leader revoke", K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace transaction
|
||||
|
||||
namespace unittest
|
||||
{
|
||||
|
||||
using namespace oceanbase::transaction;
|
||||
using namespace oceanbase::storage;
|
||||
|
||||
struct TableBasicArg
|
||||
{
|
||||
uint64_t tenant_id_;
|
||||
|
||||
int64_t dup_ls_id_num_;
|
||||
int64_t dup_table_id_;
|
||||
ObSEArray<int64_t, 10> dup_tablet_id_array_;
|
||||
|
||||
int64_t normal_ls_id_num_;
|
||||
int64_t normal_table_id_;
|
||||
ObSEArray<int64_t, 10> normal_tablet_id_array_;
|
||||
|
||||
TO_STRING_KV(K(tenant_id_),
|
||||
K(dup_ls_id_num_),
|
||||
K(dup_table_id_),
|
||||
K(normal_ls_id_num_),
|
||||
K(normal_table_id_),
|
||||
K(dup_tablet_id_array_),
|
||||
K(normal_tablet_id_array_));
|
||||
|
||||
OB_UNIS_VERSION(1);
|
||||
};
|
||||
|
||||
OB_SERIALIZE_MEMBER(TableBasicArg,
|
||||
tenant_id_,
|
||||
dup_ls_id_num_,
|
||||
dup_table_id_,
|
||||
dup_tablet_id_array_,
|
||||
normal_ls_id_num_,
|
||||
normal_table_id_,
|
||||
normal_tablet_id_array_);
|
||||
|
||||
static TableBasicArg static_basic_arg_;
|
||||
|
||||
const std::string test_dup_table_name = "test_dup_1";
|
||||
const std::string test_normal_table_name = "test_normal_1";
|
||||
const int64_t DEFAULT_LOAD_ROW_CNT = 10;
|
||||
|
||||
TEST_F(GET_ZONE_TEST_CLASS_NAME(1), create_test_env)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
CREATE_TEST_TENANT(test_tenant_id);
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] create test tenant success", K(test_tenant_id));
|
||||
|
||||
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(sys_conn, get_curr_simple_server().get_sql_proxy());
|
||||
WRITE_SQL_BY_CONN(sys_conn, "alter system set _private_buffer_size = '1B';");
|
||||
std::string ls_id_str = std::to_string(1);
|
||||
std::string target_ip = local_ip_ + ":" + std::to_string(rpc_ports_[1]);
|
||||
std::string switch_leader_sql = "alter system switch replica leader ls=" + ls_id_str + " server='"
|
||||
+ target_ip + "' tenant='tt1';";
|
||||
|
||||
WRITE_SQL_BY_CONN(sys_conn, switch_leader_sql.c_str());
|
||||
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
|
||||
std::string primary_zone_sql = "ALTER TENANT " + std::string(DEFAULT_TEST_TENANT_NAME)
|
||||
+ " set primary_zone='zone1; zone2; zone3';";
|
||||
WRITE_SQL_BY_CONN(test_conn, primary_zone_sql.c_str());
|
||||
|
||||
unittest::TestEnvTool::create_table_for_test_env(
|
||||
test_conn, test_dup_table_name.c_str(), 10, true /*is_dup_table*/,
|
||||
static_basic_arg_.dup_ls_id_num_, static_basic_arg_.dup_table_id_,
|
||||
static_basic_arg_.dup_tablet_id_array_);
|
||||
|
||||
unittest::TestEnvTool::create_table_for_test_env(
|
||||
test_conn, test_normal_table_name.c_str(), 10, false /*is_dup_table*/,
|
||||
static_basic_arg_.normal_ls_id_num_, static_basic_arg_.normal_table_id_,
|
||||
static_basic_arg_.normal_tablet_id_array_);
|
||||
|
||||
GET_LS(test_tenant_id, static_basic_arg_.dup_ls_id_num_, ls_handle);
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] -------- before wait dup tablet discover", K(ret),
|
||||
K(static_basic_arg_));
|
||||
RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->dup_table_ls_handler_.get_dup_tablet_count()
|
||||
== static_basic_arg_.dup_tablet_id_array_.count(),
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
RETRY_UNTIL_TIMEOUT(
|
||||
ls_handle.get_ls()->dup_table_ls_handler_.tablets_mgr_ptr_->get_readable_tablet_set_count()
|
||||
>= 1,
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
RETRY_UNTIL_TIMEOUT(
|
||||
ls_handle.get_ls()
|
||||
->dup_table_ls_handler_.tablets_mgr_ptr_->get_need_confirm_tablet_set_count()
|
||||
== 0,
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] -------- after wait dup tablet discover", K(ret),
|
||||
K(static_basic_arg_),
|
||||
K(ls_handle.get_ls()->dup_table_ls_handler_.get_dup_tablet_count()));
|
||||
ASSERT_EQ(OB_SUCCESS, ret /*has_dup_tablet*/);
|
||||
|
||||
WRITE_SQL_BY_CONN(test_conn, "set autocommit = false;");
|
||||
WRITE_SQL_BY_CONN(test_conn, "begin;");
|
||||
|
||||
for (int i = 1; i <= DEFAULT_LOAD_ROW_CNT; i++) {
|
||||
std::string insert_dup_sql_str =
|
||||
"INSERT INTO " + test_dup_table_name + " VALUES(" + std::to_string(i) + ", 0 , 0)";
|
||||
std::string insert_normal_sql_str =
|
||||
"INSERT INTO " + test_normal_table_name + " VALUES(" + std::to_string(i) + ", 0 , 0)";
|
||||
WRITE_SQL_BY_CONN(test_conn, insert_dup_sql_str.c_str());
|
||||
WRITE_SQL_BY_CONN(test_conn, insert_normal_sql_str.c_str());
|
||||
}
|
||||
WRITE_SQL_BY_CONN(test_conn, "commit;");
|
||||
|
||||
static_basic_arg_.tenant_id_ = test_tenant_id;
|
||||
std::string tmp_str;
|
||||
ASSERT_EQ(OB_SUCCESS, EventArgSerTool<TableBasicArg>::serialize_arg(static_basic_arg_, tmp_str));
|
||||
finish_event("CREATE_TEST_TABLE", tmp_str);
|
||||
}
|
||||
|
||||
TEST_F(GET_ZONE_TEST_CLASS_NAME(2), normal_follower_max_commit_ts_read)
|
||||
{
|
||||
std::string tmp_event_val;
|
||||
ASSERT_EQ(OB_SUCCESS, wait_event_finish("CREATE_TEST_TABLE", tmp_event_val, 30 * 60 * 1000));
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
EventArgSerTool<TableBasicArg>::deserialize_arg(static_basic_arg_, tmp_event_val));
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
|
||||
|
||||
std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name;
|
||||
|
||||
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
|
||||
READ_SQL_BY_CONN(test_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str());
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->next());
|
||||
|
||||
int64_t dup_table_row_count = 0;
|
||||
const int64_t col_index = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->get_int(col_index, dup_table_row_count));
|
||||
ASSERT_EQ(dup_table_row_count, DEFAULT_LOAD_ROW_CNT);
|
||||
|
||||
finish_event("NORMAL_FOLLOWER_LOCAL_READ", "");
|
||||
}
|
||||
|
||||
TEST_F(GET_ZONE_TEST_CLASS_NAME(1), switch_follwer_forcedly_to_zone2_and_stop_replay)
|
||||
{
|
||||
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
std::string tmp_event_val;
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
wait_event_finish("NORMAL_FOLLOWER_LOCAL_READ", tmp_event_val, 30 * 60 * 1000));
|
||||
// refresh location cache
|
||||
{
|
||||
std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name;
|
||||
|
||||
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
|
||||
READ_SQL_BY_CONN(test_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str());
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->next());
|
||||
}
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
wait_event_finish("PREPARE_TO_UPDATE_ON_NEW_LEADER", tmp_event_val, 30 * 60 * 1000));
|
||||
STOP_TX_REPLAY = true;
|
||||
BLOCK_DUP_TABLE_LEADER_REVOKE = true;
|
||||
block_msg_ = true;
|
||||
|
||||
// usleep(6*1000*1000);
|
||||
finish_event("STOP_ZONE1", "");
|
||||
}
|
||||
|
||||
TEST_F(GET_ZONE_TEST_CLASS_NAME(2), update_on_new_leader)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
std::string tmp_event_val;
|
||||
|
||||
finish_event("PREPARE_TO_UPDATE_ON_NEW_LEADER", "");
|
||||
ASSERT_EQ(OB_SUCCESS, wait_event_finish("STOP_ZONE1", tmp_event_val, 30 * 60 * 1000));
|
||||
|
||||
{
|
||||
std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name;
|
||||
|
||||
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
|
||||
READ_SQL_BY_CONN(test_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str());
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->next());
|
||||
}
|
||||
|
||||
{
|
||||
GET_LS(static_basic_arg_.tenant_id_, static_basic_arg_.dup_ls_id_num_, ls_handle);
|
||||
RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->dup_table_ls_handler_.is_master(), 20 * 1000 * 1000,
|
||||
100 * 1000);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->ls_tx_svr_.mgr_->in_leader_serving_state(),
|
||||
20 * 1000 * 1000, 100 * 1000);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
}
|
||||
|
||||
sleep(2);
|
||||
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
{
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
|
||||
WRITE_SQL_BY_CONN(test_conn, "set autocommit = false;");
|
||||
WRITE_SQL_BY_CONN(test_conn, "set ob_trx_timeout = 1000000000;")
|
||||
|
||||
WRITE_SQL_BY_CONN(test_conn, "begin;");
|
||||
|
||||
int64_t tmp_tx_id = 0;
|
||||
for (int i = DEFAULT_LOAD_ROW_CNT + 1; i <= DEFAULT_LOAD_ROW_CNT + 3; i++) {
|
||||
std::string insert_dup_sql_str =
|
||||
"INSERT INTO " + test_dup_table_name + " VALUES(" + std::to_string(i) + ", 0 , 0)";
|
||||
// std::string insert_normal_sql_str =
|
||||
// "INSERT INTO " + test_normal_table_name + " VALUES(" + std::to_string(i) + ", 0 , 0)";
|
||||
WRITE_SQL_BY_CONN(test_conn, insert_dup_sql_str.c_str());
|
||||
// GET_TX_ID_FROM_SQL_AUDIT(test_conn, insert_dup_sql_str, tmp_tx_id);
|
||||
// WRITE_SQL_BY_CONN(test_conn, insert_normal_sql_str.c_str());
|
||||
}
|
||||
|
||||
// GET_RUNNGING_TRX_ID(test_conn, tmp_tx_id)
|
||||
WRITE_SQL_BY_CONN(test_conn, "commit;");
|
||||
}
|
||||
|
||||
std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name;
|
||||
{
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
|
||||
READ_SQL_BY_CONN(test_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str());
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->next());
|
||||
|
||||
int64_t dup_table_row_count = 0;
|
||||
const int64_t col_index = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->get_int(col_index, dup_table_row_count));
|
||||
ASSERT_EQ(dup_table_row_count, DEFAULT_LOAD_ROW_CNT + 3);
|
||||
}
|
||||
|
||||
finish_event("UPDATE_ON_NEW_LEADER", "");
|
||||
// ob_abort();
|
||||
}
|
||||
|
||||
void read_in_new_thread()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name
|
||||
+ " where id_x=" + std::to_string(DEFAULT_LOAD_ROW_CNT + 2);
|
||||
int64_t tmp_tx_id = 0;
|
||||
{
|
||||
share::ObTenantSwitchGuard tenant_guard;
|
||||
ObTsSourceInfoGuard info_guard;
|
||||
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(static_basic_arg_.tenant_id_));
|
||||
MTL(ObTxLoopWorker *)->stop();
|
||||
usleep(1 * 1000 * 1000);
|
||||
|
||||
// share::SCN max_commit_ts_5_min =
|
||||
// share::SCN::minus(MTL(ObTransService *)->get_tx_version_mgr().get_max_commit_ts(false),
|
||||
// 3 * 1000 * 1000 * 1000L);
|
||||
// TRANS_LOG(INFO, "[ObMultiReplicaTestBase] print max commit ts", K(ret),
|
||||
// K(MTL(ObTransService *)->get_tx_version_mgr().get_max_commit_ts(false)),
|
||||
// K(max_commit_ts_5_min));
|
||||
// MTL(ObTransService *)->get_tx_version_mgr().max_commit_ts_ = max_commit_ts_5_min;
|
||||
// ((ObTsMgr*)(MTL(ObTransService
|
||||
// *)->ts_mgr_))->get_ts_source_info_opt_(static_basic_arg_.tenant_id_, info_guard, true, true);
|
||||
// int64_t tmp_gts = 0 ;
|
||||
// ASSERT_EQ(OB_SUCCESS, max_commit_ts_5_min.convert_for_gts(tmp_gts));
|
||||
// info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.gts_ =
|
||||
// max_commit_ts_5_min.get_val_for_gts();
|
||||
// info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.srr_ =
|
||||
// MonotonicTs::current_time();
|
||||
// info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.srr_.mts_ +=
|
||||
// 1*1000*1000;
|
||||
|
||||
RETURN_NULL_GTS_CACHE = true;
|
||||
const int64_t col_index = 0;
|
||||
READ_SQL_BY_CONN(static_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str());
|
||||
if (OB_FAIL(table_info_result->next())) {
|
||||
|
||||
TRANS_LOG(WARN, "[ObMultiReplicaTestBase] get next in new thread failed", K(ret),
|
||||
K(final_row_count));
|
||||
final_row_count = -1;
|
||||
|
||||
} else if (OB_FAIL(table_info_result->get_int(col_index, final_row_count))) {
|
||||
TRANS_LOG(WARN, "[ObMultiReplicaTestBase] get count(*) in new thread failed", K(ret),
|
||||
K(final_row_count));
|
||||
final_row_count = -1;
|
||||
}
|
||||
}
|
||||
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] after read in new thread", K(ret), K(final_row_count),
|
||||
K(SELECT_SQL_ON_DUP_TABLE.c_str()));
|
||||
GET_TX_ID_FROM_SQL_AUDIT(static_conn, SELECT_SQL_ON_DUP_TABLE, tmp_tx_id);
|
||||
}
|
||||
|
||||
TEST_F(GET_ZONE_TEST_CLASS_NAME(1), read_from_old_leader_zone1)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
std::string tmp_event_val;
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
wait_event_finish("UPDATE_ON_NEW_LEADER", tmp_event_val, 30 * 60 * 1000, 5 * 1000));
|
||||
block_msg_ = false;
|
||||
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
|
||||
// refresh location cache
|
||||
{
|
||||
std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name;
|
||||
|
||||
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
|
||||
READ_SQL_BY_CONN(test_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str());
|
||||
ASSERT_EQ(OB_SUCCESS, table_info_result->next());
|
||||
}
|
||||
|
||||
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] 3 - Stop blocking msg during the read operation",
|
||||
K(ret), K(final_row_count));
|
||||
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
|
||||
static_conn = test_conn;
|
||||
WRITE_SQL_BY_CONN(test_conn, "set ob_query_timeout=20000000;");
|
||||
|
||||
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] 2 - Stop blocking msg during the read operation",
|
||||
K(ret), K(final_row_count));
|
||||
std::thread read_thread(read_in_new_thread);
|
||||
std::thread::id tid = read_thread.get_id();
|
||||
uint64_t read_thread_id = *(uint64_t *)(&(tid));
|
||||
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] 1 - Stop blocking msg during the read operation",
|
||||
K(ret), K(read_thread_id), K(final_row_count));
|
||||
|
||||
share::ObTenantSwitchGuard tenant_guard;
|
||||
ObTsSourceInfoGuard info_guard;
|
||||
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(static_basic_arg_.tenant_id_));
|
||||
int64_t start_ts = ObTimeUtility::fast_current_time();
|
||||
while (ObTimeUtility::fast_current_time() - start_ts >= 3 * 1000 * 1000) {
|
||||
share::SCN max_commit_ts_5_min =
|
||||
share::SCN::minus(MTL(ObTransService *)->get_tx_version_mgr().get_max_commit_ts(false),
|
||||
3 * 1000 * 1000 * 1000L);
|
||||
MTL(ObTransService *)->get_tx_version_mgr().max_commit_ts_ = max_commit_ts_5_min;
|
||||
info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.gts_ =
|
||||
max_commit_ts_5_min.get_val_for_gts();
|
||||
usleep(3 * 1000);
|
||||
}
|
||||
|
||||
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] Stop blocking msg during the read operation", K(ret),
|
||||
K(read_thread_id), K(final_row_count));
|
||||
|
||||
read_thread.join();
|
||||
|
||||
ASSERT_EQ(final_row_count, 1);
|
||||
|
||||
// usleep(1000 * 1000 * 1000);
|
||||
}
|
||||
|
||||
} // namespace unittest
|
||||
|
||||
} // namespace oceanbase
|
@ -59,8 +59,12 @@ OB_NOINLINE int ObWorkerProcessor::process_err_test()
|
||||
|
||||
#ifdef ERRSIM
|
||||
ret = EN_WORKER_PROCESS_REQUEST;
|
||||
LOG_DEBUG("process err_test", K(ret));
|
||||
#endif
|
||||
|
||||
if(OB_FAIL(ret))
|
||||
{
|
||||
LOG_WARN("process err_test", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -1076,8 +1076,8 @@ bool ObDupTableLSHandler::is_dup_table_lease_valid()
|
||||
if (!is_inited() || OB_ISNULL(lease_mgr_ptr_)) {
|
||||
is_dup_lease_ls = false;
|
||||
} else if (ls_state_helper_.is_leader()) {
|
||||
is_dup_lease_ls = true;
|
||||
DUP_TABLE_LOG(INFO, "the lease is always valid for a dup ls leader", K(is_dup_lease_ls),
|
||||
is_dup_lease_ls = false;
|
||||
DUP_TABLE_LOG(INFO, "None valid lease on dup ls leader", K(is_dup_lease_ls),
|
||||
KPC(this));
|
||||
} else {
|
||||
is_dup_lease_ls = lease_mgr_ptr_->is_follower_lease_valid();
|
||||
@ -1404,6 +1404,16 @@ int ObDupTableLSHandler::switch_to_leader()
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_NOINLINE int ObDupTableLSHandler::errsim_leader_revoke_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
DUP_TABLE_LOG(WARN, "errsim leader revoke", K(ret), K(ls_id_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDupTableLSHandler::leader_revoke_(const bool is_forcedly)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1411,12 +1421,19 @@ int ObDupTableLSHandler::leader_revoke_(const bool is_forcedly)
|
||||
|
||||
bool is_logging = false;
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(errsim_leader_revoke_())) {
|
||||
DUP_TABLE_LOG(WARN, "errsim for dup table leader revoke", K(ret), K(ls_id_), K(is_forcedly));
|
||||
}
|
||||
}
|
||||
|
||||
if (is_inited_) {
|
||||
if (OB_NOT_NULL(log_operator_)) {
|
||||
log_operator_->rlock_for_log();
|
||||
is_logging = log_operator_->check_is_busy_without_lock();
|
||||
}
|
||||
if (OB_NOT_NULL(tablets_mgr_ptr_) && OB_TMP_FAIL(tablets_mgr_ptr_->leader_revoke(is_logging))) {
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(tablets_mgr_ptr_))
|
||||
if(OB_TMP_FAIL(tablets_mgr_ptr_->leader_revoke(is_logging))) {
|
||||
DUP_TABLE_LOG(WARN, "tablets_mgr switch to follower failed", K(ret), K(tmp_ret), KPC(this));
|
||||
if (!is_forcedly) {
|
||||
ret = tmp_ret;
|
||||
|
@ -264,6 +264,8 @@ private:
|
||||
|
||||
int recover_ckpt_into_memory_();
|
||||
|
||||
int errsim_leader_revoke_();
|
||||
|
||||
private:
|
||||
share::ObLSID ls_id_;
|
||||
|
||||
|
@ -1055,6 +1055,14 @@ int ObTransService::get_read_store_ctx(const ObTxReadSnapshot &snapshot,
|
||||
"ls_weak_read_ts", store_ctx.ls_->get_ls_wrs_handler()->get_ls_weak_read_ts());
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (snapshot.snapshot_ls_role_ == common::ObRole::FOLLOWER
|
||||
&& snapshot.snapshot_acquire_addr_ != GCTX.self_addr()) {
|
||||
TRANS_LOG(INFO, "get read store_ctx by a follower's max_commit_ts", K(ret), K(snapshot),
|
||||
K(ls_id), K(store_ctx));
|
||||
}
|
||||
}
|
||||
|
||||
// setup tx_table_guard
|
||||
ObTxTableGuard tx_table_guard;
|
||||
if (OB_SUCC(ret) &&
|
||||
|
Loading…
x
Reference in New Issue
Block a user