Add sync interface to check if freeze finished

This commit is contained in:
obdev 2022-12-27 08:38:37 +00:00 committed by ob-robot
parent 6c9add4152
commit 59c443225d
9 changed files with 129 additions and 101 deletions

View File

@ -262,7 +262,7 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
}
} else if (OB_FAIL(submit_medium_clog(medium_info))) {
LOG_WARN("failed to submit medium clog and update inner table", K(ret), KPC(this));
} else if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_id, false/*force_freeze*/))) {
} else if (OB_TMP_FAIL(ls_.tablet_freeze(tablet_id, true/*is_sync*/))) {
// need to freeze memtable with MediumCompactionInfo
LOG_WARN("failed to freeze tablet", K(tmp_ret), KPC(this));
}
@ -760,7 +760,7 @@ int ObMediumCompactionScheduleFunc::freeze_memtable_to_get_medium_info()
} // end of for
if (OB_FAIL(ret)) {
} else if (receive_medium_info) {
if (OB_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_.get_tablet_meta().tablet_id_, false/*force_freeze*/))) {
if (OB_FAIL(ls_.tablet_freeze(tablet_.get_tablet_meta().tablet_id_, true/*is_sync*/))) {
if (OB_TABLE_NOT_EXIST != ret) {
LOG_WARN("failed to freeze tablet", K(ret), KPC(this));
}

View File

@ -830,7 +830,7 @@ int ObTenantTabletScheduler::schedule_ls_minor_merge(
} else if (OB_TMP_FAIL(fast_freeze_checker_.check_need_fast_freeze(*tablet_handle.get_obj(), need_fast_freeze))) {
LOG_WARN("failed to check need fast freeze", K(tmp_ret), K(tablet_handle));
} else if (need_fast_freeze) {
if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_id, false/*force_freeze*/))) {
if (OB_TMP_FAIL(ls.tablet_freeze(tablet_id, true/*is_sync*/))) {
LOG_WARN("failt to freeze tablet", K(tmp_ret), K(tablet_id));
}
}

View File

