[GC] gc need wait readonly tx cleaned up

This commit is contained in:
yyy-hust
2023-07-25 14:48:36 +00:00
committed by ob-robot
parent 7b37f56021
commit 8c16a5e83c
12 changed files with 205 additions and 112 deletions

View File

@ -1445,9 +1445,10 @@ bool ObSimpleLogClusterTestEnv::is_upgraded(PalfHandleImplGuard &leader, const i
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_upgraded = (0 == degraded_learner_list.get_member_number());
const bool is_config_change_done = (LogConfigMgr::ConfigChangeState::INIT == leader.palf_handle_impl_->config_mgr_.state_);
has_upgraded = (0 == degraded_learner_list.get_member_number() && is_config_change_done);
sleep(1);
PALF_LOG(INFO, "wait upgrade");
PALF_LOG(INFO, "wait upgrade", K(palf_id));
}
return has_upgraded;
}

View File

@ -246,8 +246,8 @@ public:
void switch_append_to_flashback(PalfHandleImplGuard &leader, int64_t &mode_version);
void switch_flashback_to_append(PalfHandleImplGuard &leader, int64_t &mode_version);
void set_disk_options_for_throttling(PalfEnvImpl &palf_env_impl);
bool is_degraded(const PalfHandleImplGuard &leader, const int64_t degraded_server_idx);
bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id);
virtual bool is_degraded(const PalfHandleImplGuard &leader, const int64_t degraded_server_idx);
virtual bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id);
int wait_until_disk_space_to(const int64_t server_id, const int64_t expect_log_disk_space);
int update_server_log_disk(const int64_t log_disk_size);
public:

View File

@ -42,34 +42,6 @@ class TestObSimpleLogClusterArbService : public ObSimpleLogClusterTestEnv
public:
TestObSimpleLogClusterArbService() : ObSimpleLogClusterTestEnv()
{}
bool is_degraded(const PalfHandleImplGuard &leader,
const int64_t degraded_server_idx)
{
bool has_degraded = false;
while (!has_degraded) {
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_degraded = degraded_learner_list.contains(get_cluster()[degraded_server_idx]->get_addr());
sleep(1);
PALF_LOG(INFO, "wait degrade");
}
return has_degraded;
}
bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id)
{
bool has_upgraded = false;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
while (!has_upgraded) {
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_upgraded = (0 == degraded_learner_list.get_member_number());
sleep(1);
PALF_LOG(INFO, "wait upgrade");
}
return has_upgraded;
}
};
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;

View File

