Add ready_for_flush flag for tenant_freeze triggered by ObTenantFreezer
This commit is contained in:
@ -204,6 +204,7 @@ ObFreezer::ObFreezer()
|
|||||||
low_priority_freeze_cnt_(0),
|
low_priority_freeze_cnt_(0),
|
||||||
need_resubmit_log_(false),
|
need_resubmit_log_(false),
|
||||||
enable_(true),
|
enable_(true),
|
||||||
|
ready_for_flush_(false),
|
||||||
is_inited_(false)
|
is_inited_(false)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
@ -218,6 +219,7 @@ ObFreezer::ObFreezer(ObLS *ls)
|
|||||||
low_priority_freeze_cnt_(0),
|
low_priority_freeze_cnt_(0),
|
||||||
need_resubmit_log_(false),
|
need_resubmit_log_(false),
|
||||||
enable_(true),
|
enable_(true),
|
||||||
|
ready_for_flush_(false),
|
||||||
is_inited_(false)
|
is_inited_(false)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
@ -238,6 +240,7 @@ void ObFreezer::reset()
|
|||||||
low_priority_freeze_cnt_ = 0;
|
low_priority_freeze_cnt_ = 0;
|
||||||
need_resubmit_log_ = false;
|
need_resubmit_log_ = false;
|
||||||
enable_ = true;
|
enable_ = true;
|
||||||
|
ready_for_flush_ = false;
|
||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -259,6 +262,7 @@ int ObFreezer::init(ObLS *ls)
|
|||||||
low_priority_freeze_cnt_ = 0;
|
low_priority_freeze_cnt_ = 0;
|
||||||
need_resubmit_log_ = false;
|
need_resubmit_log_ = false;
|
||||||
enable_ = true;
|
enable_ = true;
|
||||||
|
ready_for_flush_ = false;
|
||||||
|
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
}
|
}
|
||||||
@ -298,7 +302,7 @@ ObLSWRSHandler* ObFreezer::get_ls_wrs_handler()
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* logstream freeze */
|
/* logstream freeze */
|
||||||
int ObFreezer::logstream_freeze()
|
int ObFreezer::logstream_freeze(bool is_tenant_freeze)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
SCN freeze_snapshot_version;
|
SCN freeze_snapshot_version;
|
||||||
@ -329,7 +333,7 @@ int ObFreezer::logstream_freeze()
|
|||||||
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
|
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
|
||||||
} else if (FALSE_IT(set_need_resubmit_log(false))) {
|
} else if (FALSE_IT(set_need_resubmit_log(false))) {
|
||||||
} else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) {
|
} else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) {
|
||||||
} else if (OB_FAIL(inner_logstream_freeze())) {
|
} else if (OB_FAIL(inner_logstream_freeze(is_tenant_freeze))) {
|
||||||
TRANS_LOG(WARN, "[Freezer] logstream_freeze failure", K(ret), K(ls_id));
|
TRANS_LOG(WARN, "[Freezer] logstream_freeze failure", K(ret), K(ls_id));
|
||||||
undo_freeze_();
|
undo_freeze_();
|
||||||
}
|
}
|
||||||
@ -339,17 +343,20 @@ int ObFreezer::logstream_freeze()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObFreezer::inner_logstream_freeze()
|
int ObFreezer::inner_logstream_freeze(bool is_tenant_freeze)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
share::ObLSID ls_id = get_ls_id();
|
share::ObLSID ls_id = get_ls_id();
|
||||||
|
|
||||||
|
if (is_tenant_freeze) {
|
||||||
|
set_ready_for_flush(false);
|
||||||
|
}
|
||||||
if (OB_FAIL(get_ls_data_checkpoint()->ls_freeze(SCN::max_scn()))) {
|
if (OB_FAIL(get_ls_data_checkpoint()->ls_freeze(SCN::max_scn()))) {
|
||||||
// move memtables from active_list to frozen_list
|
// move memtables from active_list to frozen_list
|
||||||
TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K(ls_id));
|
TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K(ls_id));
|
||||||
stat_.add_diagnose_info("data_checkpoint freeze failed");
|
stat_.add_diagnose_info("data_checkpoint freeze failed");
|
||||||
} else if (FALSE_IT(submit_log_for_freeze())) {
|
} else if (FALSE_IT(submit_log_for_freeze())) {
|
||||||
} else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/))) {
|
} else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/, is_tenant_freeze))) {
|
||||||
TRANS_LOG(WARN, "failed to submit ls_freeze task", K(ret), K(ls_id));
|
TRANS_LOG(WARN, "failed to submit ls_freeze task", K(ret), K(ls_id));
|
||||||
stat_.add_diagnose_info("fail to submit ls_freeze_task");
|
stat_.add_diagnose_info("fail to submit ls_freeze_task");
|
||||||
} else {
|
} else {
|
||||||
@ -359,7 +366,7 @@ int ObFreezer::inner_logstream_freeze()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObFreezer::ls_freeze_task()
|
void ObFreezer::ls_freeze_task(bool is_tenant_freeze)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
share::ObLSID ls_id = get_ls_id();
|
share::ObLSID ls_id = get_ls_id();
|
||||||
@ -390,6 +397,9 @@ void ObFreezer::ls_freeze_task()
|
|||||||
}
|
}
|
||||||
ob_usleep(100);
|
ob_usleep(100);
|
||||||
}
|
}
|
||||||
|
if (is_tenant_freeze) {
|
||||||
|
set_ready_for_flush(true);
|
||||||
|
}
|
||||||
stat_.add_diagnose_info("logstream_freeze success");
|
stat_.add_diagnose_info("logstream_freeze success");
|
||||||
FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock));
|
FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock));
|
||||||
|
|
||||||
@ -416,6 +426,16 @@ int ObFreezer::check_ls_state()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ObFreezer::is_ready_for_flush()
|
||||||
|
{
|
||||||
|
return ATOMIC_LOAD(&ready_for_flush_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ObFreezer::set_ready_for_flush(bool ready_for_flush)
|
||||||
|
{
|
||||||
|
ATOMIC_STORE(&ready_for_flush_, ready_for_flush);
|
||||||
|
}
|
||||||
|
|
||||||
/* tablet freeze */
|
/* tablet freeze */
|
||||||
int ObFreezer::tablet_freeze(const ObTabletID &tablet_id)
|
int ObFreezer::tablet_freeze(const ObTabletID &tablet_id)
|
||||||
{
|
{
|
||||||
@ -475,7 +495,7 @@ int ObFreezer::tablet_freeze(const ObTabletID &tablet_id)
|
|||||||
stat_.add_diagnose_info("fail to set is_tablet_freeze");
|
stat_.add_diagnose_info("fail to set is_tablet_freeze");
|
||||||
}
|
}
|
||||||
} else if (FALSE_IT(submit_log_for_freeze())) {
|
} else if (FALSE_IT(submit_log_for_freeze())) {
|
||||||
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, imemtable))) {
|
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, false/*is_tenant_freeze*/, imemtable))) {
|
||||||
TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
|
TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
|
||||||
stat_.add_diagnose_info("fail to submit tablet_freeze_task");
|
stat_.add_diagnose_info("fail to submit tablet_freeze_task");
|
||||||
} else {
|
} else {
|
||||||
@ -547,7 +567,7 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id)
|
|||||||
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
|
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
|
||||||
stat_.add_diagnose_info("fail to set is_tablet_freeze");
|
stat_.add_diagnose_info("fail to set is_tablet_freeze");
|
||||||
} else if (FALSE_IT(submit_log_for_freeze())) {
|
} else if (FALSE_IT(submit_log_for_freeze())) {
|
||||||
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, imemtable))) {
|
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, false/*is_tenant_freeze*/, imemtable))) {
|
||||||
TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id));
|
TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id));
|
||||||
stat_.add_diagnose_info("fail to submit freeze_task");
|
stat_.add_diagnose_info("fail to submit freeze_task");
|
||||||
} else {
|
} else {
|
||||||
@ -800,7 +820,7 @@ int ObFreezer::submit_log_for_freeze()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObFreezer::submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imemtable)
|
int ObFreezer::submit_freeze_task(bool is_ls_freeze, bool is_tenant_freeze, memtable::ObIMemtable *imemtable)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTenantFreezer *tenant_freezer = nullptr;
|
ObTenantFreezer *tenant_freezer = nullptr;
|
||||||
@ -813,7 +833,7 @@ int ObFreezer::submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imem
|
|||||||
} else {
|
} else {
|
||||||
ObSpinLockGuard freeze_thread_pool(tenant_freezer->freeze_thread_pool_lock_);
|
ObSpinLockGuard freeze_thread_pool(tenant_freezer->freeze_thread_pool_lock_);
|
||||||
do {
|
do {
|
||||||
ret = is_ls_freeze ? tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this]() { ls_freeze_task(); })
|
ret = is_ls_freeze ? tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, is_tenant_freeze]() { ls_freeze_task(is_tenant_freeze); })
|
||||||
: tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, imemtable]() { tablet_freeze_task(imemtable); });
|
: tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, imemtable]() { tablet_freeze_task(imemtable); });
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
||||||
|
|||||||
@ -194,7 +194,7 @@ public:
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
/* freeze */
|
/* freeze */
|
||||||
int logstream_freeze();
|
int logstream_freeze(bool is_tenant_freeze=false);
|
||||||
int tablet_freeze(const ObTabletID &tablet_id);
|
int tablet_freeze(const ObTabletID &tablet_id);
|
||||||
int force_tablet_freeze(const ObTabletID &tablet_id);
|
int force_tablet_freeze(const ObTabletID &tablet_id);
|
||||||
int tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *&imemtable);
|
int tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *&imemtable);
|
||||||
@ -239,6 +239,8 @@ public:
|
|||||||
ObFreezerStat& get_stat() { return stat_; }
|
ObFreezerStat& get_stat() { return stat_; }
|
||||||
bool need_resubmit_log() { return ATOMIC_LOAD(&need_resubmit_log_); }
|
bool need_resubmit_log() { return ATOMIC_LOAD(&need_resubmit_log_); }
|
||||||
void set_need_resubmit_log(bool flag) { return ATOMIC_STORE(&need_resubmit_log_, flag); }
|
void set_need_resubmit_log(bool flag) { return ATOMIC_STORE(&need_resubmit_log_, flag); }
|
||||||
|
bool is_ready_for_flush();
|
||||||
|
void set_ready_for_flush(bool ready_for_flush);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
class ObLSFreezeGuard
|
class ObLSFreezeGuard
|
||||||
@ -269,11 +271,11 @@ private:
|
|||||||
void undo_freeze_();
|
void undo_freeze_();
|
||||||
|
|
||||||
/* inner subfunctions for freeze process */
|
/* inner subfunctions for freeze process */
|
||||||
int inner_logstream_freeze();
|
int inner_logstream_freeze(bool is_tenant_freeze);
|
||||||
int submit_log_for_freeze();
|
int submit_log_for_freeze();
|
||||||
void ls_freeze_task();
|
void ls_freeze_task(bool is_tenant_freeze);
|
||||||
int tablet_freeze_task(memtable::ObIMemtable *imemtable);
|
int tablet_freeze_task(memtable::ObIMemtable *imemtable);
|
||||||
int submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imemtable = nullptr);
|
int submit_freeze_task(bool is_ls_freeze, bool is_tenant_freeze, memtable::ObIMemtable *imemtable = nullptr);
|
||||||
void wait_memtable_ready_for_flush(memtable::ObMemtable *memtable);
|
void wait_memtable_ready_for_flush(memtable::ObMemtable *memtable);
|
||||||
int wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *memtable);
|
int wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *memtable);
|
||||||
int handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable);
|
int handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable);
|
||||||
@ -307,6 +309,7 @@ private:
|
|||||||
|
|
||||||
bool need_resubmit_log_;
|
bool need_resubmit_log_;
|
||||||
bool enable_; // whether we can do freeze now
|
bool enable_; // whether we can do freeze now
|
||||||
|
bool ready_for_flush_;
|
||||||
|
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
};
|
};
|
||||||
|
|||||||
@ -1166,7 +1166,7 @@ int ObLS::replay_get_tablet(const common::ObTabletID &tablet_id,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObLS::logstream_freeze()
|
int ObLS::logstream_freeze(bool is_tenant_freeze)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
|
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
|
||||||
@ -1181,7 +1181,7 @@ int ObLS::logstream_freeze()
|
|||||||
} else if (OB_UNLIKELY(!log_handler_.is_replay_enabled())) {
|
} else if (OB_UNLIKELY(!log_handler_.is_replay_enabled())) {
|
||||||
ret = OB_NOT_RUNNING;
|
ret = OB_NOT_RUNNING;
|
||||||
LOG_WARN("log handler not enable replay, should not freeze", K(ret), K_(ls_meta));
|
LOG_WARN("log handler not enable replay, should not freeze", K(ret), K_(ls_meta));
|
||||||
} else if (OB_FAIL(ls_freezer_.logstream_freeze())) {
|
} else if (OB_FAIL(ls_freezer_.logstream_freeze(is_tenant_freeze))) {
|
||||||
LOG_WARN("logstream freeze failed", K(ret), K_(ls_meta));
|
LOG_WARN("logstream freeze failed", K(ret), K_(ls_meta));
|
||||||
} else {
|
} else {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
|||||||
@ -587,8 +587,8 @@ public:
|
|||||||
|
|
||||||
// ObFreezer interface:
|
// ObFreezer interface:
|
||||||
// logstream freeze
|
// logstream freeze
|
||||||
// @param [in] null
|
// @param [in] is_tenant_freeze: only used for ObTenantFreezer::tenant_freeze_()
|
||||||
int logstream_freeze();
|
int logstream_freeze(bool is_tenant_freeze=false);
|
||||||
// tablet freeze
|
// tablet freeze
|
||||||
// @param [in] tablet_id
|
// @param [in] tablet_id
|
||||||
// int tablet_freeze(const ObTabletID &tablet_id);
|
// int tablet_freeze(const ObTabletID &tablet_id);
|
||||||
|
|||||||
@ -199,13 +199,18 @@ int ObTenantFreezer::ls_freeze_(ObLS *ls)
|
|||||||
// wait if there is a freeze is doing
|
// wait if there is a freeze is doing
|
||||||
do {
|
do {
|
||||||
retry_times++;
|
retry_times++;
|
||||||
if (OB_FAIL(ls->logstream_freeze()) && OB_ENTRY_EXIST == ret) {
|
if (OB_FAIL(ls->logstream_freeze(true/*is_tenant_freeze*/)) && OB_ENTRY_EXIST == ret) {
|
||||||
ob_usleep(SLEEP_TS);
|
ob_usleep(SLEEP_TS);
|
||||||
}
|
}
|
||||||
if (retry_times % 10 == 0) {
|
if (retry_times % 10 == 0) {
|
||||||
LOG_WARN("wait ls freeze finished cost too much time", K(retry_times));
|
LOG_WARN("wait ls freeze finished cost too much time", K(retry_times));
|
||||||
}
|
}
|
||||||
} while (ret == OB_ENTRY_EXIST);
|
} while (ret == OB_ENTRY_EXIST);
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
while (!ls->get_freezer()->is_ready_for_flush()) {
|
||||||
|
ob_usleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user