@ -216,7 +216,6 @@ ObFreezer::ObFreezer()
low_priority_freeze_cnt_(0),
need_resubmit_log_(false),
enable_(true),
ready_for_flush_(false),
is_inited_(false)
{}
@ -231,7 +230,6 @@ ObFreezer::ObFreezer(ObLS *ls)
low_priority_freeze_cnt_(0),
need_resubmit_log_(false),
enable_(true),
ready_for_flush_(false),
is_inited_(false)
{}
@ -252,7 +250,6 @@ void ObFreezer::reset()
low_priority_freeze_cnt_ = 0;
need_resubmit_log_ = false;
enable_ = true;
ready_for_flush_ = false;
is_inited_ = false;
}
@ -274,7 +271,6 @@ int ObFreezer::init(ObLS *ls)
low_priority_freeze_cnt_ = 0;
need_resubmit_log_ = false;
enable_ = true;
ready_for_flush_ = false;
is_inited_ = true;
}
@ -314,7 +310,7 @@ ObLSWRSHandler* ObFreezer::get_ls_wrs_handler()
}
/* logstream freeze */
int ObFreezer::logstream_freeze(bool is_tenant_freeze)
int ObFreezer::logstream_freeze(ObFuture<int> *result)
{
int ret = OB_SUCCESS;
SCN freeze_snapshot_version;
@ -346,7 +342,7 @@ int ObFreezer::logstream_freeze(bool is_tenant_freeze)
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
} else if (FALSE_IT(set_need_resubmit_log(false))) {
} else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) {
} else if (OB_FAIL(inner_logstream_freeze(is_tenant_freeze))) {
} else if (OB_FAIL(inner_logstream_freeze(result))) {
TRANS_LOG(WARN, "[Freezer] logstream_freeze failure", K(ret), K(ls_id));
undo_freeze_();
}
@ -356,20 +352,17 @@ int ObFreezer::logstream_freeze(bool is_tenant_freeze)
return ret;
}
int ObFreezer::inner_logstream_freeze(bool is_tenant_freeze)
int ObFreezer::inner_logstream_freeze(ObFuture<int> *result)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
if (is_tenant_freeze) {
set_ready_for_flush(false);
}
if (OB_FAIL(get_ls_data_checkpoint()->ls_freeze(SCN::max_scn()))) {
// move memtables from active_list to frozen_list
TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K(ls_id));
stat_.add_diagnose_info("data_checkpoint freeze failed");
} else if (FALSE_IT(submit_log_for_freeze())) {
} else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/, is_tenant_freeze))) {
} else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/, result))) {
TRANS_LOG(WARN, "failed to submit ls_freeze task", K(ret), K(ls_id));
stat_.add_diagnose_info("fail to submit ls_freeze_task");
} else {
@ -379,7 +372,7 @@ int ObFreezer::inner_logstream_freeze(bool is_tenant_freeze)
return ret;
}
void ObFreezer::ls_freeze_task(bool is_tenant_freeze)
int ObFreezer::ls_freeze_task()
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
@ -410,9 +403,6 @@ void ObFreezer::ls_freeze_task(bool is_tenant_freeze)
}
ob_usleep(100);
}
if (is_tenant_freeze) {
set_ready_for_flush(true);
}
stat_.add_diagnose_info("logstream_freeze success");
FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock));
@ -421,6 +411,8 @@ void ObFreezer::ls_freeze_task(bool is_tenant_freeze)
stat_.ret_code_ = ret;
unset_freeze_();
return ret;
}
// must be used under the protection of ls_lock
@ -439,18 +431,8 @@ int ObFreezer::check_ls_state()
return ret;
}
bool ObFreezer::is_ready_for_flush()
{
return ATOMIC_LOAD(&ready_for_flush_);
}
void ObFreezer::set_ready_for_flush(bool ready_for_flush)
{
ATOMIC_STORE(&ready_for_flush_, ready_for_flush);
}
/* tablet freeze */
int ObFreezer::tablet_freeze(const ObTabletID &tablet_id)
int ObFreezer::tablet_freeze(const ObTabletID &tablet_id, ObFuture<int> *result)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
@ -509,7 +491,7 @@ int ObFreezer::tablet_freeze(const ObTabletID &tablet_id)
stat_.add_diagnose_info("fail to set is_tablet_freeze");
}
} else if (FALSE_IT(submit_log_for_freeze())) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, false/*is_tenant_freeze*/, imemtable))) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, result, imemtable))) {
TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to submit tablet_freeze_task");
} else {
@ -582,7 +564,7 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id)
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to set is_tablet_freeze");
} else if (FALSE_IT(submit_log_for_freeze())) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, false/*is_tenant_freeze*/, imemtable))) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, nullptr, imemtable))) {
TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to submit freeze_task");
} else {
@ -836,7 +818,7 @@ int ObFreezer::submit_log_for_freeze()
return ret;
}
int ObFreezer::submit_freeze_task(bool is_ls_freeze, bool is_tenant_freeze, memtable::ObIMemtable *imemtable)
int ObFreezer::submit_freeze_task(bool is_ls_freeze, ObFuture<int> *result, memtable::ObIMemtable *imemtable)
{
int ret = OB_SUCCESS;
ObTenantFreezer *tenant_freezer = nullptr;
@ -849,8 +831,24 @@ int ObFreezer::submit_freeze_task(bool is_ls_freeze, bool is_tenant_freeze, memt
} else {
ObSpinLockGuard freeze_thread_pool(tenant_freezer->freeze_thread_pool_lock_);
do {
ret = is_ls_freeze ? tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, is_tenant_freeze]() { ls_freeze_task(is_tenant_freeze); })
: tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, imemtable]() { tablet_freeze_task(imemtable); });
if (OB_ISNULL(result)) {
if (is_ls_freeze) {
ret = tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this]() {
return ls_freeze_task(); });
} else {
ret = tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, imemtable]() {
return tablet_freeze_task(imemtable); });
}
} else {
if (is_ls_freeze) {
ret = tenant_freezer->freeze_thread_pool_.commit_task(*result,
[this]() { return ls_freeze_task(); });
} else {
ret = tenant_freezer->freeze_thread_pool_.commit_task(*result,
[this, imemtable]() { return tablet_freeze_task(imemtable); });
}
}
if (OB_FAIL(ret)) {
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 100 * 1000) {
@ -865,6 +863,28 @@ int ObFreezer::submit_freeze_task(bool is_ls_freeze, bool is_tenant_freeze, memt
return ret;
}
int ObFreezer::wait_freeze_finished(ObFuture<int> &result)
{
int ret = OB_SUCCESS;
if (result.is_valid()) {
share::ObLSID ls_id = get_ls_id();
int *ret_code = nullptr;
const int64_t start = ObTimeUtility::current_time();
result.get(ret_code);
const int64_t cost_time = ObTimeUtility::current_time() - start;
ret = OB_ISNULL(ret_code) ? OB_ERR_UNEXPECTED : *ret_code;
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "failed to freeze", K(ret), K(cost_time), K(ls_id));
} else if (cost_time > 3 * 1000 * 1000) {
TRANS_LOG(WARN, "waiting ready_for_flush costs too much time", K(ret), K(cost_time), K(ls_id));
}
}
return ret;
}
void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable)
{
share::ObLSID ls_id = get_ls_id();

View File

@ -195,8 +195,8 @@ public:
public:
/* freeze */
int logstream_freeze(bool is_tenant_freeze=false);
int tablet_freeze(const ObTabletID &tablet_id);
int logstream_freeze(ObFuture<int> *result = nullptr);
int tablet_freeze(const ObTabletID &tablet_id, ObFuture<int> *result = nullptr);
int force_tablet_freeze(const ObTabletID &tablet_id);
int tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *&imemtable);
int handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *imemtable);
@ -240,8 +240,8 @@ public:
ObFreezerStat& get_stat() { return stat_; }
bool need_resubmit_log() { return ATOMIC_LOAD(&need_resubmit_log_); }
void set_need_resubmit_log(bool flag) { return ATOMIC_STORE(&need_resubmit_log_, flag); }
bool is_ready_for_flush();
void set_ready_for_flush(bool ready_for_flush);
// only used after start freeze_task successfully
int wait_freeze_finished(ObFuture<int> &result);
private:
class ObLSFreezeGuard
@ -272,11 +272,11 @@ private:
void undo_freeze_();
/* inner subfunctions for freeze process */
int inner_logstream_freeze(bool is_tenant_freeze);
int inner_logstream_freeze(ObFuture<int> *result);
int submit_log_for_freeze();
void ls_freeze_task(bool is_tenant_freeze);
int ls_freeze_task();
int tablet_freeze_task(memtable::ObIMemtable *imemtable);
int submit_freeze_task(bool is_ls_freeze, bool is_tenant_freeze, memtable::ObIMemtable *imemtable = nullptr);
int submit_freeze_task(bool is_ls_freeze, ObFuture<int> *result, memtable::ObIMemtable *imemtable = nullptr);
void wait_memtable_ready_for_flush(memtable::ObMemtable *memtable);
int wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *memtable);
int handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable);
@ -310,7 +310,6 @@ private:
bool need_resubmit_log_;
bool enable_; // whether we can do freeze now
bool ready_for_flush_;
bool is_inited_;
};