@ -48,36 +48,6 @@ public:
palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = disk_options;
palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_ = disk_options;
}
bool is_degraded(const PalfHandleImplGuard &leader,
const int64_t degraded_server_idx,
const int64_t timeout_us) {
bool has_degraded = false;
int64_t begin_ts = common::ObClockGenerator::getClock();
while (!has_degraded && (common::ObClockGenerator::getClock() - begin_ts < timeout_us)) {
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_degraded = degraded_learner_list.contains(get_cluster()[degraded_server_idx]->get_addr());
sleep(1);
PALF_LOG(INFO, "wait degrade");
}
return has_degraded;
}
bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id, const int64_t timeout_us)
{
bool has_upgraded = false;
int64_t begin_ts = common::ObClockGenerator::getClock();
while (!has_upgraded && (common::ObClockGenerator::getClock() - begin_ts < timeout_us) ) {
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_upgraded = (0 == degraded_learner_list.get_member_number());
if (!has_upgraded) {
usleep(200 * 1000);
}
PALF_LOG(INFO, "wait upgrade", K(palf_id), K(has_upgraded));
}
return has_upgraded;
}
};
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;
@ -135,7 +105,6 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_major)
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
const int64_t arb_timeout = 5 * 1000 * 1000;
PALF_LOG(INFO, "[CASE 1.1]: MAJOR degrade");
block_net(leader_idx, another_f_idx);
sleep(1);
@ -145,7 +114,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_major)
palf_list[another_f_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
int64_t begin_ts = common::ObClockGenerator::getClock();
EXPECT_TRUE(is_degraded(leader, another_f_idx, arb_timeout));
EXPECT_TRUE(is_degraded(leader, another_f_idx));
int64_t end_ts = common::ObClockGenerator::getClock();
int64_t used_time = end_ts - begin_ts;
ASSERT_TRUE(used_time < 2 * 1000 * 1000L);
@ -166,7 +135,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_major)
unblock_net(leader_idx, another_f_idx);
begin_ts = common::ObClockGenerator::getClock();
PALF_LOG(INFO, "[CASE 1.2] begin MAJOR upgrade", K(used_time));
ASSERT_TRUE(is_upgraded(leader, id, arb_timeout));
ASSERT_TRUE(is_upgraded(leader, id));
end_ts = common::ObClockGenerator::getClock();
used_time = end_ts - begin_ts;
PALF_LOG(INFO, "[CASE 1.2] end MAJOR upgrade", K(used_time));
@ -264,7 +233,6 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_leader)
ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
const int64_t arb_timeout = 5 * 1000 * 1000;
PALF_LOG(INFO, "[CASE 2.1]: MONOR_LEADER degrade");
block_net(leader_idx, another_f_idx);
sleep(1);
@ -273,7 +241,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_leader)
leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
int64_t begin_ts = common::ObClockGenerator::getClock();
EXPECT_TRUE(is_degraded(leader, another_f_idx, arb_timeout));
EXPECT_TRUE(is_degraded(leader, another_f_idx));
int64_t end_ts = common::ObClockGenerator::getClock();
int64_t used_time = end_ts - begin_ts;
ASSERT_TRUE(used_time < 2 * 1000 * 1000L);
@ -294,7 +262,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_leader)
unblock_net(leader_idx, another_f_idx);
begin_ts = common::ObClockGenerator::getClock();
PALF_LOG(INFO, "[CASE 2.2] begin MINOR_LEADER upgrade", K(used_time));
ASSERT_TRUE(is_upgraded(leader, id, arb_timeout));
ASSERT_TRUE(is_upgraded(leader, id));
end_ts = common::ObClockGenerator::getClock();
used_time = end_ts - begin_ts;
PALF_LOG(INFO, "[CASE 2.2] end MINOR_LEADER upgrade", K(used_time));
@ -307,11 +275,13 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_leader)
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
LogConfigVersion config_version;
const int64_t CONFIG_CHANGE_TIMEOUT_NEW = 20 * 1000 * 1000L; // 10s
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->replace_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1),
ObMember(get_cluster()[another_f_idx]->get_addr(), 1),
config_version,
CONFIG_CHANGE_TIMEOUT));
CONFIG_CHANGE_TIMEOUT_NEW));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
PALF_LOG(INFO, "[CASE 2.4]: MINOR_LEADER switch_leader");
@ -389,7 +359,6 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_follower)
ASSERT_EQ(OB_SUCCESS, get_palf_env(another_f_idx, palf_env));
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
const int64_t arb_timeout = 5 * 1000 * 1000;
PALF_LOG(INFO, "[CASE 3.1]: MONOR_LEADER degrade");
block_net(leader_idx, another_f_idx);
sleep(1);
@ -400,7 +369,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_follower)
palf_list[follower_D_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128 * KB));
int64_t begin_ts = common::ObClockGenerator::getClock();
EXPECT_TRUE(is_degraded(leader, another_f_idx, arb_timeout));
EXPECT_TRUE(is_degraded(leader, another_f_idx));
int64_t end_ts = common::ObClockGenerator::getClock();
int64_t used_time = end_ts - begin_ts;
ASSERT_TRUE(used_time < 2 * 1000 * 1000L);
@ -422,11 +391,11 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_follower)
unblock_net(leader_idx, another_f_idx);
begin_ts = common::ObClockGenerator::getClock();
PALF_LOG(INFO, "[CASE 3.2] begin MINOR_FOLLOWER upgrade", K(used_time));
ASSERT_TRUE(is_upgraded(leader, id, arb_timeout));
ASSERT_TRUE(is_upgraded(leader, id));
end_ts = common::ObClockGenerator::getClock();
used_time = end_ts - begin_ts;
PALF_LOG(INFO, "[CASE 3.2] end MINOR_FOLLOWER upgrade", K(used_time));
ASSERT_TRUE(used_time < 2 * 1000 * 1000L);
ASSERT_TRUE(used_time < 3 * 1000 * 1000L);
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
PALF_LOG(INFO, "[CASE 3.3]: MINOR_FOLLOWER replace_member");
@ -532,7 +501,6 @@ TEST_F(TestObSimpleLogThrottleArb, test_4f1a_degrade_upgrade)
LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, leader);
const int64_t arb_timeout = 5 * 1000 * 1000;
int64_t throttling_percentage = 60;
ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
@ -548,8 +516,8 @@ TEST_F(TestObSimpleLogThrottleArb, test_4f1a_degrade_upgrade)
palf_list[another_f3_idx]->palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 256 * KB));
usleep(10 * 1000);
ASSERT_TRUE(is_degraded(leader, another_f2_idx, arb_timeout));
ASSERT_TRUE(is_degraded(leader, another_f3_idx, arb_timeout));
ASSERT_TRUE(is_degraded(leader, another_f2_idx));
ASSERT_TRUE(is_degraded(leader, another_f3_idx));
int64_t end_ts = common::ObClockGenerator::getClock();
int64_t used_time = end_ts - begin_ts;
PALF_LOG(INFO, " [CASE] 4f1a degrade", K(used_time));
@ -561,7 +529,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_4f1a_degrade_upgrade)
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 256));
PALF_LOG(INFO, " [CASE] 4f1a before upgrade", K(used_time));
ASSERT_TRUE(is_upgraded(leader, id, arb_timeout));
ASSERT_TRUE(is_upgraded(leader, id));
PALF_LOG(INFO, " [CASE] end upgrade", K(used_time));
revert_cluster_palf_handle_guard(palf_list);

