fix lock_memtable get_rec_log_ts() to avoid back
This commit is contained in:
@ -418,6 +418,7 @@ int64_t ObLSTxService::get_rec_log_ts()
|
||||
int64_t min_rec_log_ts = INT64_MAX;
|
||||
int min_rec_log_ts_common_checkpoint_type_index = 0;
|
||||
char common_checkpoint_type[common::MAX_CHECKPOINT_TYPE_BUF_LENGTH];
|
||||
ObSpinLockGuard guard(lock_);
|
||||
for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) {
|
||||
if (OB_NOT_NULL(common_checkpoints_[i])) {
|
||||
int64_t rec_log_ts = common_checkpoints_[i]->get_rec_log_ts();
|
||||
@ -442,6 +443,7 @@ int ObLSTxService::flush(int64_t rec_log_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(lock_);
|
||||
for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) {
|
||||
// only flush the common_checkpoint that whose clog need recycle
|
||||
if (OB_NOT_NULL(common_checkpoints_[i]) && rec_log_ts >= common_checkpoints_[i]->get_rec_log_ts()) {
|
||||
@ -475,6 +477,7 @@ int ObLSTxService::get_common_checkpoint_info(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common_checkpoint_array.reset();
|
||||
ObSpinLockGuard guard(lock_);
|
||||
for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) {
|
||||
ObCommonCheckpoint *common_checkpoint = common_checkpoints_[i];
|
||||
if (OB_ISNULL(common_checkpoint)) {
|
||||
@ -553,6 +556,7 @@ int ObLSTxService::traversal_flush()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(lock_);
|
||||
for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) {
|
||||
if (OB_NOT_NULL(common_checkpoints_[i]) &&
|
||||
OB_SUCCESS != (tmp_ret = common_checkpoints_[i]->flush(INT64_MAX, false))) {
|
||||
@ -563,6 +567,7 @@ int ObLSTxService::traversal_flush()
|
||||
}
|
||||
|
||||
void ObLSTxService::reset_() {
|
||||
ObSpinLockGuard guard(lock_);
|
||||
for (int i = 0; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) {
|
||||
common_checkpoints_[i] = NULL;
|
||||
}
|
||||
|
@ -49,6 +49,7 @@ ObLockMemtable::ObLockMemtable()
|
||||
freeze_log_ts_(0),
|
||||
flushed_log_ts_(0),
|
||||
rec_log_ts_(INT64_MAX),
|
||||
pre_rec_log_ts_(OB_INVALID_TIMESTAMP),
|
||||
max_committed_log_ts_(OB_INVALID_TIMESTAMP),
|
||||
is_frozen_(false),
|
||||
freezer_(nullptr)
|
||||
@ -522,6 +523,7 @@ int ObLockMemtable::update_lock_status(
|
||||
RLockGuard guard(flush_lock_);
|
||||
dec_update(&rec_log_ts_, commit_log_ts);
|
||||
inc_update(&max_committed_log_ts_, commit_log_ts);
|
||||
LOG_INFO("out_trans update_lock_status", K(ret), K(op_info), K(commit_log_ts), K(status), K(rec_log_ts_), K(ls_id_));
|
||||
}
|
||||
LOG_DEBUG("ObLockMemtable::update_lock_status", K(ret), K(op_info), K(commit_log_ts), K(status));
|
||||
return ret;
|
||||
@ -735,9 +737,14 @@ int ObLockMemtable::get_frozen_schema_version(int64_t &schema_version) const
|
||||
int64_t ObLockMemtable::get_rec_log_ts()
|
||||
{
|
||||
// no need lock because rec_log_ts_ aesc except INT64_MAX
|
||||
LOG_INFO("rec_log_ts of ObLockMemtable is ", K(rec_log_ts_), K(flushed_log_ts_),
|
||||
LOG_INFO("rec_log_ts of ObLockMemtable is ",
|
||||
K(rec_log_ts_), K(flushed_log_ts_), K(pre_rec_log_ts_),
|
||||
K(freeze_log_ts_), K(max_committed_log_ts_), K(is_frozen_), K(ls_id_));
|
||||
return rec_log_ts_;
|
||||
if (pre_rec_log_ts_ == OB_INVALID_TIMESTAMP) {
|
||||
return rec_log_ts_;
|
||||
} else {
|
||||
return pre_rec_log_ts_;
|
||||
}
|
||||
}
|
||||
|
||||
ObTabletID ObLockMemtable::get_tablet_id() const
|
||||
@ -754,14 +761,7 @@ int ObLockMemtable::on_memtable_flushed()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLockGuard guard(flush_lock_);
|
||||
if (max_committed_log_ts_ > freeze_log_ts_) {
|
||||
// have no_flushed commit_log_ts
|
||||
rec_log_ts_ = freeze_log_ts_;
|
||||
} else {
|
||||
// all commit_log_ts flushed
|
||||
rec_log_ts_ = INT64_MAX;
|
||||
max_committed_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
}
|
||||
pre_rec_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
if (freeze_log_ts_ > flushed_log_ts_) {
|
||||
flushed_log_ts_ = freeze_log_ts_;
|
||||
} else {
|
||||
@ -794,19 +794,21 @@ bool ObLockMemtable::is_active_memtable() const
|
||||
int ObLockMemtable::flush(int64_t recycle_log_ts, bool need_freeze)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
UNUSED(need_freeze);
|
||||
{
|
||||
if (need_freeze) {
|
||||
WLockGuard guard(flush_lock_);
|
||||
int64_t rec_log_ts = get_rec_log_ts();
|
||||
if (rec_log_ts >= recycle_log_ts) {
|
||||
LOG_INFO("lock memtable no need to flush", K(rec_log_ts), K(recycle_log_ts),
|
||||
K(is_frozen_), K(ls_id_));
|
||||
} else if (is_active_memtable()) {
|
||||
if (OB_FAIL(freezer_->get_max_consequent_callbacked_log_ts(freeze_log_ts_))) {
|
||||
LOG_WARN("get_max_consequent_callbacked_log_ts failed", K(ret), K(ls_id_));
|
||||
} else if (flushed_log_ts_ >= freeze_log_ts_) {
|
||||
freeze_log_ts_ = max_committed_log_ts_;
|
||||
if (flushed_log_ts_ >= freeze_log_ts_) {
|
||||
LOG_INFO("skip freeze because of flushed", K_(ls_id), K_(flushed_log_ts), K_(freeze_log_ts));
|
||||
} else {
|
||||
pre_rec_log_ts_ = rec_log_ts_;
|
||||
rec_log_ts_ = INT64_MAX;
|
||||
max_committed_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
|
||||
ObLogTsRange log_ts_range;
|
||||
log_ts_range.start_log_ts_ = 1;
|
||||
log_ts_range.end_log_ts_ = freeze_log_ts_;
|
||||
@ -818,16 +820,25 @@ int ObLockMemtable::flush(int64_t recycle_log_ts, bool need_freeze)
|
||||
}
|
||||
|
||||
if (is_frozen_memtable()) {
|
||||
// dependent to judging is_active_memtable() in dag
|
||||
// otherwise maybe merge active memtable
|
||||
compaction::ObTabletMergeDagParam param;
|
||||
param.ls_id_ = ls_id_;
|
||||
param.tablet_id_ = LS_LOCK_TABLET;
|
||||
param.merge_type_ = MINI_MERGE;
|
||||
param.merge_version_ = ObVersion::MIN_VERSION;
|
||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tx_table_merge_dag(param))) {
|
||||
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
|
||||
LOG_WARN("failed to schedule lock_memtable merge dag", K(ret), K(this));
|
||||
int64_t max_consequent_callbacked_log_ts = 0;
|
||||
if (OB_FAIL(freezer_->get_max_consequent_callbacked_log_ts(max_consequent_callbacked_log_ts))) {
|
||||
LOG_WARN("get_max_consequent_callbacked_log_ts failed", K(ret), K(ls_id_));
|
||||
} else if (max_consequent_callbacked_log_ts < freeze_log_ts_) {
|
||||
LOG_INFO("lock memtable not ready for flush",
|
||||
K(max_consequent_callbacked_log_ts),
|
||||
K(freeze_log_ts_));
|
||||
} else {
|
||||
// dependent to judging is_active_memtable() in dag
|
||||
// otherwise maybe merge active memtable
|
||||
compaction::ObTabletMergeDagParam param;
|
||||
param.ls_id_ = ls_id_;
|
||||
param.tablet_id_ = LS_LOCK_TABLET;
|
||||
param.merge_type_ = MINI_MERGE;
|
||||
param.merge_version_ = ObVersion::MIN_VERSION;
|
||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tx_table_merge_dag(param))) {
|
||||
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
|
||||
LOG_WARN("failed to schedule lock_memtable merge dag", K(ret), K(this));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -222,6 +222,7 @@ private:
|
||||
// data before the flushed_log_ts_ have been flushed
|
||||
int64_t flushed_log_ts_;
|
||||
int64_t rec_log_ts_;
|
||||
int64_t pre_rec_log_ts_;
|
||||
int64_t max_committed_log_ts_;
|
||||
bool is_frozen_;
|
||||
|
||||
|
Reference in New Issue
Block a user