View File

@ -1224,49 +1224,67 @@ int ObLS::replay_get_tablet(const common::ObTabletID &tablet_id,
return ret;
}
int ObLS::logstream_freeze(bool is_tenant_freeze)
int ObLS::logstream_freeze(bool is_sync)
{
int ret = OB_SUCCESS;
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_myself(lock_, read_lock, write_lock);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls is not inited", K(ret));
} else if (OB_UNLIKELY(is_stopped_)) {
ret = OB_NOT_RUNNING;
LOG_WARN("ls stopped", K(ret), K_(ls_meta));
} else if (OB_UNLIKELY(!log_handler_.is_replay_enabled())) {
ret = OB_NOT_RUNNING;
LOG_WARN("log handler not enable replay, should not freeze", K(ret), K_(ls_meta));
} else if (OB_FAIL(ls_freezer_.logstream_freeze(is_tenant_freeze))) {
LOG_WARN("logstream freeze failed", K(ret), K_(ls_meta));
} else {
// do nothing
ObFuture<int> result;
{
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_myself(lock_, read_lock, write_lock);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls is not inited", K(ret));
} else if (OB_UNLIKELY(is_stopped_)) {
ret = OB_NOT_RUNNING;
LOG_WARN("ls stopped", K(ret), K_(ls_meta));
} else if (OB_UNLIKELY(!log_handler_.is_replay_enabled())) {
ret = OB_NOT_RUNNING;
LOG_WARN("log handler not enable replay, should not freeze", K(ret), K_(ls_meta));
} else if (OB_FAIL(ls_freezer_.logstream_freeze(&result))) {
LOG_WARN("logstream freeze failed", K(ret), K_(ls_meta));
} else {
// do nothing
}
}
if (is_sync) {
ret = ls_freezer_.wait_freeze_finished(result);
}
return ret;
}
int ObLS::tablet_freeze(const ObTabletID &tablet_id)
int ObLS::tablet_freeze(const ObTabletID &tablet_id, bool is_sync)
{
int ret = OB_SUCCESS;
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_myself(lock_, read_lock, write_lock);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls is not inited", K(ret));
} else if (OB_UNLIKELY(is_stopped_)) {
ret = OB_NOT_RUNNING;
LOG_WARN("ls stopped", K(ret), K_(ls_meta));
} else if (OB_UNLIKELY(!log_handler_.is_replay_enabled())) {
ret = OB_NOT_RUNNING;
LOG_WARN("log handler not enable replay, should not freeze", K(ret), K(tablet_id), K_(ls_meta));
} else if (OB_FAIL(ls_freezer_.tablet_freeze(tablet_id))) {
LOG_WARN("tablet freeze failed", K(ret), K(tablet_id));
} else {
// do nothing
ObFuture<int> result;
{
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_myself(lock_, read_lock, write_lock);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls is not inited", K(ret));
} else if (OB_UNLIKELY(is_stopped_)) {
ret = OB_NOT_RUNNING;
LOG_WARN("ls stopped", K(ret), K_(ls_meta));
} else if (OB_UNLIKELY(!log_handler_.is_replay_enabled())) {
ret = OB_NOT_RUNNING;
LOG_WARN("log handler not enable replay, should not freeze", K(ret), K(tablet_id), K_(ls_meta));
} else if (OB_FAIL(ls_freezer_.tablet_freeze(tablet_id, &result))) {
LOG_WARN("tablet freeze failed", K(ret), K(tablet_id));
} else {
// do nothing
}
}
if (is_sync) {
ret = ls_freezer_.wait_freeze_finished(result);
}
return ret;
}

