checkpoint link list lock optimization
This commit is contained in:
@ -110,7 +110,7 @@ public:
|
||||
{
|
||||
if (rec_scn < rec_scn_) {
|
||||
rec_scn_ = rec_scn;
|
||||
check_can_move_to_active();
|
||||
data_checkpoint_->transfer_from_new_create_to_active_without_src_lock_(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -97,7 +97,7 @@ public:
|
||||
{
|
||||
if (rec_scn < rec_scn_) {
|
||||
rec_scn_ = rec_scn;
|
||||
check_can_move_to_active();
|
||||
data_checkpoint_->transfer_from_new_create_to_active_without_src_lock_(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -206,8 +206,7 @@ int ObDataCheckpoint::safe_to_destroy(bool &is_safe_destroy)
|
||||
}
|
||||
}
|
||||
|
||||
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] safe_to_destroy");
|
||||
WLOCK(LS_FROZEN | NEW_CREATE | ACTIVE | PREPARE);
|
||||
new_create_list_.reset();
|
||||
ls_frozen_list_.reset();
|
||||
active_list_.reset();
|
||||
@ -222,8 +221,7 @@ int ObDataCheckpoint::safe_to_destroy(bool &is_safe_destroy)
|
||||
|
||||
SCN ObDataCheckpoint::get_rec_scn()
|
||||
{
|
||||
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] get_rec_scn");
|
||||
RLOCK(LS_FROZEN | NEW_CREATE | ACTIVE | PREPARE);
|
||||
int ret = OB_SUCCESS;
|
||||
SCN min_rec_scn = SCN::max_scn();
|
||||
SCN tmp = SCN::max_scn();
|
||||
@ -240,7 +238,6 @@ SCN ObDataCheckpoint::get_rec_scn()
|
||||
if ((tmp = prepare_list_.get_min_rec_scn_in_list()) < min_rec_scn) {
|
||||
min_rec_scn = tmp;
|
||||
}
|
||||
|
||||
return min_rec_scn;
|
||||
}
|
||||
|
||||
@ -273,14 +270,12 @@ int ObDataCheckpoint::ls_freeze(SCN rec_scn)
|
||||
|
||||
void ObDataCheckpoint::set_ls_freeze_finished_(bool is_finished)
|
||||
{
|
||||
ObSpinLockGuard guard(lock_);
|
||||
ls_freeze_finished_ = is_finished;
|
||||
ATOMIC_STORE(&ls_freeze_finished_, is_finished);
|
||||
}
|
||||
|
||||
bool ObDataCheckpoint::ls_freeze_finished()
|
||||
{
|
||||
ObSpinLockGuard guard(lock_);
|
||||
return ls_freeze_finished_;
|
||||
return ATOMIC_LOAD(&ls_freeze_finished_);
|
||||
}
|
||||
|
||||
ObTabletID ObDataCheckpoint::get_tablet_id() const
|
||||
@ -295,8 +290,7 @@ bool ObDataCheckpoint::is_flushing() const
|
||||
|
||||
bool ObDataCheckpoint::is_empty()
|
||||
{
|
||||
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
|
||||
ObSpinLockGuard guard(lock_);
|
||||
RLOCK(LS_FROZEN | NEW_CREATE | ACTIVE | PREPARE);
|
||||
|
||||
return new_create_list_.is_empty() &&
|
||||
active_list_.is_empty() &&
|
||||
@ -335,7 +329,7 @@ void ObDataCheckpoint::road_to_flush(SCN rec_scn)
|
||||
int64_t last_time = common::ObTimeUtility::fast_current_time();
|
||||
|
||||
// new_create_list -> ls_frozen_list
|
||||
pop_range_to_ls_frozen_(new_create_list_.get_header(), new_create_list_);
|
||||
pop_new_create_to_ls_frozen_();
|
||||
last_time = common::ObTimeUtility::fast_current_time();
|
||||
STORAGE_LOG(INFO, "[Freezer] new_create_list to ls_frozen_list success",
|
||||
K(ls_->get_ls_id()));
|
||||
@ -346,10 +340,10 @@ void ObDataCheckpoint::road_to_flush(SCN rec_scn)
|
||||
// active_list -> ls_frozen_list
|
||||
ObFreezeCheckpoint *last = nullptr;
|
||||
{
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] road_to_flush");
|
||||
RLOCK(ACTIVE);
|
||||
last = active_list_.get_first_greater(rec_scn);
|
||||
}
|
||||
pop_range_to_ls_frozen_(last, active_list_);
|
||||
pop_active_list_to_ls_frozen_(last);
|
||||
last_time = common::ObTimeUtility::fast_current_time();
|
||||
STORAGE_LOG(INFO, "[Freezer] active_list to ls_frozen_list success",
|
||||
K(ls_->get_ls_id()));
|
||||
@ -360,16 +354,33 @@ void ObDataCheckpoint::road_to_flush(SCN rec_scn)
|
||||
set_ls_freeze_finished_(true);
|
||||
}
|
||||
|
||||
void ObDataCheckpoint::pop_range_to_ls_frozen_(ObFreezeCheckpoint *last, ObCheckpointDList &list)
|
||||
void ObDataCheckpoint::pop_new_create_to_ls_frozen_()
|
||||
{
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] pop_range_to_ls_frozen");
|
||||
ObFreezeCheckpoint *cur = list.get_header()->get_next();
|
||||
WLOCK(LS_FROZEN | NEW_CREATE);
|
||||
ObFreezeCheckpoint *cur = new_create_list_.get_header()->get_next();
|
||||
ObFreezeCheckpoint *next = nullptr;
|
||||
while (cur != new_create_list_.get_header()) {
|
||||
int ret = OB_SUCCESS;
|
||||
next = cur->get_next();
|
||||
{
|
||||
if (OB_FAIL(transfer_(cur, new_create_list_, ls_frozen_list_, LS_FROZEN))) {
|
||||
STORAGE_LOG(ERROR, "Transfer To Ls_Frozen Failed", K(ret));
|
||||
}
|
||||
}
|
||||
cur = next;
|
||||
}
|
||||
}
|
||||
|
||||
void ObDataCheckpoint::pop_active_list_to_ls_frozen_(ObFreezeCheckpoint *last)
|
||||
{
|
||||
WLOCK(LS_FROZEN | ACTIVE);
|
||||
ObFreezeCheckpoint *cur = active_list_.get_header()->get_next();
|
||||
ObFreezeCheckpoint *next = nullptr;
|
||||
while (cur != last) {
|
||||
int ret = OB_SUCCESS;
|
||||
next = cur->get_next();
|
||||
{
|
||||
if (OB_FAIL(transfer_(cur, list, ls_frozen_list_, LS_FROZEN))) {
|
||||
if (OB_FAIL(transfer_(cur, active_list_, ls_frozen_list_, LS_FROZEN))) {
|
||||
STORAGE_LOG(ERROR, "Transfer To Ls_Frozen Failed", K(ret));
|
||||
}
|
||||
}
|
||||
@ -395,23 +406,22 @@ void ObDataCheckpoint::ls_frozen_to_active_(int64_t &last_time)
|
||||
STORAGE_LOG(WARN, "log handler not enable replay, should not freeze", K(ret), K_(ls_->ls_meta));
|
||||
} else {
|
||||
// traversal list once
|
||||
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
|
||||
WLOCK(LS_FROZEN);
|
||||
ObCheckpointIterator iterator;
|
||||
ls_frozen_list_.get_iterator(iterator);
|
||||
while (iterator.has_next()) {
|
||||
int ret = OB_SUCCESS;
|
||||
auto ob_freeze_checkpoint = iterator.get_next();
|
||||
if (ob_freeze_checkpoint->is_active_checkpoint()) {
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] ls_frozen_to_new_created_");
|
||||
// avoid new active ob_freeze_checkpoint block minor merge
|
||||
// push back to new_create_list and wait next freeze
|
||||
if(OB_FAIL(transfer_from_ls_frozen_to_new_created_(ob_freeze_checkpoint))) {
|
||||
if(OB_FAIL(transfer_from_ls_frozen_to_new_created_without_src_lock_(ob_freeze_checkpoint))) {
|
||||
STORAGE_LOG(WARN, "ob_freeze_checkpoint move to new_created_list failed",
|
||||
K(ret), K(*ob_freeze_checkpoint));
|
||||
}
|
||||
} else {
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] ls_frozen_to_active_");
|
||||
if (OB_FAIL(ob_freeze_checkpoint->check_can_move_to_active(true))) {
|
||||
if (ob_freeze_checkpoint->rec_scn_is_stable()
|
||||
&& OB_FAIL(transfer_from_ls_frozen_to_active_without_src_lock_(ob_freeze_checkpoint))) {
|
||||
STORAGE_LOG(WARN, "check can freeze failed", K(ret), K(*ob_freeze_checkpoint));
|
||||
}
|
||||
}
|
||||
@ -428,7 +438,7 @@ void ObDataCheckpoint::ls_frozen_to_active_(int64_t &last_time)
|
||||
ob_usleep(LOOP_TRAVERSAL_INTERVAL_US);
|
||||
if (task_reach_time_interval(3 * 1000 * 1000, last_time)) {
|
||||
STORAGE_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "cost too much time in ls_frozen_list_", K(ret), K(ls_->get_ls_id()));
|
||||
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
|
||||
RLOCK(LS_FROZEN);
|
||||
print_list_(ls_frozen_list_);
|
||||
}
|
||||
} else {
|
||||
@ -457,21 +467,20 @@ void ObDataCheckpoint::ls_frozen_to_prepare_(int64_t &last_time)
|
||||
STORAGE_LOG(WARN, "log handler not enable replay, should not freeze", K(ret), K_(ls_->ls_meta));
|
||||
} else {
|
||||
// traversal list once
|
||||
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
|
||||
WLOCK(LS_FROZEN);
|
||||
ObCheckpointIterator iterator;
|
||||
ls_frozen_list_.get_iterator(iterator);
|
||||
while (iterator.has_next()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
auto ob_freeze_checkpoint = iterator.get_next();
|
||||
if (ob_freeze_checkpoint->ready_for_flush()) {
|
||||
if (OB_FAIL(ob_freeze_checkpoint->finish_freeze())) {
|
||||
if (OB_FAIL(finish_freeze(ob_freeze_checkpoint))) {
|
||||
STORAGE_LOG(WARN, "finish freeze failed", K(ret));
|
||||
}
|
||||
} else if (ob_freeze_checkpoint->is_active_checkpoint()) {
|
||||
// avoid active ob_freeze_checkpoint block minor merge
|
||||
// push back to active_list and wait next freeze
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] transfer_from_ls_frozen_to_active_");
|
||||
if(OB_SUCCESS != (tmp_ret = (transfer_from_ls_frozen_to_active_(ob_freeze_checkpoint)))) {
|
||||
if(OB_SUCCESS != (tmp_ret = (transfer_from_ls_frozen_to_active_without_src_lock_(ob_freeze_checkpoint)))) {
|
||||
STORAGE_LOG(WARN, "active ob_freeze_checkpoint move to active_list failed",
|
||||
K(tmp_ret), K(*ob_freeze_checkpoint));
|
||||
}
|
||||
@ -489,7 +498,7 @@ void ObDataCheckpoint::ls_frozen_to_prepare_(int64_t &last_time)
|
||||
ob_usleep(LOOP_TRAVERSAL_INTERVAL_US);
|
||||
if (task_reach_time_interval(3 * 1000 * 1000, last_time)) {
|
||||
STORAGE_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "cost too much time in ls_frozen_list_", K(ls_->get_ls_id()));
|
||||
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
|
||||
RLOCK(LS_FROZEN);
|
||||
print_list_(ls_frozen_list_);
|
||||
}
|
||||
} else {
|
||||
@ -506,7 +515,7 @@ void ObDataCheckpoint::ls_frozen_to_prepare_(int64_t &last_time)
|
||||
int ObDataCheckpoint::check_can_move_to_active_in_newcreate()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] check_can_move_to_active_in_newcreate");
|
||||
WLOCK(NEW_CREATE);
|
||||
if (!ls_freeze_finished_) {
|
||||
STORAGE_LOG(INFO, "skip check_can_move when ls freeze");
|
||||
} else {
|
||||
@ -514,7 +523,8 @@ int ObDataCheckpoint::check_can_move_to_active_in_newcreate()
|
||||
new_create_list_.get_iterator(iterator);
|
||||
while (iterator.has_next()) {
|
||||
auto ob_freeze_checkpoint = iterator.get_next();
|
||||
if (OB_FAIL(ob_freeze_checkpoint->check_can_move_to_active())) {
|
||||
if (ob_freeze_checkpoint->rec_scn_is_stable()
|
||||
&& OB_FAIL(transfer_from_new_create_to_active_without_src_lock_(ob_freeze_checkpoint))) {
|
||||
STORAGE_LOG(WARN, "check can freeze failed", K(ret));
|
||||
break;
|
||||
}
|
||||
@ -526,7 +536,7 @@ int ObDataCheckpoint::check_can_move_to_active_in_newcreate()
|
||||
|
||||
int ObDataCheckpoint::add_to_new_create(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] add_to_new_create");
|
||||
WLOCK(NEW_CREATE);
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(insert_(ob_freeze_checkpoint, new_create_list_, false))) {
|
||||
@ -564,30 +574,12 @@ int ObDataCheckpoint::decide_freeze_clock_(ObFreezeCheckpoint *ob_freeze_checkpo
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::unlink_from_prepare(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] unlink_from_prepare");
|
||||
int ret = OB_SUCCESS;
|
||||
// for the node not in prepare_list
|
||||
// return OB_SUCCESS
|
||||
// the node will remove from list in the reset()
|
||||
if (ob_freeze_checkpoint->location_ == PREPARE) {
|
||||
if (OB_FAIL(unlink_(ob_freeze_checkpoint, prepare_list_))) {
|
||||
STORAGE_LOG(ERROR, "Unlink From Prepare Failed");
|
||||
} else {
|
||||
ob_freeze_checkpoint->location_ = OUT;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::get_freezecheckpoint_info(
|
||||
ObIArray<checkpoint::ObFreezeCheckpointVTInfo> &freeze_checkpoint_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
freeze_checkpoint_array.reset();
|
||||
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] get_freezecheckpoint_info");
|
||||
RLOCK(LS_FROZEN | NEW_CREATE | ACTIVE | PREPARE);
|
||||
if (OB_FAIL(new_create_list_.get_freezecheckpoint_info(
|
||||
freeze_checkpoint_array))) {
|
||||
STORAGE_LOG(ERROR, "iterator new_create_list fail",
|
||||
@ -620,7 +612,7 @@ int ObDataCheckpoint::traversal_flush_()
|
||||
ObSEArray<ObTableHandleV2, 16> flush_tasks;
|
||||
|
||||
{
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] traversal_flush_");
|
||||
RLOCK(PREPARE);
|
||||
if (prepare_list_.is_empty()) {
|
||||
STORAGE_LOG(TRACE, "skip traversal_flush", K(ls_freeze_finished_),
|
||||
K(prepare_list_.is_empty()), K(ls_->get_ls_id()));
|
||||
@ -663,9 +655,80 @@ int ObDataCheckpoint::traversal_flush_()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::unlink_(ObFreezeCheckpoint *ob_freeze_checkpoint, ObCheckpointDList &src)
|
||||
int ObDataCheckpoint::unlink_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
return src.unlink(ob_freeze_checkpoint);
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(ob_freeze_checkpoint)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "ob_freeze_checkpoint is NULL");
|
||||
} else {
|
||||
do {
|
||||
ObCheckpointDList *list = NULL;
|
||||
const ObFreezeCheckpointLocation location = ob_freeze_checkpoint->location_;
|
||||
if (OUT == location) {
|
||||
// has unlinked
|
||||
} else {
|
||||
WLOCK(location);
|
||||
// double check
|
||||
if (ob_freeze_checkpoint->location_ == location) {
|
||||
if (FALSE_IT(list = get_checkpoint_list(location))) {
|
||||
} else if (OB_ISNULL(list)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "list is NULL", K(ret), KPC(ob_freeze_checkpoint));
|
||||
} else if (OB_FAIL(list->unlink(ob_freeze_checkpoint))) {
|
||||
STORAGE_LOG(ERROR, "failed to unlink from list", K(ret), KPC(ob_freeze_checkpoint));
|
||||
}
|
||||
} else {
|
||||
ret = OB_EAGAIN;
|
||||
STORAGE_LOG(WARN, "ob_freeze_checkpoint is not in list", KR(ret), K(location), KPC(ob_freeze_checkpoint));
|
||||
}
|
||||
|
||||
}
|
||||
} while (OB_EAGAIN == ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::finish_freeze(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(ob_freeze_checkpoint)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "ob_freeze_checkpoint is NULL");
|
||||
} else {
|
||||
do {
|
||||
switch (ob_freeze_checkpoint->location_) {
|
||||
case NEW_CREATE:
|
||||
if (OB_FAIL(transfer_from_new_create_to_prepare_(ob_freeze_checkpoint))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
STORAGE_LOG(ERROR, "failed to transfer_from_new_create_to_prepare", K(ret), KPC(ob_freeze_checkpoint));
|
||||
}
|
||||
}
|
||||
break;
|
||||
case ACTIVE:
|
||||
if (OB_FAIL(transfer_from_active_to_prepare_(ob_freeze_checkpoint))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
STORAGE_LOG(ERROR, "failed to transfer_from_active_to_prepare", K(ret), KPC(ob_freeze_checkpoint));
|
||||
}
|
||||
}
|
||||
break;
|
||||
case PREPARE:
|
||||
break;
|
||||
case LS_FROZEN:
|
||||
if (OB_FAIL(transfer_from_ls_frozen_to_prepare_without_src_lock_(ob_freeze_checkpoint))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
STORAGE_LOG(ERROR, "failed to transfer_from_ls_frozen_to_prepare", K(ret));
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unknown ObFreezeCheckpoint State", KPC(ob_freeze_checkpoint));
|
||||
break;
|
||||
}
|
||||
} while (OB_EAGAIN == ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::insert_(ObFreezeCheckpoint *ob_freeze_checkpoint,
|
||||
@ -681,9 +744,9 @@ int ObDataCheckpoint::transfer_(ObFreezeCheckpoint *ob_freeze_checkpoint,
|
||||
ObFreezeCheckpointLocation location)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(unlink_(ob_freeze_checkpoint, src))) {
|
||||
if (OB_FAIL(src.unlink(ob_freeze_checkpoint))) {
|
||||
STORAGE_LOG(ERROR, "Unlink From Dlist Failed");
|
||||
} else if (OB_FAIL(insert_(ob_freeze_checkpoint, dst))) {
|
||||
} else if (OB_FAIL(dst.insert(ob_freeze_checkpoint))) {
|
||||
STORAGE_LOG(ERROR, "Insert Into Dlist Failed");
|
||||
} else {
|
||||
ob_freeze_checkpoint->location_ = location;
|
||||
@ -691,47 +754,39 @@ int ObDataCheckpoint::transfer_(ObFreezeCheckpoint *ob_freeze_checkpoint,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::transfer_from_new_create_to_active_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
int ObDataCheckpoint::transfer_from_new_create_to_active_without_src_lock_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(ob_freeze_checkpoint)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "ob_freeze_checkpoint is NULL");
|
||||
} else if (NEW_CREATE != ob_freeze_checkpoint->location_) {
|
||||
ret = OB_EAGAIN;
|
||||
STORAGE_LOG(WARN, "ob_freeze_checkpoint is not in new_create", KR(ret), KPC(ob_freeze_checkpoint));
|
||||
} else {
|
||||
WLOCK(ACTIVE);
|
||||
if (OB_FAIL(transfer_(ob_freeze_checkpoint, new_create_list_, active_list_, ACTIVE))) {
|
||||
STORAGE_LOG(ERROR, "Transfer From NewCreate To Active Failed");
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::transfer_from_new_create_to_prepare_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLOCK(NEW_CREATE);
|
||||
if (OB_ISNULL(ob_freeze_checkpoint)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "ob_freeze_checkpoint is NULL");
|
||||
} else if (NEW_CREATE != ob_freeze_checkpoint->location_) {
|
||||
ret = OB_EAGAIN;
|
||||
STORAGE_LOG(WARN, "ob_freeze_checkpoint is not in new_create", KR(ret), KPC(ob_freeze_checkpoint));
|
||||
} else {
|
||||
WLOCK(PREPARE);
|
||||
if (OB_FAIL(transfer_(ob_freeze_checkpoint, new_create_list_, prepare_list_, PREPARE))) {
|
||||
STORAGE_LOG(ERROR, "Transfer From NewCreate To Prepare Failed");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::transfer_from_ls_frozen_to_active_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(transfer_(ob_freeze_checkpoint, ls_frozen_list_, active_list_, ACTIVE))) {
|
||||
STORAGE_LOG(ERROR, "Transfer From Active To Frozen Failed");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::transfer_from_ls_frozen_to_new_created_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(transfer_(ob_freeze_checkpoint, ls_frozen_list_, new_create_list_, NEW_CREATE))) {
|
||||
STORAGE_LOG(ERROR, "Transfer From LS Frozen To New_Created Failed");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::transfer_from_ls_frozen_to_prepare_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(transfer_(ob_freeze_checkpoint, ls_frozen_list_, prepare_list_, PREPARE))) {
|
||||
STORAGE_LOG(ERROR, "Transfer From LS Frozen To Prepare Failed");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -739,9 +794,49 @@ int ObDataCheckpoint::transfer_from_ls_frozen_to_prepare_(ObFreezeCheckpoint *ob
|
||||
int ObDataCheckpoint::transfer_from_active_to_prepare_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLOCK(ACTIVE);
|
||||
if (OB_ISNULL(ob_freeze_checkpoint)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "ob_freeze_checkpoint is NULL");
|
||||
} else if (ACTIVE != ob_freeze_checkpoint->location_) {
|
||||
ret = OB_EAGAIN;
|
||||
STORAGE_LOG(WARN, "ob_freeze_checkpoint is not in active", KR(ret), KPC(ob_freeze_checkpoint));
|
||||
} else {
|
||||
WLOCK(PREPARE);
|
||||
if (OB_FAIL(transfer_(ob_freeze_checkpoint, active_list_, prepare_list_, PREPARE))) {
|
||||
STORAGE_LOG(ERROR, "Transfer From Active To Frozen Failed");
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::transfer_from_ls_frozen_to_active_without_src_lock_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLOCK(ACTIVE);
|
||||
if (OB_FAIL(transfer_(ob_freeze_checkpoint, ls_frozen_list_, active_list_, ACTIVE))) {
|
||||
STORAGE_LOG(ERROR, "Transfer From Active To Frozen Failed");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::transfer_from_ls_frozen_to_new_created_without_src_lock_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLOCK(NEW_CREATE);
|
||||
if (OB_FAIL(transfer_(ob_freeze_checkpoint, ls_frozen_list_, new_create_list_, NEW_CREATE))) {
|
||||
STORAGE_LOG(ERROR, "Transfer From LS Frozen To New_Created Failed");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDataCheckpoint::transfer_from_ls_frozen_to_prepare_without_src_lock_(ObFreezeCheckpoint *ob_freeze_checkpoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLOCK(PREPARE);
|
||||
if (OB_FAIL(transfer_(ob_freeze_checkpoint, ls_frozen_list_, prepare_list_, PREPARE))) {
|
||||
STORAGE_LOG(ERROR, "Transfer From LS Frozen To Prepare Failed");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -749,7 +844,7 @@ int ObDataCheckpoint::get_need_flush_tablets_(const share::SCN recycle_scn,
|
||||
ObIArray<ObTabletID> &flush_tablets)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockTimeGuard guard(lock_, "[data_checkpoint] get_need_flush_tablets_");
|
||||
RLOCK(NEW_CREATE | ACTIVE | PREPARE);
|
||||
ObSArray<ObFreezeCheckpoint*> need_freeze_checkpoints;
|
||||
if (OB_FAIL(new_create_list_.get_need_freeze_checkpoints(
|
||||
recycle_scn, need_freeze_checkpoints))) {
|
||||
|
@ -67,37 +67,20 @@ private:
|
||||
ObCheckpointDList *dlist_;
|
||||
};
|
||||
|
||||
class ObSpinLockTimeGuard
|
||||
{
|
||||
public:
|
||||
ObSpinLockTimeGuard(common::ObSpinLock &lock,
|
||||
const char *owner = "unknown",
|
||||
const int64_t warn_threshold = 100 * 1000 /* 100 ms */)
|
||||
: time_guard_(owner, warn_threshold),
|
||||
lock_guard_(lock){}
|
||||
|
||||
~ObSpinLockTimeGuard() {}
|
||||
void click(const char *mod = NULL) { time_guard_.click(mod); }
|
||||
private:
|
||||
ObTimeGuard time_guard_;
|
||||
ObSpinLockGuard lock_guard_;
|
||||
};
|
||||
|
||||
// responsible for maintenance transaction checkpoint unit
|
||||
class ObDataCheckpoint : public ObCommonCheckpoint
|
||||
{
|
||||
friend class ObFreezeCheckpoint;
|
||||
friend class ObCheckpointLockGuard;
|
||||
|
||||
public:
|
||||
ObDataCheckpoint()
|
||||
: is_inited_(false),
|
||||
lock_(common::ObLatchIds::CLOG_CKPT_LOCK),
|
||||
ls_(nullptr),
|
||||
new_create_list_(),
|
||||
active_list_(),
|
||||
prepare_list_(),
|
||||
ls_frozen_list_(),
|
||||
ls_frozen_list_lock_(common::ObLatchIds::CLOG_CKPT_LOCK),
|
||||
ls_freeze_finished_(true)
|
||||
{}
|
||||
~ObDataCheckpoint() { ls_ = nullptr; }
|
||||
@ -106,6 +89,28 @@ public:
|
||||
static const uint64_t LS_DATA_CHECKPOINT_TABLET_ID = 40000;
|
||||
int init(ObLS *ls);
|
||||
int safe_to_destroy(bool &is_safe_destroy);
|
||||
ObCheckpointDList* get_checkpoint_list(const ObFreezeCheckpointLocation &location)
|
||||
{
|
||||
ObCheckpointDList *ret = NULL;
|
||||
switch (location) {
|
||||
case LS_FROZEN:
|
||||
ret = &ls_frozen_list_;
|
||||
break;
|
||||
case NEW_CREATE:
|
||||
ret = &new_create_list_;
|
||||
break;
|
||||
case ACTIVE:
|
||||
ret = &active_list_;
|
||||
break;
|
||||
case PREPARE:
|
||||
ret = &prepare_list_;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
share::SCN get_rec_scn();
|
||||
// if min_rec_scn <= the input rec_scn
|
||||
// logstream freeze
|
||||
@ -118,8 +123,6 @@ public:
|
||||
void road_to_flush(share::SCN rec_scn);
|
||||
// ObFreezeCheckpoint register into ObDataCheckpoint
|
||||
int add_to_new_create(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
// remove from prepare_list when finish minor_merge
|
||||
int unlink_from_prepare(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
// timer to tranfer freeze_checkpoint that rec_scn is stable from new_create_list to
|
||||
// active_list
|
||||
int check_can_move_to_active_in_newcreate();
|
||||
@ -145,7 +148,8 @@ private:
|
||||
// case1: some memtable flush failed when ls freeze
|
||||
// case2: the memtable that tablet freeze
|
||||
int traversal_flush_();
|
||||
int unlink_(ObFreezeCheckpoint *ob_freeze_checkpoint, ObCheckpointDList &src);
|
||||
int unlink_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int finish_freeze(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int insert_(ObFreezeCheckpoint *ob_freeze_checkpoint,
|
||||
ObCheckpointDList &dst,
|
||||
bool ordered = true);
|
||||
@ -154,14 +158,15 @@ private:
|
||||
ObCheckpointDList &dst,
|
||||
ObFreezeCheckpointLocation location);
|
||||
|
||||
int transfer_from_new_create_to_active_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int transfer_from_ls_frozen_to_active_without_src_lock_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int transfer_from_ls_frozen_to_new_created_without_src_lock_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int transfer_from_ls_frozen_to_prepare_without_src_lock_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int transfer_from_new_create_to_active_without_src_lock_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int transfer_from_new_create_to_prepare_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int transfer_from_ls_frozen_to_active_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int transfer_from_ls_frozen_to_prepare_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int transfer_from_ls_frozen_to_new_created_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
int transfer_from_active_to_prepare_(ObFreezeCheckpoint *ob_freeze_checkpoint);
|
||||
|
||||
void pop_range_to_ls_frozen_(ObFreezeCheckpoint *last, ObCheckpointDList &list);
|
||||
void pop_active_list_to_ls_frozen_(ObFreezeCheckpoint *last);
|
||||
void pop_new_create_to_ls_frozen_();
|
||||
void ls_frozen_to_active_(int64_t &last_time);
|
||||
void ls_frozen_to_prepare_(int64_t &last_time);
|
||||
void print_list_(ObCheckpointDList &list);
|
||||
@ -180,8 +185,6 @@ private:
|
||||
// logstream_freeze without get_need_flush_tablets
|
||||
static const int64_t MAX_FREEZE_CHECKPOINT_NUM = 50;
|
||||
bool is_inited_;
|
||||
// avoid leaving out ObFreezeCheckpoint that unlinking and not in any list
|
||||
common::ObSpinLock lock_;
|
||||
ObLS *ls_;
|
||||
// new_create_list is unordered_list
|
||||
// active_list and prepare_list is ordered_list and order by rec_log_ts
|
||||
@ -196,13 +199,97 @@ private:
|
||||
// tmp_list for ls_freeze to improve performance
|
||||
// used when new_create_list_ -> active_list and active_list -> frozen_list
|
||||
ObCheckpointDList ls_frozen_list_;
|
||||
// avoid blocking other list due to traversal ls_frozen_list
|
||||
common::ObSpinLock ls_frozen_list_lock_;
|
||||
|
||||
obsys::ObRWLock ls_frozen_list_lock_;
|
||||
obsys::ObRWLock new_create_list_lock_;
|
||||
obsys::ObRWLock active_list_lock_;
|
||||
obsys::ObRWLock prepare_list_lock_;
|
||||
|
||||
struct ObCheckpointLock
|
||||
{
|
||||
obsys::ObRWLock ls_frozen_list_lock_;
|
||||
obsys::ObRWLock new_create_list_lock_;
|
||||
obsys::ObRWLock active_list_lock_;
|
||||
obsys::ObRWLock prepare_list_lock_;
|
||||
} lock_;
|
||||
|
||||
bool ls_freeze_finished_;
|
||||
|
||||
static __thread bool is_tenant_freeze_for_flush_;
|
||||
};
|
||||
|
||||
// list lock for DataChcekpoint
|
||||
// flag:1~4: ObFreezeCheckpointLocation list lock
|
||||
// flag:8: read/write lock
|
||||
#define RLOCK(flag) ObCheckpointLockGuard lock_guard(*this, ~0x80 & flag, "[data_checkpoint]"); lock_guard.click(__FUNCTION__)
|
||||
#define WLOCK(flag) ObCheckpointLockGuard lock_guard(*this, 0x80 | flag, "[data_checkpoint]"); lock_guard.click(__FUNCTION__)
|
||||
|
||||
class ObCheckpointLockGuard
|
||||
{
|
||||
public:
|
||||
ObCheckpointLockGuard(ObDataCheckpoint &data_checkpoint,
|
||||
uint8_t flag,
|
||||
const char *owner = "unknown",
|
||||
const int64_t warn_threshold = 50 * 1000 /* 50 ms */)
|
||||
: time_guard_(owner, warn_threshold),
|
||||
lock_(data_checkpoint.lock_),
|
||||
flag_(flag)
|
||||
{
|
||||
if (0 != (flag & LS_FROZEN)) {
|
||||
lock(flag, lock_.ls_frozen_list_lock_);
|
||||
}
|
||||
if (0 != (flag & NEW_CREATE)) {
|
||||
lock(flag, lock_.new_create_list_lock_);
|
||||
}
|
||||
if (0 != (flag & ACTIVE)) {
|
||||
lock(flag, lock_.active_list_lock_);
|
||||
}
|
||||
if (0 != (flag & PREPARE)) {
|
||||
lock(flag, lock_.prepare_list_lock_);
|
||||
}
|
||||
}
|
||||
|
||||
~ObCheckpointLockGuard()
|
||||
{
|
||||
if (0 != (flag_ & PREPARE)) {
|
||||
unlock(flag_, lock_.prepare_list_lock_);
|
||||
}
|
||||
if (0 != (flag_ & ACTIVE)) {
|
||||
unlock(flag_, lock_.active_list_lock_);
|
||||
}
|
||||
if (0 != (flag_ & NEW_CREATE)) {
|
||||
unlock(flag_, lock_.new_create_list_lock_);
|
||||
}
|
||||
if (0 != (flag_ & LS_FROZEN)) {
|
||||
unlock(flag_, lock_.ls_frozen_list_lock_);
|
||||
}
|
||||
}
|
||||
|
||||
static void lock(uint8_t flag, obsys::ObRWLock &lock)
|
||||
{
|
||||
if (0 != (flag & 0x80)) {
|
||||
lock.wlock()->lock();
|
||||
} else {
|
||||
lock.rlock()->lock();
|
||||
}
|
||||
}
|
||||
|
||||
static void unlock(uint8_t flag, obsys::ObRWLock &lock)
|
||||
{
|
||||
if (0 != (flag & 0x80)) {
|
||||
lock.wlock()->unlock();
|
||||
} else {
|
||||
lock.rlock()->unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void click(const char *mod = NULL) { time_guard_.click(mod); }
|
||||
private:
|
||||
ObTimeGuard time_guard_;
|
||||
ObDataCheckpoint::ObCheckpointLock &lock_;
|
||||
uint8_t flag_;
|
||||
};
|
||||
|
||||
static const ObTabletID LS_DATA_CHECKPOINT_TABLET(ObDataCheckpoint::LS_DATA_CHECKPOINT_TABLET_ID);
|
||||
|
||||
} // namespace checkpoint
|
||||
|
@ -25,8 +25,6 @@ namespace checkpoint
|
||||
void ObFreezeCheckpoint::remove_from_data_checkpoint()
|
||||
{
|
||||
if (OUT != location_) {
|
||||
ObSpinLockGuard ls_frozen_list_guard(data_checkpoint_->ls_frozen_list_lock_);
|
||||
ObSpinLockGuard guard(data_checkpoint_->lock_);
|
||||
int ret = OB_SUCCESS;
|
||||
if(OB_FAIL(unlink_())) {
|
||||
STORAGE_LOG(WARN, "ObFreezeCheckpoint Unlink From DataCheckpoint Failed", K(ret));
|
||||
@ -42,37 +40,13 @@ void ObFreezeCheckpoint::reset()
|
||||
int ObFreezeCheckpoint::unlink_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (location_ != OUT) {
|
||||
switch (location_) {
|
||||
case NEW_CREATE:
|
||||
if (OB_FAIL(data_checkpoint_->unlink_(this, data_checkpoint_->new_create_list_))) {
|
||||
STORAGE_LOG(ERROR, "ObFreezeCheckpoint Unlink From New_Create_List Failed", K(ret));
|
||||
}
|
||||
break;
|
||||
case ACTIVE:
|
||||
if (OB_FAIL(data_checkpoint_->unlink_(this, data_checkpoint_->active_list_))) {
|
||||
STORAGE_LOG(ERROR, "ObFreezeCheckpoint Unlink From Active_List Failed", K(ret));
|
||||
}
|
||||
break;
|
||||
case LS_FROZEN:
|
||||
if (OB_FAIL(data_checkpoint_->unlink_(this, data_checkpoint_->ls_frozen_list_))) {
|
||||
STORAGE_LOG(ERROR, "ObFreezeCheckpoint Unlink From Ls_Frozen_List Failed", K(ret));
|
||||
}
|
||||
break;
|
||||
case PREPARE:
|
||||
if (OB_FAIL(data_checkpoint_->unlink_(this, data_checkpoint_->prepare_list_))) {
|
||||
STORAGE_LOG(ERROR, "ObFreezeCheckpoint Unlink From Prepare_List Failed", K(ret));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unknown ObFreezeCheckpoint State", K(location_));
|
||||
break;
|
||||
}
|
||||
if (OB_FAIL(data_checkpoint_->unlink_(this))) {
|
||||
STORAGE_LOG(ERROR, "failed to unlink", K(ret), KPC(this));
|
||||
} else {
|
||||
location_ = OUT;
|
||||
}
|
||||
prev_ = NULL;
|
||||
next_ = NULL;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -93,50 +67,9 @@ int ObFreezeCheckpoint::add_to_data_checkpoint(ObDataCheckpoint *data_checkpoint
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObFreezeCheckpoint::check_can_move_to_active(bool is_ls_freeze)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (location_ != ACTIVE) {
|
||||
// only when the unit rec_scn is stable that can be moved to ordered_active_list
|
||||
if (rec_scn_is_stable()) {
|
||||
if (OB_FAIL(move_to_active_(is_ls_freeze))) {
|
||||
STORAGE_LOG(ERROR, "transfer to active failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObFreezeCheckpoint::move_to_active_(bool is_ls_freeze)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_ls_freeze) {
|
||||
if (OB_FAIL(data_checkpoint_->transfer_from_ls_frozen_to_active_(this))) {
|
||||
STORAGE_LOG(ERROR, "can_freeze Failed", K(is_ls_freeze));
|
||||
}
|
||||
} else if (OB_FAIL(data_checkpoint_->transfer_from_new_create_to_active_(this))) {
|
||||
STORAGE_LOG(ERROR, "can_freeze Failed", K(is_ls_freeze));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObFreezeCheckpoint::finish_freeze()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(data_checkpoint_->lock_);
|
||||
if (PREPARE != location_) {
|
||||
if (NEW_CREATE == location_ &&
|
||||
OB_FAIL(data_checkpoint_->transfer_from_new_create_to_prepare_(this))) {
|
||||
STORAGE_LOG(ERROR, "transfer_from_new_create_to_prepare failed", K(ret), K(*this));
|
||||
} else if (ACTIVE == location_ &&
|
||||
OB_FAIL(data_checkpoint_->transfer_from_active_to_prepare_(this))) {
|
||||
STORAGE_LOG(ERROR, "finish_freeze transfer_from_active_to_prepare_ failed", K(ret));
|
||||
} else if (LS_FROZEN == location_ &&
|
||||
OB_FAIL(data_checkpoint_->transfer_from_ls_frozen_to_prepare_(this))) {
|
||||
STORAGE_LOG(ERROR, "finish_freeze transfer_from_ls_frozen_to_prepare_ failed", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
return data_checkpoint_->finish_freeze(this);
|
||||
}
|
||||
|
||||
} // namespace checkpoint
|
||||
|
@ -30,11 +30,11 @@ class ObDataCheckpoint;
|
||||
|
||||
enum ObFreezeCheckpointLocation
|
||||
{
|
||||
OUT = 0,
|
||||
NEW_CREATE,
|
||||
ACTIVE,
|
||||
PREPARE,
|
||||
LS_FROZEN
|
||||
LS_FROZEN = 1,
|
||||
NEW_CREATE = 2,
|
||||
ACTIVE = 4,
|
||||
PREPARE = 8,
|
||||
OUT = 16,
|
||||
};
|
||||
|
||||
static inline
|
||||
@ -105,10 +105,6 @@ public:
|
||||
// register into ObDataCheckpoint
|
||||
int add_to_data_checkpoint(ObDataCheckpoint *data_checkpoint);
|
||||
bool is_in_prepare_list_of_data_checkpoint();
|
||||
// transfer to active_list in ObDataCheckpoint
|
||||
// when the checkpoint unit rec_scn_is_stable
|
||||
// @param[in] is_ls_freeze, whether the process is triggered by logstream_freeze
|
||||
int check_can_move_to_active(bool is_ls_freeze = false);
|
||||
// after checkpoint ready_for_flush
|
||||
// move to prepare_list in data_checkpoint
|
||||
int finish_freeze();
|
||||
@ -118,7 +114,6 @@ public:
|
||||
VIRTUAL_TO_STRING_KV(K_(location));
|
||||
|
||||
private:
|
||||
int move_to_active_(bool is_ls_freeze);
|
||||
int unlink_();
|
||||
// ensure safe by lock of data_checkpoint
|
||||
ObFreezeCheckpointLocation location_;
|
||||
|
Reference in New Issue
Block a user