add freezer source protection code
This commit is contained in:
parent
7ecea64efe
commit
839e3466c3
11
mittest/env/ob_simple_server_helper.cpp
vendored
11
mittest/env/ob_simple_server_helper.cpp
vendored
@ -302,7 +302,10 @@ int SimpleServerHelper::wait_checkpoint_newest(uint64_t tenant_id, ObLSID ls_id)
|
||||
} else {
|
||||
SCN checkpoint_scn;
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(ls_handle.get_ls()->advance_checkpoint_by_flush(SCN::max_scn()))) {
|
||||
if (OB_FAIL(ls_handle.get_ls()->advance_checkpoint_by_flush(SCN::max_scn(),
|
||||
INT64_MAX,
|
||||
false,
|
||||
ObFreezeSourceFlag::TEST_MODE))) {
|
||||
} else if (FALSE_IT(checkpoint_scn = ls_handle.get_ls()->get_ls_meta().get_clog_checkpoint_scn())) {
|
||||
} else if (checkpoint_scn < end_scn) {
|
||||
LOG_INFO("wait ls checkpoint advance", K(tenant_id), K(ls_id), K(checkpoint_scn), K(end_scn));
|
||||
@ -327,7 +330,11 @@ int SimpleServerHelper::freeze(uint64_t tenant_id, ObLSID ls_id, ObTabletID tabl
|
||||
const bool is_sync = true;
|
||||
const bool abs_timeout_ts = ObClockGenerator::getClock() + 60LL * 1000LL * 1000LL;
|
||||
if (OB_FAIL(MTL(ObLSService*)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->tablet_freeze(tablet_id, is_sync, abs_timeout_ts, need_rewrite_meta))) {
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->tablet_freeze(tablet_id,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
need_rewrite_meta,
|
||||
ObFreezeSourceFlag::TEST_MODE))) {
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -284,7 +284,11 @@ TEST_F(TestCheckpointExecutor, calculate_checkpoint)
|
||||
ASSERT_EQ(tmp, ls2->get_ls_meta().get_clog_checkpoint_scn());
|
||||
|
||||
tmp.val_ = 12;
|
||||
ASSERT_EQ(OB_SUCCESS, checkpoint_executor2->advance_checkpoint_by_flush(tmp));
|
||||
ASSERT_EQ(OB_SUCCESS, checkpoint_executor2->advance_checkpoint_by_flush(
|
||||
tmp,
|
||||
INT64_MAX,
|
||||
false,
|
||||
ObFreezeSourceFlag::TEST_MODE));
|
||||
ASSERT_EQ(OB_SUCCESS, ls2->get_data_checkpoint()->flush(share::SCN::max_scn(), false));
|
||||
usleep(60L * 1000L); // 60ms
|
||||
checkpoint_executor2->offline();
|
||||
|
@ -262,14 +262,6 @@ TEST_F(TestDataCheckpoint, ls_freeze)
|
||||
|
||||
tmp.val_ = 2;
|
||||
ASSERT_EQ(OB_SUCCESS, checkpoint_executor->advance_checkpoint_by_flush(tmp));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->get_data_checkpoint()->flush(share::SCN::max_scn(), false));
|
||||
usleep(60L * 1000L); // 60ms
|
||||
ASSERT_EQ(0, data_checkpoint->new_create_list_.checkpoint_list_.get_size());
|
||||
ASSERT_EQ(1, data_checkpoint->active_list_.checkpoint_list_.get_size());
|
||||
ASSERT_EQ(2, data_checkpoint->prepare_list_.checkpoint_list_.get_size());
|
||||
ASSERT_EQ(true, data_checkpoint->ls_freeze_finished());
|
||||
|
||||
tmp.val_ = 4;
|
||||
ASSERT_EQ(OB_SUCCESS, checkpoint_executor->advance_checkpoint_by_flush(tmp));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->get_data_checkpoint()->flush(share::SCN::max_scn(), false));
|
||||
usleep(60L * 1000L); // 60ms
|
||||
|
@ -318,18 +318,18 @@ namespace unittest
|
||||
K(ObTimeUtility::fast_current_time() - start_time)); \
|
||||
}
|
||||
|
||||
#define MINOR_FREEZE_LS(ls) \
|
||||
{ \
|
||||
TRANS_LOG(INFO, "minor freeze ls begin"); \
|
||||
share::ObTenantSwitchGuard tenant_guard; \
|
||||
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(ls->get_tenant_id())); \
|
||||
oceanbase::storage::checkpoint::ObCheckpointExecutor *checkpoint_executor = \
|
||||
ls->get_checkpoint_executor(); \
|
||||
ASSERT_NE(nullptr, checkpoint_executor); \
|
||||
ASSERT_EQ(OB_SUCCESS, \
|
||||
checkpoint_executor->advance_checkpoint_by_flush(share::SCN::max_scn())); \
|
||||
TRANS_LOG(INFO, "minor freeze ls end"); \
|
||||
}
|
||||
#define MINOR_FREEZE_LS(ls) \
|
||||
{ \
|
||||
TRANS_LOG(INFO, "minor freeze ls begin"); \
|
||||
share::ObTenantSwitchGuard tenant_guard; \
|
||||
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(ls->get_tenant_id())); \
|
||||
oceanbase::storage::checkpoint::ObCheckpointExecutor *checkpoint_executor = \
|
||||
ls->get_checkpoint_executor(); \
|
||||
ASSERT_NE(nullptr, checkpoint_executor); \
|
||||
ASSERT_EQ(OB_SUCCESS, \
|
||||
checkpoint_executor->advance_checkpoint_by_flush(share::SCN::max_scn())); \
|
||||
TRANS_LOG(INFO, "minor freeze ls end"); \
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
class EventArgSerTool
|
||||
|
@ -130,7 +130,11 @@ void TestFrequentlyFreeze::async_tablet_freeze(const int64_t idx)
|
||||
const bool is_sync = false;
|
||||
ObTabletID tablet_to_freeze(int_tablet_id_to_freeze);
|
||||
STORAGE_LOG(INFO, "start tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(tablet_to_freeze, is_sync));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(tablet_to_freeze,
|
||||
is_sync,
|
||||
0,
|
||||
false,
|
||||
ObFreezeSourceFlag::TEST_MODE));
|
||||
usleep(100 * 1000);
|
||||
STORAGE_LOG(INFO, "finish tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
|
||||
|
||||
@ -152,7 +156,11 @@ void TestFrequentlyFreeze::sync_tablet_freeze(const int64_t idx)
|
||||
const bool is_sync = true;
|
||||
const int64_t abs_timeout_ts = ObClockGenerator::getClock() + 600LL * 1000LL * 1000LL;
|
||||
STORAGE_LOG(INFO, "start tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(tablet_to_freeze, is_sync, abs_timeout_ts));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(tablet_to_freeze,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
false,
|
||||
ObFreezeSourceFlag::TEST_MODE));
|
||||
::sleep(2);
|
||||
tablet_to_freeze = ObTabletID(tablet_to_freeze.id() + TABLET_FREEZE_THREAD_COUNT);
|
||||
STORAGE_LOG(INFO, "finish tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
|
||||
@ -171,7 +179,10 @@ void TestFrequentlyFreeze::ls_freeze(const bool is_sync)
|
||||
STORAGE_LOG(INFO, "start ls freeze", K(is_sync));
|
||||
const int64_t abs_timeout_ts = ObClockGenerator::getClock() + 600LL * 1000LL * 1000LL;
|
||||
for (int i = 0; i < 4; i++) {
|
||||
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(-1, is_sync, abs_timeout_ts));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(-1,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
ObFreezeSourceFlag::TEST_MODE));
|
||||
sleep(200);
|
||||
}
|
||||
fprintf(stdout, "ls freeze finish. is_sync = %d\n", is_sync);
|
||||
|
@ -334,7 +334,10 @@ void ObMinorFreezeTest::logstream_freeze()
|
||||
for (int j = 0; j < OB_DEFAULT_TABLE_COUNT; ++j) {
|
||||
int ret = OB_EAGAIN;
|
||||
while (OB_EAGAIN == ret) {
|
||||
ret = ls_handles_.at(j).get_ls()->logstream_freeze(checkpoint::INVALID_TRACE_ID, true/*is_sync*/, 0);
|
||||
ret = ls_handles_.at(j).get_ls()->logstream_freeze(checkpoint::INVALID_TRACE_ID,
|
||||
true/*is_sync*/,
|
||||
0,
|
||||
ObFreezeSourceFlag::TEST_MODE);
|
||||
|
||||
if (OB_EAGAIN == ret) {
|
||||
ob_usleep(rand() % SLEEP_TIME);
|
||||
@ -359,7 +362,12 @@ void ObMinorFreezeTest::tablet_freeze()
|
||||
for (int j = 0; j < OB_DEFAULT_TABLE_COUNT; ++j) {
|
||||
int ret = OB_EAGAIN;
|
||||
while (OB_EAGAIN == ret) {
|
||||
ret = ls_handles_.at(j).get_ls()->tablet_freeze(tablet_ids_.at(j), false, (i % 2 == 0) ? true : false, 0);
|
||||
bool need_rewrite_tablet_meta = (i % 2 == 0) ? true : false;
|
||||
ret = ls_handles_.at(j).get_ls()->tablet_freeze(tablet_ids_.at(j),
|
||||
need_rewrite_tablet_meta,
|
||||
0,
|
||||
need_rewrite_tablet_meta,
|
||||
ObFreezeSourceFlag::TEST_MODE);
|
||||
if (OB_EAGAIN == ret) {
|
||||
ob_usleep(rand() % SLEEP_TIME);
|
||||
}
|
||||
@ -380,8 +388,14 @@ void ObMinorFreezeTest::batch_tablet_freeze()
|
||||
|
||||
const int64_t start = ObTimeUtility::current_time();
|
||||
while (ObTimeUtility::current_time() - start <= freeze_duration_) {
|
||||
bool need_rewrite_tablet_meta = (i % 2 == 0) ? true : false;
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
ls_handles_.at(0).get_ls()->tablet_freeze(-1, tablet_ids_, false, (i % 2 == 0) ? true : false, 0));
|
||||
ls_handles_.at(0).get_ls()->tablet_freeze(-1,
|
||||
tablet_ids_,
|
||||
need_rewrite_tablet_meta,
|
||||
0,
|
||||
need_rewrite_tablet_meta,
|
||||
ObFreezeSourceFlag::TEST_MODE));
|
||||
i = i + 1;
|
||||
}
|
||||
}
|
||||
|
@ -221,7 +221,10 @@ TEST_F(ObLockTableBeforeRestartTest, test_commit_log)
|
||||
// wait until ls checkpoint updated.
|
||||
LOG_INFO("ObLockTableBeforeRestartTest::test_commit_log 1.4");
|
||||
// freeze data, make sure other type flushed.
|
||||
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(checkpoint::INVALID_TRACE_ID, false /*is_sync*/));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(checkpoint::INVALID_TRACE_ID,
|
||||
false, /*is_sync*/
|
||||
0,
|
||||
ObFreezeSourceFlag::TEST_MODE));
|
||||
while (ls->get_clog_checkpoint_scn() < rec_scn) {
|
||||
usleep(100 * 1000); // sleep 100 ms
|
||||
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
|
@ -179,7 +179,11 @@ void TestTabletMemtable::basic_test() {
|
||||
// *********** DO TABLET FREEZE ************
|
||||
ObFreezer *freezer = nullptr;
|
||||
ASSERT_NE(nullptr, freezer = ls->get_freezer());
|
||||
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(TABLET_ID, false /* need_rewrite_meta */, true /* is_sync */, 0));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(TABLET_ID,
|
||||
true,
|
||||
0,
|
||||
false, /* need_rewrite_meta */
|
||||
ObFreezeSourceFlag::TEST_MODE));
|
||||
ASSERT_EQ(OB_ENTRY_NOT_EXIST, protected_handle->get_active_memtable(memtable_handle));
|
||||
ASSERT_EQ(OB_SUCCESS, protected_handle->get_boundary_memtable(memtable_handle));
|
||||
ASSERT_EQ(OB_SUCCESS, memtable_handle.get_tablet_memtable(memtable));
|
||||
@ -197,7 +201,11 @@ void TestTabletMemtable::basic_test() {
|
||||
|
||||
// *********** CONCURRENT TABLET FREEZE ************
|
||||
int64_t freeze_start_time = ObClockGenerator::getClock();
|
||||
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(TABLET_ID, false /*need_rewrite_meta*/, false /*is_sync*/, INT64_MAX));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(TABLET_ID,
|
||||
false /*is_sync*/,
|
||||
0,
|
||||
false, /*need_rewrite_meta*/
|
||||
ObFreezeSourceFlag::TEST_MODE));
|
||||
|
||||
sleep(2);
|
||||
ASSERT_EQ(TabletMemtableFreezeState::ACTIVE, memtable->get_freeze_state());
|
||||
@ -239,7 +247,10 @@ void TestTabletMemtable::basic_test() {
|
||||
ASSERT_EQ(0, memtable_for_direct_load->get_write_ref());
|
||||
|
||||
// *********** DO LOGSTREAM FREEZE ************
|
||||
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(checkpoint::INVALID_TRACE_ID, true /* is_sync */));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(checkpoint::INVALID_TRACE_ID,
|
||||
true, /* is_sync */
|
||||
0,
|
||||
ObFreezeSourceFlag::TEST_MODE));
|
||||
STORAGE_LOG(INFO, "finish logstream freeze");
|
||||
|
||||
// *********** CHECK LOGSTREAM FREEZE RESULT ************
|
||||
@ -363,4 +374,4 @@ int main(int argc, char **argv)
|
||||
restart_helper.run();
|
||||
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
@ -1086,7 +1086,12 @@ int ObService::handle_ls_freeze_req_(const uint64_t tenant_id,
|
||||
} else if (tablet_id.is_valid()) {
|
||||
// tablet freeze
|
||||
const bool is_sync = true;
|
||||
if (OB_FAIL(freezer->tablet_freeze(ls_id, tablet_id, is_sync, 0 /*max_retry_time_us*/, false))) {
|
||||
if (OB_FAIL(freezer->tablet_freeze(ls_id,
|
||||
tablet_id,
|
||||
is_sync,
|
||||
0 /*max_retry_time_us*/,
|
||||
false, /*rewrite_tablet_meta*/
|
||||
ObFreezeSourceFlag::USER_MINOR_FREEZE))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
@ -1097,7 +1102,7 @@ int ObService::handle_ls_freeze_req_(const uint64_t tenant_id,
|
||||
}
|
||||
} else {
|
||||
// logstream freeze
|
||||
if (OB_FAIL(freezer->ls_freeze_all_unit(ls_id))) {
|
||||
if (OB_FAIL(freezer->ls_freeze_all_unit(ls_id, ObFreezeSourceFlag::USER_MINOR_FREEZE))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
@ -1127,7 +1132,7 @@ int ObService::tenant_freeze_(const uint64_t tenant_id)
|
||||
LOG_WARN("ObTenantFreezer shouldn't be null", K(ret), K(tenant_id));
|
||||
} else if (freezer->exist_ls_freezing()) {
|
||||
LOG_INFO("exist running ls_freeze", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(freezer->tenant_freeze())) {
|
||||
} else if (OB_FAIL(freezer->tenant_freeze(ObFreezeSourceFlag::USER_MINOR_FREEZE))) {
|
||||
if (OB_ENTRY_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
|
@ -125,7 +125,10 @@ static int advance_checkpoint_by_flush(const uint64_t tenant_id, const share::Ob
|
||||
ret = OB_BACKUP_ADVANCE_CHECKPOINT_TIMEOUT;
|
||||
LOG_WARN("backup advance checkpoint by flush timeout", K(ret), K(tenant_id), K(ls_id), K(start_scn));
|
||||
} else if (need_advance_checkpoint) {
|
||||
if (OB_FAIL(ls->advance_checkpoint_by_flush(start_scn))) {
|
||||
if (OB_FAIL(ls->advance_checkpoint_by_flush(start_scn,
|
||||
INT64_MAX, /*timeout*/
|
||||
false, /*is_tenant_freeze*/
|
||||
ObFreezeSourceFlag::BACKUP))) {
|
||||
if (OB_NO_NEED_UPDATE == ret) {
|
||||
// clog checkpoint ts has passed start log ts
|
||||
ret = OB_SUCCESS;
|
||||
|
@ -25,6 +25,7 @@ namespace checkpoint
|
||||
{
|
||||
|
||||
__thread bool ObDataCheckpoint::is_tenant_freeze_for_flush_ = false;
|
||||
__thread ObFreezeSourceFlag ObDataCheckpoint::freeze_source_ = ObFreezeSourceFlag::INVALID_SOURCE;
|
||||
|
||||
// ** ObCheckpointDList **
|
||||
void ObCheckpointDList::reset()
|
||||
@ -262,7 +263,10 @@ int ObDataCheckpoint::flush(SCN recycle_scn, int64_t trace_id, bool need_freeze)
|
||||
if (is_tenant_freeze()) {
|
||||
const bool is_sync = true;
|
||||
const bool abs_timeout_ts = ObClockGenerator::getClock() + 10LL * 1000LL * 1000LL; // retry at most 10 seconds
|
||||
if (OB_FAIL(ls_->logstream_freeze(trace_id, is_sync, abs_timeout_ts))) {
|
||||
if (OB_FAIL(ls_->logstream_freeze(trace_id,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
get_freeze_source()))) {
|
||||
STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id()));
|
||||
}
|
||||
} else if (need_freeze) {
|
||||
@ -916,10 +920,18 @@ int ObDataCheckpoint::freeze_base_on_needs_(const int64_t trace_id,
|
||||
const bool abs_timeout_ts = 0; // async freeze do not need
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (logstream_freeze) {
|
||||
if (OB_FAIL(ls_->logstream_freeze(trace_id, is_sync, abs_timeout_ts))) {
|
||||
if (OB_FAIL(ls_->logstream_freeze(trace_id,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
get_freeze_source()))) {
|
||||
STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id()));
|
||||
}
|
||||
} else if (OB_FAIL(ls_->tablet_freeze(trace_id, need_flush_tablets, is_sync))) {
|
||||
} else if (OB_FAIL(ls_->tablet_freeze(trace_id,
|
||||
need_flush_tablets,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
false, /*need_rewrite_meta*/
|
||||
get_freeze_source()))) {
|
||||
STORAGE_LOG(WARN, "batch tablet freeze failed", K(ret), K(ls_->get_ls_id()), K(need_flush_tablets));
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "storage/checkpoint/ob_common_checkpoint.h"
|
||||
#include "lib/lock/ob_spin_lock.h"
|
||||
#include "storage/checkpoint/ob_freeze_checkpoint.h"
|
||||
#include "storage/ls/ob_freezer_define.h"
|
||||
#include "share/scn.h"
|
||||
#include "share/ob_errno.h"
|
||||
|
||||
@ -146,6 +147,10 @@ public:
|
||||
static void reset_tenant_freeze() { is_tenant_freeze_for_flush_ = false; }
|
||||
static bool is_tenant_freeze() { return is_tenant_freeze_for_flush_; }
|
||||
|
||||
static void set_freeze_source(const ObFreezeSourceFlag source) { freeze_source_ = source; }
|
||||
static void reset_freeze_source() { freeze_source_ = ObFreezeSourceFlag::INVALID_SOURCE; }
|
||||
static ObFreezeSourceFlag get_freeze_source() { return freeze_source_; }
|
||||
|
||||
private:
|
||||
// traversal prepare_list to flush memtable
|
||||
// case1: some memtable flush failed when ls freeze
|
||||
@ -215,6 +220,7 @@ private:
|
||||
bool ls_freeze_finished_;
|
||||
|
||||
static __thread bool is_tenant_freeze_for_flush_;
|
||||
static __thread ObFreezeSourceFlag freeze_source_;
|
||||
};
|
||||
|
||||
// list lock for DataChcekpoint
|
||||
|
@ -65,7 +65,11 @@ int ObBatchFreezeTabletsTask::inner_process()
|
||||
LOG_WARN_RET(tmp_ret, "get invalid tablet pair", K(cur_pair));
|
||||
} else if (cur_pair.schedule_merge_scn_ > weak_read_ts) {
|
||||
// no need to force freeze
|
||||
} else if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(cur_pair.tablet_id_, true/*is_sync*/, max_retry_time_us, true/*need_rewrite_meta*/))) {
|
||||
} else if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(cur_pair.tablet_id_,
|
||||
true/*is_sync*/,
|
||||
max_retry_time_us,
|
||||
true,/*need_rewrite_meta*/
|
||||
ObFreezeSourceFlag::MAJOR_FREEZE))) {
|
||||
LOG_WARN_RET(tmp_ret, "failed to force freeze tablet", K(param), K(cur_pair));
|
||||
++cnt_.failure_cnt_;
|
||||
} else if (FALSE_IT(++cnt_.success_cnt_)) {
|
||||
|
@ -1476,7 +1476,12 @@ int ObTenantTabletScheduler::schedule_ls_minor_merge(
|
||||
start_time_us = ObClockGenerator::getClock();
|
||||
if (need_fast_freeze_tablets.empty()) {
|
||||
// empty array. do not need freeze
|
||||
} else if (OB_TMP_FAIL(ls.tablet_freeze(checkpoint::INVALID_TRACE_ID, need_fast_freeze_tablets, is_sync))) {
|
||||
} else if (OB_TMP_FAIL(ls.tablet_freeze(checkpoint::INVALID_TRACE_ID,
|
||||
need_fast_freeze_tablets,
|
||||
is_sync,
|
||||
0, /*timeout, useless for async one*/
|
||||
false, /*need_rewrite_meta*/
|
||||
ObFreezeSourceFlag::FAST_FREEZE))) {
|
||||
LOG_WARN("failt to batch freeze tablet", KR(tmp_ret), K(ls_id), K(need_fast_freeze_tablets));
|
||||
} else {
|
||||
LOG_INFO("fast freeze by batch_tablet_freeze finish",
|
||||
|
@ -216,9 +216,17 @@ int ObDDLIncCommitClogCb::on_success()
|
||||
LOG_ERROR("ls should not be null", K(ret), K(log_basic_.get_tablet_id()));
|
||||
} else {
|
||||
const bool is_sync = false;
|
||||
(void)ls->tablet_freeze(log_basic_.get_tablet_id(), is_sync);
|
||||
(void)ls->tablet_freeze(log_basic_.get_tablet_id(),
|
||||
is_sync,
|
||||
0, /*timeout, useless for async one*/
|
||||
false, /*need_rewrite_meta*/
|
||||
ObFreezeSourceFlag::DIRECT_INC_START);
|
||||
if (log_basic_.get_lob_meta_tablet_id().is_valid()) {
|
||||
(void)ls->tablet_freeze(log_basic_.get_lob_meta_tablet_id(), is_sync);
|
||||
(void)ls->tablet_freeze(log_basic_.get_lob_meta_tablet_id(),
|
||||
is_sync,
|
||||
0, /*timeout, useless for async one*/
|
||||
false, /*need_rewrite_meta*/
|
||||
ObFreezeSourceFlag::DIRECT_INC_START);
|
||||
}
|
||||
FLOG_INFO("write ddl inc commit log success", K(ls_id_), K(scn_), K(log_basic_));
|
||||
}
|
||||
|
@ -408,9 +408,18 @@ int ObDDLIncRedoLogWriter::local_write_inc_start_log(
|
||||
} else if (OB_ISNULL(ls) || !tablet_id_.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KP(ls), K(tablet_id_));
|
||||
} else if (OB_FAIL(ls->tablet_freeze(tablet_id_, is_sync, abs_timeout_ts))) {
|
||||
} else if (OB_FAIL(ls->tablet_freeze(tablet_id_,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
false, /*need_rewrite_meta*/
|
||||
ObFreezeSourceFlag::DIRECT_INC_START))) {
|
||||
LOG_WARN("sync tablet freeze failed", K(ret), K(tablet_id_));
|
||||
} else if (lob_meta_tablet_id.is_valid() && OB_FAIL(ls->tablet_freeze(lob_meta_tablet_id, is_sync, abs_timeout_ts))) {
|
||||
} else if (lob_meta_tablet_id.is_valid() &&
|
||||
OB_FAIL(ls->tablet_freeze(lob_meta_tablet_id,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
false, /*need_rewrite_meta*/
|
||||
ObFreezeSourceFlag::DIRECT_INC_START))) {
|
||||
LOG_WARN("sync tablet freeze failed", K(ret), K(lob_meta_tablet_id));
|
||||
} else if (OB_ISNULL(cb = OB_NEW(ObDDLIncStartClogCb, ObMemAttr(MTL_ID(), "DDL_IRLW")))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
|
@ -952,7 +952,7 @@ int ObDDLIncStartReplayExecutor::init(
|
||||
struct SyncTabletFreezeHelper {
|
||||
SyncTabletFreezeHelper(ObLS *ls, const ObTabletID &tablet_id, const ObTabletID &lob_meta_tablet_id)
|
||||
: ls_(ls), tablet_id_(tablet_id), lob_meta_tablet_id_(lob_meta_tablet_id) {}
|
||||
int operator()()
|
||||
int operator()(const ObFreezeSourceFlag source)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(ls_) || !tablet_id_.is_valid()) {
|
||||
@ -962,11 +962,19 @@ struct SyncTabletFreezeHelper {
|
||||
const bool is_sync = true;
|
||||
// try freeze for ten seconds
|
||||
int64_t abs_timeout_ts = ObClockGenerator::getClock() + 10LL * 1000LL * 1000LL;
|
||||
int tmp_ret = ls_->tablet_freeze(tablet_id_, is_sync, abs_timeout_ts);
|
||||
int tmp_ret = ls_->tablet_freeze(tablet_id_,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
false, /*need_rewrite_meta*/
|
||||
source);
|
||||
int tmp_ret_lob = OB_SUCCESS;
|
||||
if (lob_meta_tablet_id_.is_valid()) {
|
||||
abs_timeout_ts = ObClockGenerator::getClock() + 10LL * 1000LL * 1000LL;
|
||||
tmp_ret_lob = ls_->tablet_freeze(lob_meta_tablet_id_, is_sync, abs_timeout_ts);
|
||||
tmp_ret_lob = ls_->tablet_freeze(lob_meta_tablet_id_,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
false, /*need_rewrite_meta*/
|
||||
source);
|
||||
}
|
||||
if (OB_SUCCESS != (tmp_ret | tmp_ret_lob)) {
|
||||
ret = OB_EAGAIN;
|
||||
@ -1001,7 +1009,7 @@ int ObDDLIncStartReplayExecutor::do_replay_(ObTabletHandle &tablet_handle)
|
||||
} else {
|
||||
SyncTabletFreezeHelper sync_tablet_freeze(
|
||||
ls_, log_->get_log_basic().get_tablet_id(), log_->get_log_basic().get_lob_meta_tablet_id());
|
||||
if (OB_FAIL(sync_tablet_freeze())) {
|
||||
if (OB_FAIL(sync_tablet_freeze(ObFreezeSourceFlag::DIRECT_INC_START))) {
|
||||
LOG_WARN("fail to sync tablet freeze", K(ret), K(ls_->get_ls_id()), K(scn_), K(log_->get_log_basic()));
|
||||
} else {
|
||||
FLOG_INFO("replay ddl inc start log success", K(ls_->get_ls_id()), K(scn_), K(log_->get_log_basic()));
|
||||
@ -1061,7 +1069,7 @@ int ObDDLIncCommitReplayExecutor::do_replay_(ObTabletHandle &tablet_handle)
|
||||
} else {
|
||||
SyncTabletFreezeHelper sync_tablet_freeze(
|
||||
ls_, log_->get_log_basic().get_tablet_id(), log_->get_log_basic().get_lob_meta_tablet_id());
|
||||
if (OB_FAIL(sync_tablet_freeze())) {
|
||||
if (OB_FAIL(sync_tablet_freeze(ObFreezeSourceFlag::DIRECT_INC_END))) {
|
||||
LOG_WARN("fail to sync tablet freeze", K(ret), K(ls_->get_ls_id()), K(scn_), K(log_->get_log_basic()));
|
||||
} else {
|
||||
FLOG_INFO("replay ddl inc commit log success", K(ls_->get_ls_id()), K(scn_), K(log_->get_log_basic()));
|
||||
|
@ -1155,7 +1155,11 @@ int ObTabletBackfillTXTask::wait_memtable_frozen_()
|
||||
} else if (!table->is_frozen_memtable()) {
|
||||
is_memtable_ready = false;
|
||||
const bool is_sync = false;
|
||||
if (OB_FAIL(ls->tablet_freeze(tablet_info_.tablet_id_, is_sync))) {
|
||||
if (OB_FAIL(ls->tablet_freeze(tablet_info_.tablet_id_,
|
||||
is_sync,
|
||||
0, /*timeout, useless for async one*/
|
||||
false, /*need_rewrite_meta*/
|
||||
ObFreezeSourceFlag::TRANSFER_BACKFILL))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
|
@ -1160,7 +1160,12 @@ int ObTransferHandler::wait_tablet_write_end_(
|
||||
LOG_WARN("failed to wait tx_write end", KR(ret), K(task_info));
|
||||
} else if (OB_FAIL(ls->get_tx_svr()->traverse_trans_to_submit_redo_log(failed_tx_id))) {
|
||||
LOG_WARN("failed to submit tx log", KR(ret), K(task_info));
|
||||
} else if (OB_FAIL(ls->tablet_freeze(checkpoint::INVALID_TRACE_ID, tablet_list, is_sync))) {
|
||||
} else if (OB_FAIL(ls->tablet_freeze(checkpoint::INVALID_TRACE_ID,
|
||||
tablet_list,
|
||||
is_sync,
|
||||
0, /*timeout, 0 as default(SYNC_FREEZE_DEFAULT_RETRY_TIME)*/
|
||||
false, /*need_rewrite_meta*/
|
||||
ObFreezeSourceFlag::TRANSFER_NO_KILL_TX))) {
|
||||
LOG_WARN("batch tablet freeze failed", KR(ret), KPC(ls), K(task_info));
|
||||
} else if (OB_FAIL(ls->check_tablet_no_active_memtable(tablet_list, has_active_memtable))) {
|
||||
LOG_WARN("check tablet has active memtable failed", KR(ret), KPC(ls), K(task_info));
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "logservice/ob_log_handler.h"
|
||||
#include "lib/container/ob_array_serialization.h"
|
||||
#include "share/ob_occam_thread_pool.h"
|
||||
#include "storage/ls/ob_freezer_define.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -52,7 +53,6 @@ namespace checkpoint
|
||||
class ObDataCheckpoint;
|
||||
}
|
||||
|
||||
|
||||
class ObFreezeState
|
||||
{
|
||||
public:
|
||||
@ -199,16 +199,19 @@ public:
|
||||
|
||||
public:
|
||||
/********************** freeze **********************/
|
||||
// freezer interface
|
||||
int logstream_freeze(const int64_t trace_id);
|
||||
int wait_ls_freeze_finish();
|
||||
int wait_tablet_freeze_finish(ObIArray<ObTableHandleV2> &frozen_memtable_handles,
|
||||
ObIArray<ObTabletID> &freeze_failed_tablets);
|
||||
int ls_inner_tablet_freeze(const ObTabletID &tablet_id);
|
||||
int tablet_freeze(const int64_t trace_id,
|
||||
const ObIArray<ObTabletID> &tablet_ids,
|
||||
const bool need_rewrite_meta,
|
||||
ObIArray<ObTableHandleV2> &frozen_memtable_handles,
|
||||
ObIArray<ObTabletID> &freeze_failed_tablets);
|
||||
|
||||
// freezer helper
|
||||
int wait_ls_freeze_finish();
|
||||
int wait_tablet_freeze_finish(ObIArray<ObTableHandleV2> &frozen_memtable_handles,
|
||||
ObIArray<ObTabletID> &freeze_failed_tablets);
|
||||
int ls_inner_tablet_freeze(const ObTabletID &tablet_id);
|
||||
int get_all_async_freeze_tablets(const int64_t ls_epoch, ObIArray<ObTabletID> &tablet_ids);
|
||||
bool is_async_freeze_tablets_empty() const { return async_freeze_tablets_.empty(); }
|
||||
void record_async_freeze_tablet(const AsyncFreezeTabletInfo &async_freeze_tablet_info);
|
||||
|
58
src/storage/ls/ob_freezer_define.h
Normal file
58
src/storage/ls/ob_freezer_define.h
Normal file
@ -0,0 +1,58 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef OCEANBASE_STORAGE_LS_OB_FREEZER_DEFINE
|
||||
#define OCEANBASE_STORAGE_LS_OB_FREEZER_DEFINE
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
|
||||
// The ObFreezeSourceFlag is used to facilitate effective management of interfaces,
|
||||
// enhance code reusability and modularity, and reduce potential conflicts and
|
||||
// dependency issues(from chatgpt).
|
||||
// Therefore, we will use it for each freeze interface in order to better
|
||||
// identify and manage them. When you add a new freeze call, be sure to add the
|
||||
// flag in the ObFreezeSourceFlag here.
|
||||
// Additionally, please remember to maintain this information in the document at
|
||||
//
|
||||
// future learners and maintainers.
|
||||
enum class ObFreezeSourceFlag : int64_t
|
||||
{
|
||||
INVALID_SOURCE = 0,
|
||||
CLOG_CHECKPOINT = 1,
|
||||
USER_MINOR_FREEZE = 2,
|
||||
FREEZE_TRIGGER = 3,
|
||||
BACKUP = 4,
|
||||
TRANSFER_NO_KILL_TX = 5,
|
||||
TRANSFER_BACKFILL = 6,
|
||||
MAJOR_FREEZE = 7,
|
||||
FAST_FREEZE = 8,
|
||||
DIRECT_INC_START = 9,
|
||||
DIRECT_INC_END = 10,
|
||||
DIRECT_INC_FREEZE = 11,
|
||||
GC_RETAIN_CTX = 12, // deprecated
|
||||
TEST_MODE = 13, // used for test only
|
||||
MAX_SOURCE = 14,
|
||||
};
|
||||
|
||||
inline bool is_valid_freeze_source(const ObFreezeSourceFlag source)
|
||||
{
|
||||
return source > ObFreezeSourceFlag::INVALID_SOURCE
|
||||
&& source < ObFreezeSourceFlag::MAX_SOURCE;
|
||||
}
|
||||
|
||||
} // storage
|
||||
} // oceanbase
|
||||
|
||||
#endif // OCEANBASE_STORAGE_LS_OB_FREEZER_DEFINE
|
@ -2008,11 +2008,17 @@ int ObLS::replay_get_tablet(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLS::logstream_freeze(const int64_t trace_id, const bool is_sync, const int64_t input_abs_timeout_ts)
|
||||
int ObLS::logstream_freeze(const int64_t trace_id,
|
||||
const bool is_sync,
|
||||
const int64_t input_abs_timeout_ts,
|
||||
const ObFreezeSourceFlag source)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (is_sync) {
|
||||
if (!is_valid_freeze_source(source)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected freeze source", K(source));
|
||||
} else if (is_sync) {
|
||||
const int64_t abs_timeout_ts = (0 == input_abs_timeout_ts)
|
||||
? ObClockGenerator::getClock() + ObFreezer::SYNC_FREEZE_DEFAULT_RETRY_TIME
|
||||
: input_abs_timeout_ts;
|
||||
@ -2029,7 +2035,8 @@ int ObLS::logstream_freeze(const int64_t trace_id, const bool is_sync, const int
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLS::logstream_freeze_task(const int64_t trace_id, const int64_t abs_timeout_ts)
|
||||
int ObLS::logstream_freeze_task(const int64_t trace_id,
|
||||
const int64_t abs_timeout_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t start_time = ObClockGenerator::getClock();
|
||||
@ -2073,17 +2080,27 @@ int ObLS::logstream_freeze_task(const int64_t trace_id, const int64_t abs_timeou
|
||||
int ObLS::tablet_freeze(const ObTabletID &tablet_id,
|
||||
const bool is_sync,
|
||||
const int64_t input_abs_timeout_ts,
|
||||
const bool need_rewrite_meta)
|
||||
const bool need_rewrite_meta,
|
||||
const ObFreezeSourceFlag source)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (tablet_id.is_ls_inner_tablet()) {
|
||||
|
||||
if (!is_valid_freeze_source(source)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected freeze source", K(source));
|
||||
} else if (tablet_id.is_ls_inner_tablet()) {
|
||||
ret = ls_freezer_.ls_inner_tablet_freeze(tablet_id);
|
||||
} else {
|
||||
ObSEArray<ObTabletID, 1> tablet_ids;
|
||||
if (OB_FAIL(tablet_ids.push_back(tablet_id))) {
|
||||
STORAGE_LOG(WARN, "push back tablet id failed", KR(ret), K(tablet_id));
|
||||
} else {
|
||||
ret = tablet_freeze(checkpoint::INVALID_TRACE_ID, tablet_ids, is_sync, input_abs_timeout_ts, need_rewrite_meta);
|
||||
ret = tablet_freeze(checkpoint::INVALID_TRACE_ID,
|
||||
tablet_ids,
|
||||
is_sync,
|
||||
input_abs_timeout_ts,
|
||||
need_rewrite_meta,
|
||||
source);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -2093,13 +2110,18 @@ int ObLS::tablet_freeze(const int64_t trace_id,
|
||||
const ObIArray<ObTabletID> &tablet_ids,
|
||||
const bool is_sync,
|
||||
const int64_t input_abs_timeout_ts,
|
||||
const bool need_rewrite_meta)
|
||||
const bool need_rewrite_meta,
|
||||
const ObFreezeSourceFlag source)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
STORAGE_LOG(
|
||||
DEBUG, "start tablet freeze", K(tablet_ids), K(is_sync), KTIME(input_abs_timeout_ts), K(need_rewrite_meta));
|
||||
int64_t freeze_epoch = ATOMIC_LOAD(&switch_epoch_);
|
||||
if (need_rewrite_meta && (!is_sync)) {
|
||||
|
||||
if (!is_valid_freeze_source(source)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected freeze source", K(source));
|
||||
} else if (need_rewrite_meta && (!is_sync)) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
STORAGE_LOG(ERROR,
|
||||
"tablet freeze for rewrite meta must be sync freeze ",
|
||||
@ -2216,7 +2238,10 @@ void ObLS::record_async_freeze_tablet_(const ObTabletID &tablet_id, const int64_
|
||||
(void)ls_freezer_.record_async_freeze_tablet(tablet_info);
|
||||
}
|
||||
|
||||
int ObLS::advance_checkpoint_by_flush(SCN recycle_scn, const int64_t abs_timeout_ts, const bool is_tenant_freeze)
|
||||
int ObLS::advance_checkpoint_by_flush(SCN recycle_scn,
|
||||
const int64_t abs_timeout_ts,
|
||||
const bool is_tenant_freeze,
|
||||
const ObFreezeSourceFlag source)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t read_lock = LSLOCKALL;
|
||||
@ -2230,7 +2255,9 @@ int ObLS::advance_checkpoint_by_flush(SCN recycle_scn, const int64_t abs_timeout
|
||||
ObDataCheckpoint::set_tenant_freeze();
|
||||
LOG_INFO("set tenant_freeze", K(ls_meta_.ls_id_));
|
||||
}
|
||||
ObDataCheckpoint::set_freeze_source(source);
|
||||
ret = checkpoint_executor_.advance_checkpoint_by_flush(recycle_scn);
|
||||
ObDataCheckpoint::reset_freeze_source();
|
||||
ObDataCheckpoint::reset_tenant_freeze();
|
||||
}
|
||||
return ret;
|
||||
@ -2239,9 +2266,9 @@ int ObLS::advance_checkpoint_by_flush(SCN recycle_scn, const int64_t abs_timeout
|
||||
int ObLS::flush_to_recycle_clog()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
int64_t read_lock = LSLOCKALL;
|
||||
int64_t write_lock = 0;
|
||||
|
||||
ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock);
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -2249,11 +2276,11 @@ int ObLS::flush_to_recycle_clog()
|
||||
} else if (OB_UNLIKELY(is_offline())) {
|
||||
ret = OB_MINOR_FREEZE_NOT_ALLOW;
|
||||
LOG_WARN("offline ls not allowed freeze", K(ret), K_(ls_meta));
|
||||
} else if (FALSE_IT(ObDataCheckpoint::set_freeze_source(ObFreezeSourceFlag::CLOG_CHECKPOINT))) {
|
||||
} else if (OB_FAIL(checkpoint_executor_.advance_checkpoint_by_flush(SCN::invalid_scn() /*recycle_scn*/))) {
|
||||
STORAGE_LOG(WARN, "advance_checkpoint_by_flush failed", KR(ret), K(get_ls_id()));
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
ObDataCheckpoint::reset_freeze_source();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -67,6 +67,7 @@
|
||||
#include "storage/high_availability/ob_ls_transfer_info.h"
|
||||
#include "observer/table/ttl/ob_tenant_tablet_ttl_mgr.h"
|
||||
#include "storage/ls/ob_ls_transfer_status.h"
|
||||
#include "storage/ls/ob_freezer_define.h"
|
||||
#ifdef OB_BUILD_SHARED_STORAGE
|
||||
#include "storage/shared_storage/ob_private_block_gc_task.h"
|
||||
#include "storage/shared_storage/prewarm/ob_ls_prewarm_handler.h"
|
||||
@ -906,16 +907,21 @@ public:
|
||||
* @param[in] is_sync if is_sync == true, call logstream_freeze_task directly. Or commit an async task to execute
|
||||
* logstream_freeze_task
|
||||
* @param[in] abs_timeout_ts only used when is_sync == true, 0 as default, which means retry for
|
||||
* ObFreezer::SYNC_FREEZE_DEFAULT_RETRY_TIME seconds
|
||||
* ObFreezer::SYNC_FREEZE_DEFAULT_RETRY_TIME seconds
|
||||
* @param[in] source means the input source of the freeze
|
||||
*/
|
||||
int logstream_freeze(const int64_t trace_id, const bool is_sync, const int64_t abs_timeout_ts = 0);
|
||||
int logstream_freeze(const int64_t trace_id,
|
||||
const bool is_sync,
|
||||
const int64_t abs_timeout_ts = 0,
|
||||
const ObFreezeSourceFlag source = ObFreezeSourceFlag::INVALID_SOURCE);
|
||||
int logstream_freeze_task(const int64_t trace_id,
|
||||
const int64_t abs_timeout_ts);
|
||||
|
||||
int tablet_freeze(const ObTabletID &tablet_id,
|
||||
const bool is_sync,
|
||||
const int64_t input_abs_timeout_ts = 0,
|
||||
const bool need_rewrite_meta = false);
|
||||
const bool need_rewrite_meta = false,
|
||||
const ObFreezeSourceFlag source = ObFreezeSourceFlag::INVALID_SOURCE);
|
||||
/**
|
||||
* @brief freeze one or multiple tablets. if is_sync is true, retry until timeout. or commit an async task and retry
|
||||
* till die
|
||||
@ -926,19 +932,22 @@ public:
|
||||
* logstream_freeze_task
|
||||
* @param[in] need_rewrite_meta
|
||||
* @param[in] abs_timeout_ts only used when is_sync == true, 0 as default, which means retry for
|
||||
* ObFreezer::SYNC_FREEZE_DEFAULT_RETRY_TIME seconds
|
||||
* ObFreezer::SYNC_FREEZE_DEFAULT_RETRY_TIME seconds
|
||||
* @param[in] source means the input source of the freeze
|
||||
*/
|
||||
int tablet_freeze(const int64_t trace_id,
|
||||
const ObIArray<ObTabletID> &tablet_ids,
|
||||
const bool is_sync,
|
||||
const int64_t abs_timeout_ts = 0,
|
||||
const bool need_rewrite_meta = false);
|
||||
const bool need_rewrite_meta = false,
|
||||
const ObFreezeSourceFlag source = ObFreezeSourceFlag::INVALID_SOURCE);
|
||||
int tablet_freeze_task(const int64_t trace_id,
|
||||
const ObIArray<ObTabletID> &tablet_ids,
|
||||
const bool need_rewrite_meta,
|
||||
const bool is_sync,
|
||||
const int64_t abs_timeout_ts,
|
||||
const int64_t freeze_epoch);
|
||||
|
||||
// ObTxTable interface
|
||||
DELEGATE_WITH_RET(tx_table_, get_tx_table_guard, int);
|
||||
DELEGATE_WITH_RET(tx_table_, get_upper_trans_version_before_given_scn, int);
|
||||
@ -953,7 +962,8 @@ public:
|
||||
// @param [in] abs_timeout_ts, wait until timeout if lock conflict
|
||||
int advance_checkpoint_by_flush(share::SCN recycle_scn,
|
||||
const int64_t abs_timeout_ts = INT64_MAX,
|
||||
const bool is_tenant_freeze = false);
|
||||
const bool is_tenant_freeze = false,
|
||||
const ObFreezeSourceFlag source = ObFreezeSourceFlag::INVALID_SOURCE);
|
||||
|
||||
// ObDataCheckpoint interface:
|
||||
DELEGATE_WITH_RET(data_checkpoint_, get_freezecheckpoint_info, int);
|
||||
|
@ -242,7 +242,11 @@ void ObDirectLoadTableGuard::async_freeze_()
|
||||
STORAGE_LOG(ERROR, "ls should not be null", K(ret), KPC(this));
|
||||
} else {
|
||||
const bool is_sync = false;
|
||||
(void)ls->tablet_freeze(tablet_id_, is_sync);
|
||||
(void)ls->tablet_freeze(tablet_id_,
|
||||
is_sync,
|
||||
0, /*timeout, useless for async one*/
|
||||
false, /*need_rewrite_meta*/
|
||||
ObFreezeSourceFlag::DIRECT_INC_FREEZE);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,11 @@ int ObAdvanceLSCkptTask::try_advance_ls_ckpt_ts()
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
}
|
||||
TRANS_LOG(WARN, "get ls faild", K(ret), K(MTL(ObLSService *)));
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->advance_checkpoint_by_flush(target_ckpt_ts_))) {
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->advance_checkpoint_by_flush(
|
||||
target_ckpt_ts_,
|
||||
INT64_MAX, /*timeout*/
|
||||
false, /*is_tenant_freeze*/
|
||||
ObFreezeSourceFlag::GC_RETAIN_CTX))) {
|
||||
TRANS_LOG(WARN, "advance checkpoint ts failed", K(ret), K(ls_id_), K(target_ckpt_ts_));
|
||||
}
|
||||
|
||||
|
@ -290,7 +290,10 @@ int ObTenantFreezer::ls_freeze_data_(ObLS *ls, const bool is_sync, const int64_t
|
||||
do {
|
||||
need_retry = false;
|
||||
retry_times++;
|
||||
if (OB_SUCC(ls->logstream_freeze(checkpoint::INVALID_TRACE_ID, is_sync, abs_timeout_ts))) {
|
||||
if (OB_SUCC(ls->logstream_freeze(checkpoint::INVALID_TRACE_ID,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
ObFreezeSourceFlag::FREEZE_TRIGGER))) {
|
||||
} else {
|
||||
need_retry = (ObClockGenerator::getClock() < abs_timeout_ts) && (OB_EAGAIN == ret);
|
||||
}
|
||||
@ -307,7 +310,9 @@ int ObTenantFreezer::ls_freeze_data_(ObLS *ls, const bool is_sync, const int64_t
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantFreezer::ls_freeze_all_unit_(ObLS *ls, const int64_t abs_timeout_ts)
|
||||
int ObTenantFreezer::ls_freeze_all_unit_(ObLS *ls,
|
||||
const int64_t abs_timeout_ts,
|
||||
const ObFreezeSourceFlag source)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t SLEEP_TS = 1000 * 1000; // 1s
|
||||
@ -319,7 +324,10 @@ int ObTenantFreezer::ls_freeze_all_unit_(ObLS *ls, const int64_t abs_timeout_ts)
|
||||
do {
|
||||
need_retry = false;
|
||||
retry_times++;
|
||||
if (OB_SUCC(ls->advance_checkpoint_by_flush(SCN::max_scn(), abs_timeout_ts, true /* is_tenant_freeze */))) {
|
||||
if (OB_SUCC(ls->advance_checkpoint_by_flush(SCN::max_scn(),
|
||||
abs_timeout_ts,
|
||||
true, /* is_tenant_freeze */
|
||||
source))) {
|
||||
} else {
|
||||
current_ts = ObTimeUtil::current_time();
|
||||
is_timeout = (current_ts >= abs_timeout_ts);
|
||||
@ -341,42 +349,6 @@ int ObTenantFreezer::ls_freeze_all_unit_(ObLS *ls, const int64_t abs_timeout_ts)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantFreezer::tablet_freeze_(ObLS *ls,
|
||||
const common::ObTabletID &tablet_id,
|
||||
const bool need_rewrite_tablet_meta,
|
||||
const bool is_sync,
|
||||
const int64_t abs_timeout_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t SLEEP_TS = 1000 * 1000; // 1s
|
||||
int64_t current_ts = 0;
|
||||
int64_t retry_times = 0;
|
||||
bool is_timeout = false;
|
||||
bool need_retry = false;
|
||||
// wait and retry if there is a freeze is doing
|
||||
// or if we can not get the ls lock.
|
||||
do {
|
||||
need_retry = false;
|
||||
retry_times++;
|
||||
if (OB_SUCC(ls->tablet_freeze(tablet_id, is_sync, abs_timeout_ts, need_rewrite_tablet_meta))) {
|
||||
} else {
|
||||
current_ts = ObTimeUtil::current_time();
|
||||
is_timeout = (current_ts >= abs_timeout_ts);
|
||||
// retry condition 1
|
||||
need_retry = (!is_timeout);
|
||||
// retry condition 2, 3
|
||||
need_retry = need_retry && (OB_EAGAIN == ret);
|
||||
}
|
||||
if (need_retry) {
|
||||
ob_usleep(SLEEP_TS);
|
||||
}
|
||||
if (retry_times % 10 == 0) {
|
||||
LOG_WARN_RET(OB_ERR_TOO_MUCH_TIME, "wait ls freeze finished cost too much time", K(retry_times));
|
||||
}
|
||||
} while (need_retry);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantFreezer::tenant_freeze_data_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -420,7 +392,8 @@ int ObTenantFreezer::tenant_freeze_data_()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantFreezer::tenant_freeze()
|
||||
// only called by user triggered minor freeze
|
||||
int ObTenantFreezer::tenant_freeze(const ObFreezeSourceFlag source)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -444,7 +417,7 @@ int ObTenantFreezer::tenant_freeze()
|
||||
LOG_WARN("iter is NULL", K(ret));
|
||||
} else {
|
||||
for (; OB_SUCC(iter->get_next(ls)); ++ls_cnt) {
|
||||
if (OB_TMP_FAIL(ls_freeze_all_unit_(ls, abs_timeout_ts))) {
|
||||
if (OB_TMP_FAIL(ls_freeze_all_unit_(ls, abs_timeout_ts, source))) {
|
||||
LOG_WARN("ls freeze all unit failed", K(tmp_ret), K(ls->get_ls_id()));
|
||||
}
|
||||
}
|
||||
@ -462,7 +435,8 @@ int ObTenantFreezer::tenant_freeze()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantFreezer::ls_freeze_all_unit(const share::ObLSID &ls_id)
|
||||
int ObTenantFreezer::ls_freeze_all_unit(const share::ObLSID &ls_id,
|
||||
const ObFreezeSourceFlag source)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSService *ls_srv = MTL(ObLSService *);
|
||||
@ -482,7 +456,7 @@ int ObTenantFreezer::ls_freeze_all_unit(const share::ObLSID &ls_id)
|
||||
} else if (OB_ISNULL(ls = handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("[TenantFreezer] ls is null", KR(ret), K(ls_id));
|
||||
} else if (OB_FAIL(ls_freeze_all_unit_(ls, abs_timeout_ts))) {
|
||||
} else if (OB_FAIL(ls_freeze_all_unit_(ls, abs_timeout_ts, source))) {
|
||||
LOG_WARN("[TenantFreezer] logstream freeze failed", KR(ret), K(ls_id));
|
||||
}
|
||||
|
||||
@ -492,16 +466,23 @@ int ObTenantFreezer::ls_freeze_all_unit(const share::ObLSID &ls_id)
|
||||
int ObTenantFreezer::tablet_freeze(const common::ObTabletID &tablet_id,
|
||||
const bool is_sync,
|
||||
const int64_t max_retry_time_us,
|
||||
const bool need_rewrite_tablet_meta)
|
||||
const bool need_rewrite_tablet_meta,
|
||||
const ObFreezeSourceFlag source)
|
||||
{
|
||||
return tablet_freeze(ObLSID(ObLSID::INVALID_LS_ID), tablet_id, is_sync, max_retry_time_us, need_rewrite_tablet_meta);
|
||||
return tablet_freeze(ObLSID(ObLSID::INVALID_LS_ID),
|
||||
tablet_id,
|
||||
is_sync,
|
||||
max_retry_time_us,
|
||||
need_rewrite_tablet_meta,
|
||||
source);
|
||||
}
|
||||
|
||||
int ObTenantFreezer::tablet_freeze(share::ObLSID ls_id,
|
||||
const common::ObTabletID &tablet_id,
|
||||
const bool is_sync,
|
||||
const int64_t max_retry_time_us,
|
||||
const bool need_rewrite_tablet_meta)
|
||||
const bool need_rewrite_tablet_meta,
|
||||
const ObFreezeSourceFlag source)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_cache_hit = false;
|
||||
@ -535,7 +516,11 @@ int ObTenantFreezer::tablet_freeze(share::ObLSID ls_id,
|
||||
} else if (OB_ISNULL(ls = handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("[TenantFreezer] ls is null", KR(ret), K(ls_id));
|
||||
} else if (OB_FAIL(ls->tablet_freeze(tablet_id, is_sync, abs_timeout_ts, need_rewrite_tablet_meta))) {
|
||||
} else if (OB_FAIL(ls->tablet_freeze(tablet_id,
|
||||
is_sync,
|
||||
abs_timeout_ts,
|
||||
need_rewrite_tablet_meta,
|
||||
source))) {
|
||||
LOG_WARN("[TenantFreezer] fail to freeze tablet", KR(ret), K(ls_id), K(tablet_id));
|
||||
if (OB_NOT_RUNNING == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "storage/tx_storage/ob_tenant_freezer_rpc.h"
|
||||
#include "storage/multi_data_source/runtime_utility/mds_factory.h"
|
||||
#include "storage/compaction/ob_compaction_util.h"
|
||||
#include "storage/ls/ob_freezer_define.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -137,22 +138,25 @@ public:
|
||||
void wait();
|
||||
|
||||
// freeze all the checkpoint unit of this tenant.
|
||||
int tenant_freeze();
|
||||
int tenant_freeze(const ObFreezeSourceFlag source);
|
||||
|
||||
// freeze a ls, if the ls is freezing, do nothing and return OB_ENTRY_EXIST.
|
||||
// if there is some process hold the ls lock or a OB_EAGAIN occur, we will retry
|
||||
// until timeout.
|
||||
int ls_freeze_all_unit(const share::ObLSID &ls_id);
|
||||
int ls_freeze_all_unit(const share::ObLSID &ls_id,
|
||||
const ObFreezeSourceFlag source);
|
||||
// freeze a tablet
|
||||
int tablet_freeze(const common::ObTabletID &tablet_id,
|
||||
const bool is_sync,
|
||||
const int64_t max_retry_time,
|
||||
const bool need_rewrite_tablet_meta);
|
||||
const bool need_rewrite_tablet_meta,
|
||||
const ObFreezeSourceFlag source);
|
||||
int tablet_freeze(share::ObLSID ls_id,
|
||||
const common::ObTabletID &tablet_id,
|
||||
const bool is_sync,
|
||||
const int64_t max_retry_time,
|
||||
const bool need_rewrite_tablet_meta);
|
||||
const bool need_rewrite_tablet_meta,
|
||||
const ObFreezeSourceFlag source);
|
||||
// check if this tenant's memstore is out of range, and trigger minor/major freeze.
|
||||
int check_and_do_freeze();
|
||||
|
||||
@ -256,12 +260,10 @@ private:
|
||||
bool &is_out_of_mem,
|
||||
const bool from_user = true);
|
||||
static int ls_freeze_data_(ObLS *ls, const bool is_sync, const int64_t abs_timeout_ts);
|
||||
static int ls_freeze_all_unit_(ObLS *ls, const int64_t abs_timeout_ts = INT64_MAX);
|
||||
static int tablet_freeze_(ObLS *ls,
|
||||
const common::ObTabletID &tablet_id,
|
||||
const bool need_rewrite_tablet_meta,
|
||||
const bool is_sync,
|
||||
const int64_t abs_timeout_ts);
|
||||
static int ls_freeze_all_unit_(
|
||||
ObLS *ls,
|
||||
const int64_t abs_timeout_ts = INT64_MAX,
|
||||
const ObFreezeSourceFlag source = ObFreezeSourceFlag::INVALID_SOURCE);
|
||||
// freeze all the ls of this tenant.
|
||||
// return the first failed code.
|
||||
int tenant_freeze_data_();
|
||||
|
Loading…
x
Reference in New Issue
Block a user