View File

@ -354,7 +354,8 @@ ObGCHandler::ObGCHandler() : is_inited_(false),
rwlock_(common::ObLatchIds::GC_HANDLER_LOCK),
ls_(NULL),
gc_seq_invalid_member_(-1),
gc_start_ts_(OB_INVALID_TIMESTAMP)
gc_start_ts_(OB_INVALID_TIMESTAMP),
block_tx_ts_(OB_INVALID_TIMESTAMP)
{
}
@ -369,6 +370,7 @@ void ObGCHandler::reset()
gc_seq_invalid_member_ = -1;
ls_ = NULL;
gc_start_ts_ = OB_INVALID_TIMESTAMP;
block_tx_ts_ = OB_INVALID_TIMESTAMP;
is_inited_ = false;
}
@ -389,6 +391,67 @@ int ObGCHandler::init(ObLS *ls)
return ret;
}
int ObGCHandler::execute_pre_remove()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "GC handler not init");
} else {
WLockGuard wlock_guard(rwlock_);
int64_t ls_id = ls_->get_ls_id().id();
bool is_tenant_dropping_or_dropped = false;
bool need_check_readonly_tx = true;
const uint64_t tenant_id = MTL_ID();
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = check_if_tenant_is_dropping_or_dropped_(tenant_id, is_tenant_dropping_or_dropped))) {
CLOG_LOG(WARN, "check_if_tenant_has_been_dropped_ failed", K(tmp_ret), K(tenant_id), K(ls_id));
} else if (is_tenant_dropping_or_dropped) {
need_check_readonly_tx = false;
CLOG_LOG(INFO, "tenant is dropping or dropped, no longer need to check read_only tx", K(ls_id), K(tenant_id));
}
if (OB_SUCC(ret) && need_check_readonly_tx) {
//follower or not in member list replica need block_tx here
if (OB_INVALID_TIMESTAMP == block_tx_ts_) {
if (OB_FAIL(ls_->block_tx_start())) {
CLOG_LOG(WARN, "failed to block_tx_start", K(ls_id), KPC(this));
} else {
block_tx_ts_ = ObClockGenerator::getClock();
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ls_->check_all_readonly_tx_clean_up())) {
if (OB_EAGAIN == ret) {
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
if (! tenant_config.is_valid()) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "tenant_config is not valid", K(ret), K(tenant_id));
} else {
const int64_t ls_gc_wait_readonly_tx_time = tenant_config->_ls_gc_wait_readonly_tx_time;
const int64_t cur_time = ObClockGenerator::getClock();
if (block_tx_ts_ + ls_gc_wait_readonly_tx_time < cur_time) {
CLOG_LOG(WARN, "Attention!!! Wait enough time before readonly tx been cleaned up", K(ls_id), KPC(this));
ret = OB_SUCCESS;
} else {
CLOG_LOG(WARN, "[WAIT_REASEON]need wait before readonly tx been cleaned up", K(ls_id), KPC(this));
}
}
} else {
CLOG_LOG(WARN, "check_all_readonly_tx_clean_up failed", K(ls_id), K(ret));
}
} else {
CLOG_LOG(INFO, "check_all_readonly_tx_clean_up success", K(ls_id), K(ret));
}
}
}
}
return ret;
}
void ObGCHandler::execute_pre_gc_process(ObGarbageCollector::LSStatus &ls_status)
{
switch (ls_status)
@ -409,6 +472,8 @@ void ObGCHandler::execute_pre_gc_process(ObGarbageCollector::LSStatus &ls_status
int ObGCHandler::check_ls_can_offline(const share::ObLSStatus &ls_status)
{
//the inspection should be performed by leader,and get_gc_state should be invoked before get_palf_role
//to guarantee correctness
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -416,12 +481,18 @@ int ObGCHandler::check_ls_can_offline(const share::ObLSStatus &ls_status)
} else {
RLockGuard rlock_guard(rwlock_);
ObLSID ls_id = ls_->get_ls_id();
ObRole role;
LSGCState gc_state = INVALID_LS_GC_STATE;
if (OB_FAIL(ls_->get_gc_state(gc_state))) {
CLOG_LOG(WARN, "get_gc_state failed", K(ls_id), K(gc_state));
} else if (!is_valid_ls_gc_state(gc_state)) {
ret = OB_STATE_NOT_MATCH;
CLOG_LOG(WARN, "ls check gc state invalid", K(ls_id), K(gc_state));
} else if (OB_FAIL(get_palf_role_(role))) {
CLOG_LOG(WARN, "get_palf_role_ failed", K(ls_id));
} else if (ObRole::LEADER != role) {
ret = OB_STATE_NOT_MATCH;
CLOG_LOG(WARN, "follower can not advance gc state", K(ls_id), K(gc_state));
} else if (is_ls_offline_finished_(gc_state)) {
CLOG_LOG(INFO, "ls check_ls_can_offline success", K(ls_id), K(gc_state));
} else if (is_ls_blocked_state_(gc_state)) {
@ -817,7 +888,7 @@ int ObGCHandler::check_if_tenant_in_archive_(bool &in_archive)
return MTL(ObArchiveService*)->check_tenant_in_archive(in_archive);
}
void ObGCHandler::submit_log_(const ObGCLSLOGType log_type)
int ObGCHandler::submit_log_(const ObGCLSLOGType log_type, bool &is_success)
{
int ret = OB_SUCCESS;
ObGCLSLog gc_log(log_type);
@ -826,6 +897,7 @@ void ObGCHandler::submit_log_(const ObGCLSLOGType log_type)
int64_t buffer_size = gc_log.get_serialize_size();
ObGCLSLogCb cb;
const bool need_nonblock = false;
is_success = false;
SCN ref_scn;
palf::LSN lsn;
SCN scn;
@ -857,6 +929,7 @@ void ObGCHandler::submit_log_(const ObGCLSLOGType log_type)
if (cb.is_succeed()) {
(void)update_ls_gc_state_after_submit_log_(log_type, scn);
is_finished = true;
is_success = true;
CLOG_LOG(INFO, "write GC ls log success", K(ret), K(log_type));
} else if (cb.is_failed()) {
is_finished = true;
@ -874,6 +947,7 @@ void ObGCHandler::submit_log_(const ObGCLSLOGType log_type)
mtl_free(buffer);
buffer = nullptr;
}
return ret;
}
void ObGCHandler::update_ls_gc_state_after_submit_log_(const ObGCLSLOGType log_type,
@ -908,6 +982,7 @@ void ObGCHandler::block_ls_transfer_in_(const SCN &block_scn)
//TODO: @keqing.llt transfer功能完成之前,先用杀事务代替transfer out
} else if (OB_FAIL(ls_->block_tx_start())) {
CLOG_LOG(WARN, "block_tx_start failed", K(ls_id), K(ret));
} else if (FALSE_IT(block_tx_ts_ = ObClockGenerator::getClock())) {
} else if (OB_FAIL(ls_->set_gc_state(LSGCState::LS_BLOCKED))) {
CLOG_LOG(WARN, "set_gc_state block failed", K(ls_id), K(ret));
} else {
@ -966,6 +1041,7 @@ void ObGCHandler::handle_gc_ls_dropping_(const ObGarbageCollector::LSStatus &ls_
CLOG_LOG(WARN, "GC handler not init");
} else {
WLockGuard wlock_guard(rwlock_);
bool is_success = false;
ObRole role;
ObLSID ls_id = ls_->get_ls_id();
LSGCState gc_state = INVALID_LS_GC_STATE;
@ -983,13 +1059,21 @@ void ObGCHandler::handle_gc_ls_dropping_(const ObGarbageCollector::LSStatus &ls_
} else if (!is_valid_ls_gc_state(gc_state)) {
CLOG_LOG(WARN, "ls check gc state invalid", K(ls_id), K(gc_state));
} else if (is_ls_offline_finished_(gc_state)) {
(void)set_block_tx_if_necessary_();
CLOG_LOG(INFO, "handle_gc_ls_dropping already finished", K(ls_id), K(gc_state));
} else if (is_ls_blocked_state_(gc_state)) {
(void)set_block_tx_if_necessary_();
// trigger kill all tx
(void)is_tablet_clear_(ls_status);
} else {
(void)submit_log_(ObGCLSLOGType::BLOCK_TABLET_TRANSFER_IN);
(void)is_tablet_clear_(ls_status);
if (OB_FAIL(submit_log_(ObGCLSLOGType::BLOCK_TABLET_TRANSFER_IN, is_success))) {
CLOG_LOG(WARN, "failed to submit BLOCK_TABLET_TRANSFER_IN log", K(ls_id), K(gc_state));
} else if (is_success) {
(void)is_tablet_clear_(ls_status);
CLOG_LOG(INFO, "BLOCK_TABLET_TRANSFER_IN log has callback on_success", K(ls_id), K(gc_state));
} else {
CLOG_LOG(WARN, "BLOCK_TABLET_TRANSFER_IN log has not callback on_success", K(ls_id), K(gc_state));
}
}
CLOG_LOG(INFO, "ls handle_gc_ls_dropping_ finished", K(ls_id), K(role), K(gc_state));
}
@ -1011,6 +1095,8 @@ void ObGCHandler::handle_gc_ls_offline_(ObGarbageCollector::LSStatus &ls_status)
gc_start_ts_ = ObTimeUtility::current_time();
}
bool is_success = false;
(void)set_block_tx_if_necessary_();
if (OB_FAIL(get_palf_role_(role))) {
CLOG_LOG(WARN, "get_palf_role_ failed", K(ls_id));
} else if (ObRole::LEADER != role) {
@ -1028,10 +1114,16 @@ void ObGCHandler::handle_gc_ls_offline_(ObGarbageCollector::LSStatus &ls_status)
} else if (is_ls_offline_state_(gc_state)) {
(void)try_check_and_set_wait_gc_(ls_status);
} else {
(void)submit_log_(ObGCLSLOGType::OFFLINE_LS);
(void)try_check_and_set_wait_gc_(ls_status);
if (OB_FAIL(submit_log_(ObGCLSLOGType::OFFLINE_LS, is_success))) {
CLOG_LOG(WARN, "failed to submit OFFLINE_LS log", K(ls_id), K(gc_state));
} else if (is_success) {
CLOG_LOG(INFO, "OFFLINE_LS has callback on_success", K(ls_id), K(gc_state));
(void)try_check_and_set_wait_gc_(ls_status);
} else {
CLOG_LOG(WARN, "OFFLINE_LS has not callback on_success", K(ls_id), K(gc_state));
}
}
CLOG_LOG(INFO, "ls handle_gc_ls_offline finished", K(ls_id), K(role), K(gc_state));
CLOG_LOG(INFO, "ls handle_gc_ls_offline finished", K(ls_id), K(role), K(gc_state), K(is_success));
}
}
@ -1051,6 +1143,14 @@ int ObGCHandler::diagnose(GCDiagnoseInfo &diagnose_info) const
return ret;
}
void ObGCHandler::set_block_tx_if_necessary_()
{
//for restart or switch_leader, block_tx_ts_ in memory may be cleaned
if (OB_INVALID_TIMESTAMP == block_tx_ts_) {
block_tx_ts_ = ObClockGenerator::getClock();
}
}
//---------------ObGarbageCollector---------------//
void ObGarbageCollector::GCCandidate::set_ls_status(const share::ObLSStatus &ls_status)
{
@ -1187,25 +1287,27 @@ void ObGarbageCollector::run1()
CLOG_LOG(INFO, "Garbage Collector start to run");
lib::set_thread_name("GCCollector");
const int64_t gc_interval = GC_INTERVAL;
while (!has_set_stop()) {
if (!ObServerCheckpointSlogHandler::get_instance().is_started()) {
// tablets are not ready for read
usleep(5000 * 1000); // 5s
} else if (!stop_create_new_gc_task_) {
ObGCCandidateArray gc_candidates;
int64_t gc_interval = GC_INTERVAL;
CLOG_LOG(INFO, "Garbage Collector is running", K(seq_), K(gc_interval));
gc_candidates.reset();
(void)gc_check_member_list_(gc_candidates);
(void)execute_gc_(gc_candidates);
gc_candidates.reset();
(void)gc_check_ls_status_(gc_candidates);
(void)execute_gc_(gc_candidates);
ob_usleep(gc_interval);
seq_++;
if (ObServerCheckpointSlogHandler::get_instance().is_started()) {
if (!stop_create_new_gc_task_) {
CLOG_LOG(INFO, "Garbage Collector is running", K(seq_), K(gc_interval));
ObGCCandidateArray gc_candidates;
gc_candidates.reset();
(void)gc_check_member_list_(gc_candidates);
(void)execute_gc_(gc_candidates);
gc_candidates.reset();
(void)gc_check_ls_status_(gc_candidates);
(void)execute_gc_(gc_candidates);
seq_++;
}
// safe destroy task
(void) safe_destroy_handler_.handle();
} else {
CLOG_LOG(INFO, "Garbage Collector is not running, waiting for ObServerCheckpointSlogHandler",
K(seq_), K(gc_interval));
}
// safe destroy task
(void) safe_destroy_handler_.handle();
ob_usleep(gc_interval);
}
}
@ -1507,18 +1609,20 @@ void ObGarbageCollector::execute_gc_(ObGCCandidateArray &gc_candidates)
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
tmp_ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "ls not exist", K(tmp_ret), K(id));
} else if (OB_ISNULL(gc_handler = ls->get_gc_handler())) {
tmp_ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "gc_handler is NULL", K(tmp_ret), K(id));
} else if (is_need_gc_ls_status_(ls_status)) {
ObSwitchLeaderAdapter switch_leader_adapter;
if (OB_FAIL(switch_leader_adapter.remove_from_election_blacklist(id.id(), self_addr_))) {
CLOG_LOG(WARN, "remove_from_election_blacklist failed", K(ret), K(id), K_(self_addr));
if (OB_SUCCESS != (tmp_ret = (gc_handler->execute_pre_remove()))) {
CLOG_LOG(WARN, "failed to execute_pre_remove", K(tmp_ret), K(id), K_(self_addr));
} else if (OB_SUCCESS != (tmp_ret = switch_leader_adapter.remove_from_election_blacklist(id.id(), self_addr_))) {
CLOG_LOG(WARN, "remove_from_election_blacklist failed", K(tmp_ret), K(id), K_(self_addr));
} else if (OB_SUCCESS != (tmp_ret = ls_service_->remove_ls(id, false))) {
CLOG_LOG(WARN, "remove_ls failed", K(tmp_ret), K(id));
} else {
CLOG_LOG(INFO, "remove_ls success", K(id), K(gc_reason));
}
} else if (OB_ISNULL(gc_handler = ls->get_gc_handler())) {
tmp_ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "gc_handler is NULL", K(tmp_ret), K(id));
} else {
CLOG_LOG(INFO, "begin execute_pre_gc_process", K(id), K(ls_status));
(void)gc_handler->execute_pre_gc_process(ls_status);

View File

@ -102,11 +102,14 @@ struct GCDiagnoseInfo
~GCDiagnoseInfo() { reset(); }
LSGCState gc_state_;
int64_t gc_start_ts_;
int64_t block_tx_ts_;
TO_STRING_KV(K(gc_state_),
K(gc_start_ts_));
K(gc_start_ts_),
K(block_tx_ts_));
void reset() {
gc_state_ = LSGCState::INVALID_LS_GC_STATE;
gc_start_ts_ = OB_INVALID_TIMESTAMP;
block_tx_ts_ = OB_INVALID_TIMESTAMP;
}
};
@ -244,9 +247,11 @@ public:
int init(storage::ObLS *ls);
void reset();
void execute_pre_gc_process(ObGarbageCollector::LSStatus &ls_status);
int execute_pre_remove();
int check_ls_can_offline(const share::ObLSStatus &ls_status);
int gc_check_invalid_member_seq(const int64_t gc_seq, bool &need_gc);
static bool is_valid_ls_gc_state(const LSGCState &state);
int diagnose(GCDiagnoseInfo &diagnose_info) const;
// for replay
@ -266,7 +271,9 @@ public:
virtual int flush(share::SCN &scn) override;
TO_STRING_KV(K(is_inited_),
K(gc_seq_invalid_member_));
K(gc_seq_invalid_member_),
K(gc_start_ts_),
K(block_tx_ts_));
private:
typedef common::SpinRWLock RWLock;
@ -302,6 +309,7 @@ private:
};
private:
const int64_t MAX_WAIT_TIME_US_FOR_READONLY_TX = 10 * 60 * 1000 * 1000L;//10 min
const int64_t LS_CLOG_ALIVE_TIMEOUT_US = 100 * 1000; //100ms
const int64_t GET_GTS_TIMEOUT_US = 10L * 1000 * 1000; //10s
int get_gts_(const int64_t timeout_us, share::SCN &gts_scn);
@ -321,7 +329,7 @@ private:
bool &is_tenant_dropping_or_dropped);
int get_tenant_readable_scn_(share::SCN &readable_scn);
int check_if_tenant_in_archive_(bool &in_archive);
void submit_log_(const ObGCLSLOGType log_type);
int submit_log_(const ObGCLSLOGType log_type, bool &is_success);
void update_ls_gc_state_after_submit_log_(const ObGCLSLOGType log_type,
const share::SCN &scn);
void block_ls_transfer_in_(const share::SCN &block_scn);
@ -329,12 +337,14 @@ private:
int get_palf_role_(common::ObRole &role);
void handle_gc_ls_dropping_(const ObGarbageCollector::LSStatus &ls_status);
void handle_gc_ls_offline_(ObGarbageCollector::LSStatus &ls_status);
void set_block_tx_if_necessary_();
private:
bool is_inited_;
RWLock rwlock_; //for leader revoke/takeover submit log
storage::ObLS *ls_;
int64_t gc_seq_invalid_member_; //缓存gc检查当前ls不在成员列表时的轮次
int64_t gc_start_ts_;
int64_t block_tx_ts_;
};
} // namespace logservice

