[4.2] move submit_log_for_freeze out of ls lock
This commit is contained in:
@ -530,6 +530,16 @@ int ObFreezer::logstream_freeze(ObFuture<int> *result)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ObFreezer::try_submit_log_for_freeze_()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
if (OB_FAIL(submit_log_for_freeze(true/*try*/))) {
|
||||||
|
TRANS_LOG(WARN, "fail to try submit log for freeze", K(ret));
|
||||||
|
set_need_resubmit_log(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int ObFreezer::inner_logstream_freeze(ObFuture<int> *result)
|
int ObFreezer::inner_logstream_freeze(ObFuture<int> *result)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -537,7 +547,7 @@ int ObFreezer::inner_logstream_freeze(ObFuture<int> *result)
|
|||||||
ObTableHandleV2 handle;
|
ObTableHandleV2 handle;
|
||||||
|
|
||||||
if (FALSE_IT(submit_checkpoint_task())) {
|
if (FALSE_IT(submit_checkpoint_task())) {
|
||||||
} else if (FALSE_IT(submit_log_for_freeze())) {
|
} else if (FALSE_IT(try_submit_log_for_freeze_())) {
|
||||||
} else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/, result, handle))) {
|
} else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/, result, handle))) {
|
||||||
TRANS_LOG(ERROR, "failed to submit ls_freeze task", K(ret), K(ls_id));
|
TRANS_LOG(ERROR, "failed to submit ls_freeze task", K(ret), K(ls_id));
|
||||||
stat_.add_diagnose_info("fail to submit ls_freeze_task");
|
stat_.add_diagnose_info("fail to submit ls_freeze_task");
|
||||||
@ -573,24 +583,27 @@ int ObFreezer::ls_freeze_task()
|
|||||||
// wait till all memtables are moved from frozen_list to prepare_list
|
// wait till all memtables are moved from frozen_list to prepare_list
|
||||||
// this means that all memtables can be dumped
|
// this means that all memtables can be dumped
|
||||||
while (!get_ls_data_checkpoint()->ls_freeze_finished()) {
|
while (!get_ls_data_checkpoint()->ls_freeze_finished()) {
|
||||||
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
|
||||||
if (cost_time > 5 * 1000 * 1000) {
|
if (need_resubmit_log()) {
|
||||||
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
|
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
|
||||||
if (need_resubmit_log()) {
|
int64_t write_lock = 0;
|
||||||
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
|
ObLSLockGuard lock_ls(ls_, ls_->lock_, read_lock, write_lock);
|
||||||
int64_t write_lock = 0;
|
if (OB_FAIL(check_ls_state())) {
|
||||||
ObLSLockGuard lock_ls(ls_, ls_->lock_, read_lock, write_lock);
|
} else {
|
||||||
if (OB_FAIL(check_ls_state())) {
|
submit_log_for_freeze(false/*try*/);
|
||||||
} else {
|
TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K(ls_id));
|
||||||
submit_log_for_freeze();
|
|
||||||
TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K(ls_id), K(cost_time));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
||||||
|
|
||||||
|
if (cost_time > 5 * 1000 * 1000) {
|
||||||
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "[Freezer] finish ls_freeze costs too much time",
|
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "[Freezer] finish ls_freeze costs too much time",
|
||||||
K(ls_id), K(cost_time));
|
K(ls_id), K(cost_time));
|
||||||
stat_.add_diagnose_info("finish ls_freeze costs too much time");
|
stat_.add_diagnose_info("finish ls_freeze costs too much time");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ob_usleep(100);
|
ob_usleep(100);
|
||||||
}
|
}
|
||||||
stat_.add_diagnose_info("logstream_freeze success");
|
stat_.add_diagnose_info("logstream_freeze success");
|
||||||
@ -696,7 +709,7 @@ int ObFreezer::freeze_normal_tablet_(const ObTabletID &tablet_id, ObFuture<int>
|
|||||||
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
|
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
|
||||||
stat_.add_diagnose_info("fail to set is_tablet_freeze");
|
stat_.add_diagnose_info("fail to set is_tablet_freeze");
|
||||||
}
|
}
|
||||||
} else if (FALSE_IT(submit_log_for_freeze())) {
|
} else if (FALSE_IT(try_submit_log_for_freeze_())) {
|
||||||
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, result, frozen_memtable_handle))) {
|
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, result, frozen_memtable_handle))) {
|
||||||
TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
|
TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
|
||||||
stat_.add_diagnose_info("fail to submit tablet_freeze_task");
|
stat_.add_diagnose_info("fail to submit tablet_freeze_task");
|
||||||
@ -782,7 +795,7 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id)
|
|||||||
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(frozen_memtable_handle, true))) {
|
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(frozen_memtable_handle, true))) {
|
||||||
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
|
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
|
||||||
stat_.add_diagnose_info("fail to set is_tablet_freeze");
|
stat_.add_diagnose_info("fail to set is_tablet_freeze");
|
||||||
} else if (FALSE_IT(submit_log_for_freeze())) {
|
} else if (FALSE_IT(try_submit_log_for_freeze_())) {
|
||||||
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, nullptr, frozen_memtable_handle))) {
|
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, nullptr, frozen_memtable_handle))) {
|
||||||
TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id));
|
TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id));
|
||||||
stat_.add_diagnose_info("fail to submit freeze_task");
|
stat_.add_diagnose_info("fail to submit freeze_task");
|
||||||
@ -867,18 +880,20 @@ int ObFreezer::try_wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtab
|
|||||||
TRANS_LOG(WARN, "[Freezer] memtable cannot be null", K(ret));
|
TRANS_LOG(WARN, "[Freezer] memtable cannot be null", K(ret));
|
||||||
} else if (FALSE_IT(ready_for_flush = memtable->ready_for_flush())) {
|
} else if (FALSE_IT(ready_for_flush = memtable->ready_for_flush())) {
|
||||||
} else if (!ready_for_flush) {
|
} else if (!ready_for_flush) {
|
||||||
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
|
||||||
if (cost_time > 5 * 1000 * 1000) {
|
if (need_resubmit_log()) {
|
||||||
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
|
submit_log_for_freeze(false/*try*/);
|
||||||
if (need_resubmit_log()) {
|
TRANS_LOG(INFO, "[Freezer] resubmit log", K(ret));
|
||||||
submit_log_for_freeze();
|
}
|
||||||
TRANS_LOG(INFO, "[Freezer] resubmit log", K(cost_time));
|
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
||||||
}
|
|
||||||
|
if (cost_time > 5 * 1000 * 1000) {
|
||||||
TRANS_LOG(WARN, "[Freezer] ready_for_flush costs too much time",
|
TRANS_LOG(WARN, "[Freezer] ready_for_flush costs too much time",
|
||||||
K(cost_time), KPC(memtable));
|
K(cost_time), KPC(memtable));
|
||||||
stat_.add_diagnose_info("ready_for_flush costs too much time");
|
stat_.add_diagnose_info("ready_for_flush costs too much time");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ob_usleep(100);
|
ob_usleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -955,6 +970,7 @@ int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTICE: not called by user now
|
||||||
// must ensure that call tablet_freeze_for_replace_tablet_meta() successfully before calling the func
|
// must ensure that call tablet_freeze_for_replace_tablet_meta() successfully before calling the func
|
||||||
int ObFreezer::handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID &tablet_id, ObTableHandleV2 &handle)
|
int ObFreezer::handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID &tablet_id, ObTableHandleV2 &handle)
|
||||||
{
|
{
|
||||||
@ -1083,7 +1099,7 @@ int ObFreezer::batch_tablet_freeze_(const ObIArray<ObTabletID> &tablet_ids, ObFu
|
|||||||
need_freeze = false;
|
need_freeze = false;
|
||||||
TRANS_LOG(INFO, "[Freezer] no need to freeze batch tablets", K(ret), K(tablet_ids));
|
TRANS_LOG(INFO, "[Freezer] no need to freeze batch tablets", K(ret), K(tablet_ids));
|
||||||
stat_.add_diagnose_info("no need to freeze batch tablets");
|
stat_.add_diagnose_info("no need to freeze batch tablets");
|
||||||
} else if (FALSE_IT(submit_log_for_freeze())) {
|
} else if (FALSE_IT(try_submit_log_for_freeze_())) {
|
||||||
} else if (OB_FAIL(submit_batch_tablet_freeze_task(memtable_handles, result))) {
|
} else if (OB_FAIL(submit_batch_tablet_freeze_task(memtable_handles, result))) {
|
||||||
TRANS_LOG(WARN, "[Freezer] fail to submit batch_tablet_freeze task", K(ret));
|
TRANS_LOG(WARN, "[Freezer] fail to submit batch_tablet_freeze task", K(ret));
|
||||||
} else {
|
} else {
|
||||||
@ -1187,6 +1203,8 @@ int ObFreezer::finish_freeze_with_ls_lock(memtable::ObMemtable *memtable)
|
|||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTICE: not called by user now
|
||||||
/* private subfunctions for freeze process */
|
/* private subfunctions for freeze process */
|
||||||
int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable)
|
int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable)
|
||||||
{
|
{
|
||||||
@ -1197,7 +1215,7 @@ int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtabl
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
} else {
|
} else {
|
||||||
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable*>(imemtable);
|
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable*>(imemtable);
|
||||||
submit_log_for_freeze();
|
try_submit_log_for_freeze_();
|
||||||
wait_memtable_ready_for_flush(memtable);
|
wait_memtable_ready_for_flush(memtable);
|
||||||
if (OB_FAIL(memtable->finish_freeze())) {
|
if (OB_FAIL(memtable->finish_freeze())) {
|
||||||
TRANS_LOG(ERROR, "[Freezer] memtable cannot be flushed",
|
TRANS_LOG(ERROR, "[Freezer] memtable cannot be flushed",
|
||||||
@ -1212,7 +1230,7 @@ int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtabl
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObFreezer::submit_log_for_freeze()
|
int ObFreezer::submit_log_for_freeze(bool is_try)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
share::ObLSID ls_id = get_ls_id();
|
share::ObLSID ls_id = get_ls_id();
|
||||||
@ -1242,16 +1260,19 @@ int ObFreezer::submit_log_for_freeze()
|
|||||||
if (OB_LOG_OUTOF_DISK_SPACE == ret) {
|
if (OB_LOG_OUTOF_DISK_SPACE == ret) {
|
||||||
ob_usleep(100 * 1000);
|
ob_usleep(100 * 1000);
|
||||||
}
|
}
|
||||||
} while (OB_FAIL(ret));
|
} while (!(is_try && (ObTimeUtility::current_time() - start > 10 * 1_s))
|
||||||
DEL_SUSPECT_INFO(MINI_MERGE, ls_id, tablet_id);
|
// we break the loop if we retry with a long time with the try semantic
|
||||||
|
&& OB_FAIL(ret));
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
|
DEL_SUSPECT_INFO(MINI_MERGE, ls_id, tablet_id);
|
||||||
|
|
||||||
if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_next_log())) {
|
if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_next_log())) {
|
||||||
TRANS_LOG(WARN, "traverse trans ctx to submit next log failed", K(ret));
|
TRANS_LOG(WARN, "traverse trans ctx to submit next log failed", K(ret));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
stat_.set_state(ObFreezeState::WAIT_READY_FOR_FLUSH);
|
stat_.set_state(ObFreezeState::WAIT_READY_FOR_FLUSH);
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -1330,15 +1351,16 @@ void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable)
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
while (!memtable->ready_for_flush()) {
|
while (!memtable->ready_for_flush()) {
|
||||||
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
|
||||||
if (cost_time > 5 * 1000 * 1000) {
|
if (need_resubmit_log()) {
|
||||||
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
|
submit_log_for_freeze(false/*try*/);
|
||||||
if (need_resubmit_log()) {
|
TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K(ls_id));
|
||||||
submit_log_for_freeze();
|
}
|
||||||
TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K(ls_id), K(cost_time));
|
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
||||||
}
|
|
||||||
|
if (cost_time > 5 * 1000 * 1000) {
|
||||||
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "[Freezer] ready_for_flush costs too much time",
|
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "[Freezer] ready_for_flush costs too much time",
|
||||||
K(ls_id), K(cost_time), KPC(memtable));
|
K(ls_id), K(cost_time), KPC(memtable));
|
||||||
stat_.add_diagnose_info("ready_for_flush costs too much time");
|
stat_.add_diagnose_info("ready_for_flush costs too much time");
|
||||||
memtable->print_ready_for_flush();
|
memtable->print_ready_for_flush();
|
||||||
}
|
}
|
||||||
|
@ -302,7 +302,8 @@ private:
|
|||||||
|
|
||||||
/* inner subfunctions for freeze process */
|
/* inner subfunctions for freeze process */
|
||||||
int inner_logstream_freeze(ObFuture<int> *result);
|
int inner_logstream_freeze(ObFuture<int> *result);
|
||||||
int submit_log_for_freeze();
|
int submit_log_for_freeze(bool is_try);
|
||||||
|
void try_submit_log_for_freeze_();
|
||||||
int ls_freeze_task();
|
int ls_freeze_task();
|
||||||
int tablet_freeze_task(ObTableHandleV2 handle);
|
int tablet_freeze_task(ObTableHandleV2 handle);
|
||||||
int submit_freeze_task(const bool is_ls_freeze, ObFuture<int> *result, ObTableHandleV2 &handle);
|
int submit_freeze_task(const bool is_ls_freeze, ObFuture<int> *result, ObTableHandleV2 &handle);
|
||||||
|
Reference in New Issue
Block a user