[IMPROVEMENT] move submit log out of ls lock
This commit is contained in:
parent
06205d99e6
commit
85f5291643
@ -1321,7 +1321,8 @@ int ObMultipleMerge::prepare_tables_from_iterator(ObTableStoreIterator &table_it
|
||||
} else if (table_ptr->is_memtable()) {
|
||||
++memtable_cnt;
|
||||
read_released_memtable = read_released_memtable ||
|
||||
TabletMemtableFreezeState::RELEASED == (static_cast<ObITabletMemtable *>(table_ptr))->get_freeze_state();
|
||||
TabletMemtableFreezeState::FORCE_RELEASED == (static_cast<ObITabletMemtable *>(table_ptr))->get_freeze_state() ||
|
||||
TabletMemtableFreezeState::RELEASED == (static_cast<ObITabletMemtable *>(table_ptr))->get_freeze_state();
|
||||
}
|
||||
if (OB_FAIL(tables_.push_back(table_ptr))) {
|
||||
LOG_WARN("add table fail", K(ret), K(*table_ptr));
|
||||
|
@ -597,14 +597,9 @@ int ObFreezer::ls_freeze_task_()
|
||||
// retry to submit the log to go around the above case
|
||||
(ObTimeUtility::current_time() - last_submit_log_time >= 1_min)) {
|
||||
last_submit_log_time = ObTimeUtility::current_time();
|
||||
int64_t read_lock = LSLOCKALL;
|
||||
int64_t write_lock = 0;
|
||||
ObLSLockGuard lock_ls(ls_, ls_->lock_, read_lock, write_lock);
|
||||
if (OB_FAIL(check_ls_state())) {
|
||||
} else {
|
||||
submit_log_for_freeze(false/*tablet freeze*/, false/*try*/);
|
||||
TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K(ls_id));
|
||||
}
|
||||
|
||||
(void)submit_log_for_freeze(false/*tablet freeze*/, false/*try*/);
|
||||
TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K(ls_id));
|
||||
}
|
||||
|
||||
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
||||
@ -1093,35 +1088,37 @@ int ObFreezer::wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tab
|
||||
int64_t last_submit_log_time = start;
|
||||
int ret = OB_SUCCESS;
|
||||
bool ready_for_flush = false;
|
||||
bool is_force_released = false;
|
||||
|
||||
do {
|
||||
if (OB_FAIL(try_wait_memtable_ready_for_flush_with_ls_lock(tablet_memtable,
|
||||
ready_for_flush,
|
||||
is_force_released,
|
||||
start,
|
||||
last_submit_log_time))) {
|
||||
TRANS_LOG(WARN, "[Freezer] memtable is not ready_for_flush", K(ret));
|
||||
}
|
||||
} while (OB_SUCC(ret) && !ready_for_flush);
|
||||
} while (OB_SUCC(ret) && !ready_for_flush && !is_force_released);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObFreezer::try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable,
|
||||
bool &ready_for_flush,
|
||||
bool &is_force_released,
|
||||
const int64_t start,
|
||||
int64_t &last_submit_log_time)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t read_lock = LSLOCKALL;
|
||||
int64_t write_lock = 0;
|
||||
ObLSLockGuard lock_ls(ls_, ls_->lock_, read_lock, write_lock);
|
||||
|
||||
if (OB_FAIL(check_ls_state())) {
|
||||
} else if (OB_ISNULL(tablet_memtable)) {
|
||||
if (OB_ISNULL(tablet_memtable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "[Freezer] memtable cannot be null", K(ret));
|
||||
} else if (FALSE_IT(ready_for_flush = tablet_memtable->ready_for_flush())) {
|
||||
} else if (!ready_for_flush) {
|
||||
} else if (FALSE_IT(is_force_released = tablet_memtable->is_force_released())) {
|
||||
} else if (!ready_for_flush && !is_force_released) {
|
||||
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
|
||||
if (need_resubmit_log() ||
|
||||
// In order to prevent the txn has already passed the try_submit test
|
||||
@ -1129,7 +1126,7 @@ int ObFreezer::try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable
|
||||
// retry to submit the log to go around the above case
|
||||
(ObTimeUtility::current_time() - last_submit_log_time >= 1_min)) {
|
||||
last_submit_log_time = ObTimeUtility::current_time();
|
||||
submit_log_for_freeze(true/*tablet freeze*/, false/*try*/);
|
||||
(void)submit_log_for_freeze(true/*tablet freeze*/, false/*try*/);
|
||||
TRANS_LOG(INFO, "[Freezer] resubmit log", K(ret));
|
||||
}
|
||||
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
||||
@ -1343,6 +1340,7 @@ int ObFreezer::batch_tablet_freeze_task(ObTableHandleArray tables_array)
|
||||
ObTableHandleV2 &handle = tables_array.at(i);
|
||||
ObITabletMemtable *tablet_memtable = nullptr;
|
||||
bool ready_for_flush = false;
|
||||
bool is_force_released = false;
|
||||
if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "memtable cannot be null", K(ret), K(ls_id));
|
||||
@ -1350,10 +1348,11 @@ int ObFreezer::batch_tablet_freeze_task(ObTableHandleArray tables_array)
|
||||
LOG_WARN("fail to get memtable", K(ret));
|
||||
} else if (OB_FAIL(try_wait_memtable_ready_for_flush_with_ls_lock(tablet_memtable,
|
||||
ready_for_flush,
|
||||
is_force_released,
|
||||
start,
|
||||
last_submit_log_time))) {
|
||||
TRANS_LOG(WARN, "[Freezer] fail to wait memtable ready_for_flush", K(ret), K(ls_id));
|
||||
} else if (!ready_for_flush) {
|
||||
} else if (!ready_for_flush && !is_force_released) {
|
||||
} else if (OB_FAIL(finish_freeze_with_ls_lock(tablet_memtable))) {
|
||||
TRANS_LOG(WARN, "[Freezer] fail to finish_freeze", K(ret), K(ls_id), KPC(tablet_memtable));
|
||||
} else if (OB_FAIL(tables_array.pop_back(handle))) {
|
||||
@ -1392,32 +1391,6 @@ int ObFreezer::finish_freeze_with_ls_lock(ObITabletMemtable *tablet_memtable)
|
||||
return ret;
|
||||
}
|
||||
|
||||
// NOTICE: not called by user now
|
||||
/* private subfunctions for freeze process */
|
||||
int ObFreezer::handle_memtable_for_tablet_freeze(ObIMemtable *imemtable)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
share::ObLSID ls_id = get_ls_id();
|
||||
|
||||
if (OB_ISNULL(imemtable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else {
|
||||
ObITabletMemtable *tablet_memtable = static_cast<ObITabletMemtable*>(imemtable);
|
||||
try_submit_log_for_freeze_(true/*tablet freeze*/);
|
||||
wait_memtable_ready_for_flush(tablet_memtable);
|
||||
if (OB_FAIL(tablet_memtable->finish_freeze())) {
|
||||
TRANS_LOG(ERROR, "[Freezer] memtable cannot be flushed",
|
||||
K(ret), K(ls_id), KPC(tablet_memtable));
|
||||
stat_.add_diagnose_info("memtable cannot be flushed");
|
||||
} else {
|
||||
TRANS_LOG(INFO, "[Freezer] memtable is ready to be flushed",
|
||||
K(ret), K(ls_id), KPC(tablet_memtable));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
namespace {
|
||||
struct FreezeDiagnoseInfo {
|
||||
const char *fmt_;
|
||||
@ -1447,6 +1420,18 @@ int ObFreezer::submit_log_for_freeze(const bool is_tablet_freeze, const bool is_
|
||||
do {
|
||||
ret = OB_SUCCESS;
|
||||
transaction::ObTransID fail_tx_id;
|
||||
|
||||
{
|
||||
int64_t read_lock = LSLOCKALL;
|
||||
int64_t write_lock = 0;
|
||||
ObLSLockGuard lock_ls(ls_, ls_->lock_, read_lock, write_lock);
|
||||
if (OB_FAIL(check_ls_state())) {
|
||||
// ls has died, we need break the loop for submitting log
|
||||
TRANS_LOG(WARN, "ls state has die", K(ls_id));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// because tablet freeze will not inc freeze clock, fake the freeze clock
|
||||
const uint32_t freeze_clock = is_tablet_freeze ? get_freeze_clock() + 1 : get_freeze_clock();
|
||||
if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_redo_log(fail_tx_id, freeze_clock))) {
|
||||
@ -1580,38 +1565,6 @@ int ObFreezer::wait_freeze_finished(ObFuture<int> &result)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObFreezer::wait_memtable_ready_for_flush(ObITabletMemtable *tablet_memtable)
|
||||
{
|
||||
share::ObLSID ls_id = get_ls_id();
|
||||
const int64_t start = ObTimeUtility::current_time();
|
||||
int64_t last_submit_log_time = start;
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
while (!tablet_memtable->ready_for_flush()) {
|
||||
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
|
||||
if (need_resubmit_log() ||
|
||||
// In order to prevent the txn has already passed the try_submit test
|
||||
// while failing to submit some logs due to an unexpected bug, we need
|
||||
// retry to submit the log to go around the above case
|
||||
(ObTimeUtility::current_time() - last_submit_log_time >= 1_min)) {
|
||||
last_submit_log_time = ObTimeUtility::current_time();
|
||||
submit_log_for_freeze(true/*tablet freeze*/, false/*try*/);
|
||||
TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K(ls_id));
|
||||
}
|
||||
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
||||
|
||||
if (cost_time > 5 * 1000 * 1000) {
|
||||
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "[Freezer] ready_for_flush costs too much time",
|
||||
K(ls_id), K(cost_time), KPC(tablet_memtable));
|
||||
stat_.add_diagnose_info("ready_for_flush costs too much time");
|
||||
tablet_memtable->print_ready_for_flush();
|
||||
}
|
||||
}
|
||||
ob_usleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* private subfunctions about freeze flag*/
|
||||
int ObFreezer::loop_set_freeze_flag()
|
||||
{
|
||||
|
@ -289,9 +289,7 @@ private:
|
||||
const share::SCN freeze_snapshot_version,
|
||||
int &ret);
|
||||
void submit_freeze_task_(const bool is_ls_freeze, ObFuture<int> *result, ObTableHandleV2 &handle);
|
||||
void wait_memtable_ready_for_flush(ObITabletMemtable *tablet_memtable);
|
||||
int wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable);
|
||||
int handle_memtable_for_tablet_freeze(ObIMemtable *imemtable);
|
||||
int try_set_tablet_freeze_begin_();
|
||||
void set_tablet_freeze_begin_();
|
||||
void set_tablet_freeze_end_();
|
||||
@ -309,6 +307,7 @@ private:
|
||||
int finish_freeze_with_ls_lock(ObITabletMemtable *tablet_memtable);
|
||||
int try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable,
|
||||
bool &ready_for_flush,
|
||||
bool &is_force_released,
|
||||
const int64_t start,
|
||||
int64_t &last_submit_log_time);
|
||||
void submit_checkpoint_task();
|
||||
|
@ -371,6 +371,13 @@ bool ObITabletMemtable::can_be_minor_merged()
|
||||
return is_in_prepare_list_of_data_checkpoint();
|
||||
}
|
||||
|
||||
|
||||
bool ObITabletMemtable::is_force_released() const
|
||||
{
|
||||
return TabletMemtableFreezeState::FORCE_RELEASED == get_freeze_state();
|
||||
}
|
||||
|
||||
|
||||
} // namespace storage
|
||||
|
||||
} // namespace oceanbase
|
||||
|
@ -120,6 +120,7 @@ enum TabletMemtableFreezeState : int64_t {
|
||||
READY_FOR_FLUSH = 3,
|
||||
FLUSHED = 4,
|
||||
RELEASED = 5,
|
||||
FORCE_RELEASED = 6,
|
||||
MAX_FREEZE_STATE
|
||||
};
|
||||
|
||||
@ -134,7 +135,9 @@ const static char *TABLET_MEMTABLE_FREEZE_STATE_TO_STR(const int64_t state)
|
||||
STATIC_ASSERT(TabletMemtableFreezeState::READY_FOR_FLUSH == 3, "Invalid State Enum");
|
||||
STATIC_ASSERT(TabletMemtableFreezeState::FLUSHED == 4, "Invalid State Enum");
|
||||
STATIC_ASSERT(TabletMemtableFreezeState::RELEASED == 5, "Invalid State Enum");
|
||||
const static char TABLET_MEMTABLE_FREEZE_STATE_TO_STR[6][20] = {"INVALID", "ACTIVE", "FREEZING", "READY_FOR_FLUSH", "FLUSHED", "RELEASED"};
|
||||
STATIC_ASSERT(TabletMemtableFreezeState::FORCE_RELEASED == 6, "Invalid State Enum");
|
||||
const static char TABLET_MEMTABLE_FREEZE_STATE_TO_STR[7][20] =
|
||||
{"INVALID", "ACTIVE", "FREEZING", "READY_FOR_FLUSH", "FLUSHED", "RELEASED", "FORCE_RELEASED"};
|
||||
return TABLET_MEMTABLE_FREEZE_STATE_TO_STR[state];
|
||||
}
|
||||
|
||||
@ -290,6 +293,7 @@ public:
|
||||
virtual void print_ready_for_flush() = 0;
|
||||
virtual void set_allow_freeze(const bool allow_freeze) = 0;
|
||||
virtual int set_frozen() = 0;
|
||||
virtual bool is_force_released() const;
|
||||
virtual int get_schema_info(
|
||||
const int64_t input_column_cnt,
|
||||
int64_t &max_schema_version_on_memtable,
|
||||
@ -326,7 +330,7 @@ public:
|
||||
void set_push_table_into_gc_queue_time(const int64_t timestamp) { mt_stat_.push_table_into_gc_queue_time_ = timestamp; }
|
||||
void set_freeze_state(const TabletMemtableFreezeState state)
|
||||
{
|
||||
if (state >= TabletMemtableFreezeState::ACTIVE && state <= TabletMemtableFreezeState::RELEASED) {
|
||||
if (state >= TabletMemtableFreezeState::ACTIVE && state <= TabletMemtableFreezeState::FORCE_RELEASED) {
|
||||
freeze_state_ = state;
|
||||
}
|
||||
}
|
||||
|
@ -741,7 +741,6 @@ DEF_REPORT_CHEKCPOINT_DIAGNOSE_INFO(UpdateReleaseTime, update_release_time)
|
||||
int ObTabletMemtableMgr::release_head_memtable_(ObIMemtable *imemtable,
|
||||
const bool force)
|
||||
{
|
||||
UNUSED(force);
|
||||
int ret = OB_SUCCESS;
|
||||
ObITabletMemtable *memtable = static_cast<ObITabletMemtable *>(imemtable);
|
||||
|
||||
@ -768,7 +767,11 @@ int ObTabletMemtableMgr::release_head_memtable_(ObIMemtable *imemtable,
|
||||
}
|
||||
memtable->remove_from_data_checkpoint();
|
||||
memtable->set_is_flushed();
|
||||
memtable->set_freeze_state(TabletMemtableFreezeState::RELEASED);
|
||||
if (force) {
|
||||
memtable->set_freeze_state(TabletMemtableFreezeState::FORCE_RELEASED);
|
||||
} else {
|
||||
memtable->set_freeze_state(TabletMemtableFreezeState::RELEASED);
|
||||
}
|
||||
memtable->set_frozen();
|
||||
memtable->report_memtable_diagnose_info(UpdateReleaseTime());
|
||||
release_head_memtable();
|
||||
|
Loading…
x
Reference in New Issue
Block a user