View File

@ -592,14 +592,16 @@ public:
DELEGATE_WITH_RET(replay_handler_, replay, int);
// ObFreezer interface:
// logstream freeze
// @param [in] is_tenant_freeze: only used for ObTenantFreezer::tenant_freeze_()
int logstream_freeze(bool is_tenant_freeze=false);
// @param [in] result, only used for wait_freeze_finished()
// int logstream_freeze(ObFuture<int> *result = nullptr);
// DELEGATE_WITH_RET(ls_freezer_, logstream_freeze, int);
int logstream_freeze(bool is_sync = false);
// tablet freeze
// @param [in] tablet_id
// int tablet_freeze(const ObTabletID &tablet_id);
// @param [in] result, only used for wait_freeze_finished()
// int tablet_freeze(const ObTabletID &tablet_id, ObFuture<int> *result = nullptr);
// DELEGATE_WITH_RET(ls_freezer_, tablet_freeze, int);
int tablet_freeze(const ObTabletID &tablet_id);
int tablet_freeze(const ObTabletID &tablet_id, bool is_sync = false);
// force freeze tablet
// @param [in] tablet_id
// int force_tablet_freeze(const ObTabletID &tablet_id);

View File

@ -353,8 +353,8 @@ int ObTabletGCHandler::freeze_unpersist_tablet_ids(const common::ObTabletIDArray
if (!unpersist_tablet_ids.at(i).is_valid()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "invalid tablet_id", KR(ret), KPC(this->ls_), K(unpersist_tablet_ids));
} else if (OB_FAIL(ls_->tablet_freeze(unpersist_tablet_ids.at(i)))) {
STORAGE_LOG(WARN, "fail to tablet freeze", KR(ret), KPC(this->ls_), K(unpersist_tablet_ids.at(i)));
} else if (OB_FAIL(ls_->tablet_freeze(unpersist_tablet_ids.at(i), true/*is_sync*/))) {
STORAGE_LOG(WARN, "fail to freeze tablet", KR(ret), KPC(this->ls_), K(unpersist_tablet_ids.at(i)));
}
}
}