View File

@ -428,7 +428,6 @@ int ObLSRecoveryReportor::update_replayable_point_from_tenant_info_()
int ObLSRecoveryReportor::update_replayable_point_from_meta_()
{
int ret = OB_SUCCESS;
SCN replayable_point;
ObLSIterator *iter = NULL;
common::ObSharedGuard<ObLSIterator> guard;
ObLSService *ls_svr = MTL(ObLSService *);
@ -444,6 +443,7 @@ int ObLSRecoveryReportor::update_replayable_point_from_meta_()
ObLS *ls = nullptr;
SCN max_replayable_point;
while (OB_SUCC(iter->get_next(ls))) {
SCN replayable_point;
if (OB_ISNULL(ls)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls is null", KR(ret), KP(ls));
@ -455,8 +455,8 @@ int ObLSRecoveryReportor::update_replayable_point_from_meta_()
}
if (OB_ITER_END == ret) {
logservice::ObLogService *log_service = MTL(logservice::ObLogService*);
if (OB_FAIL(log_service->update_replayable_point(replayable_point))) {
LOG_WARN("logservice update_replayable_point failed", KR(ret), K(replayable_point));
if (OB_FAIL(log_service->update_replayable_point(max_replayable_point))) {
LOG_WARN("logservice update_replayable_point failed", KR(ret), K(max_replayable_point));
} else {
// do nothing
}

View File

@ -589,6 +589,11 @@ DEF_INT(_log_writer_parallelism, OB_TENANT_PARAMETER, "3",
"the number of parallel log writer threads that can be used to write redo log entries to disk. ",
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE));
DEF_TIME(_ls_gc_wait_readonly_tx_time, OB_TENANT_PARAMETER, "24h",
"[0s,)",
"The maximum waiting time for residual read-only transaction before executing log stream garbage collecting。The default value is 24h. Range: [0s, +∞)."
"Log stream garbage collecting will no longer wait for readonly transaction when the tenant is dropped. ",
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_STR(standby_db_preferred_upstream_log_region, OB_TENANT_PARAMETER, "",
"The preferred upstream log region for Standby db. "
"The Standby db will give priority to the preferred upstream log region to fetch log. "

View File

@ -697,6 +697,11 @@ public:
// @return other, there is something wrong or there is some tx not cleaned up.
// int check_all_tx_clean_up() const;
CONST_DELEGATE_WITH_RET(ls_tx_svr_, check_all_tx_clean_up, int);
// check whether all readonly tx of this ls is cleaned up.
// @return OB_SUCCESS, all the readonly tx of this ls cleaned up
// @return other, there is something wrong or there is some readonly tx not cleaned up.
// int check_all_readonly_tx_clean_up() const;
CONST_DELEGATE_WITH_RET(ls_tx_svr_, check_all_readonly_tx_clean_up, int);
// block new tx in for ls.
// @return OB_SUCCESS, ls is blocked
// @return other, there is something wrong.

View File

@ -228,6 +228,30 @@ int ObLSTxService::check_all_tx_clean_up() const
return ret;
}
ERRSIM_POINT_DEF(EN_GC_CHECK_RD_TX);
int ObLSTxService::check_all_readonly_tx_clean_up() const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(mgr_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", KR(ret), K_(ls_id));
} else if (mgr_->get_total_active_readonly_request_count() > 0) {
ret = OB_EAGAIN;
} else {
TRANS_LOG(INFO, "wait_all_readonly_tx_cleaned_up cleaned up success", K_(ls_id));
}
#ifdef ERRSIM
if (OB_SUCC(ret)) {
ret = EN_GC_CHECK_RD_TX ? : OB_SUCCESS;
if (OB_FAIL(ret)) {
TRANS_LOG(INFO, "fake EN_GC_CHECK_RD_TX", K(ret));
}
}
#endif
return ret;
}
int ObLSTxService::block_tx()
{
int ret = OB_SUCCESS;

View File

@ -113,6 +113,9 @@ public:
// @return OB_SUCCESS, all the tx of this ls cleaned up
// @return other, there is something wrong or there is some tx not cleaned up.
int check_all_tx_clean_up() const;
// @return OB_SUCCESS, all the readonly_tx of this ls cleaned up
// @return other, there is something wrong or there is some readonly tx not cleaned up.
int check_all_readonly_tx_clean_up() const;
int block_tx();
int kill_all_tx(const bool graceful);
// for ddl check

View File

@ -293,6 +293,7 @@ _io_callback_thread_count
_lcl_op_interval
_load_tde_encrypt_engine
_log_writer_parallelism
_ls_gc_wait_readonly_tx_time
_ls_migration_wait_completing_timeout
_max_elr_dependent_trx_count
_max_malloc_sample_interval