diff --git a/src/storage/ls/ob_freezer.cpp b/src/storage/ls/ob_freezer.cpp index 1a24aab4a9..b5effaa957 100644 --- a/src/storage/ls/ob_freezer.cpp +++ b/src/storage/ls/ob_freezer.cpp @@ -204,6 +204,7 @@ ObFreezer::ObFreezer() low_priority_freeze_cnt_(0), need_resubmit_log_(false), enable_(true), + ready_for_flush_(false), is_inited_(false) {} @@ -218,6 +219,7 @@ ObFreezer::ObFreezer(ObLS *ls) low_priority_freeze_cnt_(0), need_resubmit_log_(false), enable_(true), + ready_for_flush_(false), is_inited_(false) {} @@ -238,6 +240,7 @@ void ObFreezer::reset() low_priority_freeze_cnt_ = 0; need_resubmit_log_ = false; enable_ = true; + ready_for_flush_ = false; is_inited_ = false; } @@ -259,6 +262,7 @@ int ObFreezer::init(ObLS *ls) low_priority_freeze_cnt_ = 0; need_resubmit_log_ = false; enable_ = true; + ready_for_flush_ = false; is_inited_ = true; } @@ -298,7 +302,7 @@ ObLSWRSHandler* ObFreezer::get_ls_wrs_handler() } /* logstream freeze */ -int ObFreezer::logstream_freeze() +int ObFreezer::logstream_freeze(bool is_tenant_freeze) { int ret = OB_SUCCESS; SCN freeze_snapshot_version; @@ -329,7 +333,7 @@ int ObFreezer::logstream_freeze() } else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) { } else if (FALSE_IT(set_need_resubmit_log(false))) { } else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) { - } else if (OB_FAIL(inner_logstream_freeze())) { + } else if (OB_FAIL(inner_logstream_freeze(is_tenant_freeze))) { TRANS_LOG(WARN, "[Freezer] logstream_freeze failure", K(ret), K(ls_id)); undo_freeze_(); } @@ -339,17 +343,20 @@ int ObFreezer::logstream_freeze() return ret; } -int ObFreezer::inner_logstream_freeze() +int ObFreezer::inner_logstream_freeze(bool is_tenant_freeze) { int ret = OB_SUCCESS; share::ObLSID ls_id = get_ls_id(); + if (is_tenant_freeze) { + set_ready_for_flush(false); + } if (OB_FAIL(get_ls_data_checkpoint()->ls_freeze(SCN::max_scn()))) { // move memtables from active_list to frozen_list TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K(ls_id)); stat_.add_diagnose_info("data_checkpoint freeze failed"); } else if (FALSE_IT(submit_log_for_freeze())) { - } else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/))) { + } else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/, is_tenant_freeze))) { TRANS_LOG(WARN, "failed to submit ls_freeze task", K(ret), K(ls_id)); stat_.add_diagnose_info("fail to submit ls_freeze_task"); } else { @@ -359,7 +366,7 @@ int ObFreezer::inner_logstream_freeze() return ret; } -void ObFreezer::ls_freeze_task() +void ObFreezer::ls_freeze_task(bool is_tenant_freeze) { int ret = OB_SUCCESS; share::ObLSID ls_id = get_ls_id(); @@ -390,6 +397,9 @@ void ObFreezer::ls_freeze_task() } ob_usleep(100); } + if (is_tenant_freeze) { + set_ready_for_flush(true); + } stat_.add_diagnose_info("logstream_freeze success"); FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock)); @@ -416,6 +426,16 @@ int ObFreezer::check_ls_state() return ret; } +bool ObFreezer::is_ready_for_flush() +{ + return ATOMIC_LOAD(&ready_for_flush_); +} + +void ObFreezer::set_ready_for_flush(bool ready_for_flush) +{ + ATOMIC_STORE(&ready_for_flush_, ready_for_flush); +} + /* tablet freeze */ int ObFreezer::tablet_freeze(const ObTabletID &tablet_id) { @@ -475,7 +495,7 @@ int ObFreezer::tablet_freeze(const ObTabletID &tablet_id) stat_.add_diagnose_info("fail to set is_tablet_freeze"); } } else if (FALSE_IT(submit_log_for_freeze())) { - } else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, imemtable))) { + } else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, false/*is_tenant_freeze*/, imemtable))) { TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to submit tablet_freeze_task"); } else { @@ -547,7 +567,7 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id) TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to set is_tablet_freeze"); } else if (FALSE_IT(submit_log_for_freeze())) { - } else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, imemtable))) { + } else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, false/*is_tenant_freeze*/, imemtable))) { TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to submit freeze_task"); } else { @@ -800,7 +820,7 @@ int ObFreezer::submit_log_for_freeze() return ret; } -int ObFreezer::submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imemtable) +int ObFreezer::submit_freeze_task(bool is_ls_freeze, bool is_tenant_freeze, memtable::ObIMemtable *imemtable) { int ret = OB_SUCCESS; ObTenantFreezer *tenant_freezer = nullptr; @@ -813,7 +833,7 @@ int ObFreezer::submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imem } else { ObSpinLockGuard freeze_thread_pool(tenant_freezer->freeze_thread_pool_lock_); do { - ret = is_ls_freeze ? tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this]() { ls_freeze_task(); }) + ret = is_ls_freeze ? tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, is_tenant_freeze]() { ls_freeze_task(is_tenant_freeze); }) : tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, imemtable]() { tablet_freeze_task(imemtable); }); if (OB_FAIL(ret)) { const int64_t cost_time = ObTimeUtility::current_time() - start; diff --git a/src/storage/ls/ob_freezer.h b/src/storage/ls/ob_freezer.h index 8cc257ed30..b82000ba06 100644 --- a/src/storage/ls/ob_freezer.h +++ b/src/storage/ls/ob_freezer.h @@ -194,7 +194,7 @@ public: public: /* freeze */ - int logstream_freeze(); + int logstream_freeze(bool is_tenant_freeze=false); int tablet_freeze(const ObTabletID &tablet_id); int force_tablet_freeze(const ObTabletID &tablet_id); int tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *&imemtable); @@ -239,6 +239,8 @@ public: ObFreezerStat& get_stat() { return stat_; } bool need_resubmit_log() { return ATOMIC_LOAD(&need_resubmit_log_); } void set_need_resubmit_log(bool flag) { return ATOMIC_STORE(&need_resubmit_log_, flag); } + bool is_ready_for_flush(); + void set_ready_for_flush(bool ready_for_flush); private: class ObLSFreezeGuard @@ -269,11 +271,11 @@ private: void undo_freeze_(); /* inner subfunctions for freeze process */ - int inner_logstream_freeze(); + int inner_logstream_freeze(bool is_tenant_freeze); int submit_log_for_freeze(); - void ls_freeze_task(); + void ls_freeze_task(bool is_tenant_freeze); int tablet_freeze_task(memtable::ObIMemtable *imemtable); - int submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imemtable = nullptr); + int submit_freeze_task(bool is_ls_freeze, bool is_tenant_freeze, memtable::ObIMemtable *imemtable = nullptr); void wait_memtable_ready_for_flush(memtable::ObMemtable *memtable); int wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *memtable); int handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable); @@ -307,6 +309,7 @@ private: bool need_resubmit_log_; bool enable_; // whether we can do freeze now + bool ready_for_flush_; bool is_inited_; }; diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index 2fec73baa4..6c4eeb9876 100644 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -1166,7 +1166,7 @@ int ObLS::replay_get_tablet(const common::ObTabletID &tablet_id, return ret; } -int ObLS::logstream_freeze() +int ObLS::logstream_freeze(bool is_tenant_freeze) { int ret = OB_SUCCESS; int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA; @@ -1181,7 +1181,7 @@ int ObLS::logstream_freeze() } else if (OB_UNLIKELY(!log_handler_.is_replay_enabled())) { ret = OB_NOT_RUNNING; LOG_WARN("log handler not enable replay, should not freeze", K(ret), K_(ls_meta)); - } else if (OB_FAIL(ls_freezer_.logstream_freeze())) { + } else if (OB_FAIL(ls_freezer_.logstream_freeze(is_tenant_freeze))) { LOG_WARN("logstream freeze failed", K(ret), K_(ls_meta)); } else { // do nothing diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index be0b7ff4d5..5f76e35aa7 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -587,8 +587,8 @@ public: // ObFreezer interface: // logstream freeze - // @param [in] null - int logstream_freeze(); + // @param [in] is_tenant_freeze: only used for ObTenantFreezer::tenant_freeze_() + int logstream_freeze(bool is_tenant_freeze=false); // tablet freeze // @param [in] tablet_id // int tablet_freeze(const ObTabletID &tablet_id); diff --git a/src/storage/tx_storage/ob_tenant_freezer.cpp b/src/storage/tx_storage/ob_tenant_freezer.cpp index 7f57a5e79b..4cf66f58b3 100644 --- a/src/storage/tx_storage/ob_tenant_freezer.cpp +++ b/src/storage/tx_storage/ob_tenant_freezer.cpp @@ -199,13 +199,18 @@ int ObTenantFreezer::ls_freeze_(ObLS *ls) // wait if there is a freeze is doing do { retry_times++; - if (OB_FAIL(ls->logstream_freeze()) && OB_ENTRY_EXIST == ret) { + if (OB_FAIL(ls->logstream_freeze(true/*is_tenant_freeze*/)) && OB_ENTRY_EXIST == ret) { ob_usleep(SLEEP_TS); } if (retry_times % 10 == 0) { LOG_WARN("wait ls freeze finished cost too much time", K(retry_times)); } } while (ret == OB_ENTRY_EXIST); + if (OB_SUCC(ret)) { + while (!ls->get_freezer()->is_ready_for_flush()) { + ob_usleep(100); + } + } return ret; }