View File

@ -199,25 +199,14 @@ int ObTenantFreezer::ls_freeze_(ObLS *ls)
// wait if there is a freeze is doing
do {
retry_times++;
if (OB_FAIL(ls->logstream_freeze(true/*is_tenant_freeze*/)) && OB_ENTRY_EXIST == ret) {
if (OB_FAIL(ls->logstream_freeze(true/*is_sync*/)) && OB_ENTRY_EXIST == ret) {
ob_usleep(SLEEP_TS);
}
if (retry_times % 10 == 0) {
LOG_WARN("wait ls freeze finished cost too much time", K(retry_times));
}
} while (ret == OB_ENTRY_EXIST);
if (OB_SUCC(ret)) {
const int64_t start = ObTimeUtility::current_time();
while (!ls->get_freezer()->is_ready_for_flush()) {
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 5 * 1000 * 1000) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
TRANS_LOG(WARN, "[TenantFreezer] cost too much time to wait ls ready_for_flush", K(ls->get_ls_id()), K(cost_time));
}
}
ob_usleep(100);
}
} else if (OB_NOT_RUNNING == ret) {
if (OB_NOT_RUNNING == ret) {
ret = OB_SUCCESS;
}
return ret;
@ -264,7 +253,8 @@ int ObTenantFreezer::tenant_freeze_()
}
int ObTenantFreezer::tablet_freeze(const common::ObTabletID &tablet_id,
const bool is_force_freeze)
const bool is_force_freeze,
const bool is_sync)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id;
@ -293,10 +283,8 @@ int ObTenantFreezer::tablet_freeze(const common::ObTabletID &tablet_id,
LOG_WARN("[TenantFreezer] ls is null", KR(ret), K(ls_id));
} else if (OB_FAIL(is_force_freeze
? ls->force_tablet_freeze(tablet_id)
: ls->tablet_freeze(tablet_id))) {
: ls->tablet_freeze(tablet_id, is_sync))) {
LOG_WARN("[TenantFreezer] fail to freeze tablet", KR(ret), K(ls_id), K(tablet_id));
} else {
LOG_INFO("[TenantFreezer] succeed to freeze tablet", KR(ret), K(ls_id), K(tablet_id));
}
return ret;

View File

@ -59,7 +59,8 @@ public:
// freeze a tablet
int tablet_freeze(const common::ObTabletID &tablet_id,
const bool is_force_freeze=false);
const bool is_force_freeze = false,
const bool is_sync = false);
// check if this tenant's memstore is out of range, and trigger minor/major freeze.
int check_and_do_freeze();