Fix restore hang and tenant freeze failure during data_checkpoint flushing
This commit is contained in:
parent
36608dd357
commit
ffb515a326
@ -193,7 +193,9 @@ int ObCheckpointExecutor::update_clog_checkpoint()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObCheckpointExecutor::advance_checkpoint_by_flush(SCN recycle_scn) {
|
int ObCheckpointExecutor::advance_checkpoint_by_flush(
|
||||||
|
SCN recycle_scn)
|
||||||
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
RLockGuard guard(rwlock_);
|
RLockGuard guard(rwlock_);
|
||||||
@ -278,7 +280,7 @@ int64_t ObCheckpointExecutor::get_cannot_recycle_log_size()
|
|||||||
LSN end_lsn;
|
LSN end_lsn;
|
||||||
if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) {
|
if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) {
|
||||||
STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id()));
|
STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id()));
|
||||||
} else {
|
} else if (!ls_->get_data_checkpoint()->is_flushing()) {
|
||||||
cannot_recycle_log_size =
|
cannot_recycle_log_size =
|
||||||
end_lsn.val_ - ls_->get_clog_base_lsn().val_;
|
end_lsn.val_ - ls_->get_clog_base_lsn().val_;
|
||||||
}
|
}
|
||||||
|
@ -75,7 +75,8 @@ public:
|
|||||||
|
|
||||||
// the service will flush and advance checkpoint
|
// the service will flush and advance checkpoint
|
||||||
// after flush, checkpoint_scn will be equal or greater than recycle_scn
|
// after flush, checkpoint_scn will be equal or greater than recycle_scn
|
||||||
int advance_checkpoint_by_flush(share::SCN recycle_scn = share::SCN::invalid_scn());
|
int advance_checkpoint_by_flush(
|
||||||
|
share::SCN recycle_scn = share::SCN::invalid_scn());
|
||||||
|
|
||||||
// for __all_virtual_checkpoint
|
// for __all_virtual_checkpoint
|
||||||
int get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &checkpoint_array);
|
int get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &checkpoint_array);
|
||||||
@ -87,6 +88,7 @@ public:
|
|||||||
int diagnose(CheckpointDiagnoseInfo &diagnose_info) const;
|
int diagnose(CheckpointDiagnoseInfo &diagnose_info) const;
|
||||||
|
|
||||||
int traversal_flush() const;
|
int traversal_flush() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static const int64_t CLOG_GC_PERCENT = 60;
|
static const int64_t CLOG_GC_PERCENT = 60;
|
||||||
|
|
||||||
|
@ -24,6 +24,8 @@ namespace storage
|
|||||||
namespace checkpoint
|
namespace checkpoint
|
||||||
{
|
{
|
||||||
|
|
||||||
|
__thread bool ObDataCheckpoint::is_tenant_freeze_for_flush_ = false;
|
||||||
|
|
||||||
// ** ObCheckpointDList **
|
// ** ObCheckpointDList **
|
||||||
void ObCheckpointDList::reset()
|
void ObCheckpointDList::reset()
|
||||||
{
|
{
|
||||||
@ -770,7 +772,7 @@ int ObDataCheckpoint::freeze_base_on_needs_(share::SCN recycle_scn)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (get_rec_scn() <= recycle_scn) {
|
if (get_rec_scn() <= recycle_scn) {
|
||||||
if (!is_flushing() && prepare_list_.is_empty()) {
|
if (is_tenant_freeze() || (!is_flushing() && prepare_list_.is_empty())) {
|
||||||
int64_t wait_flush_num =
|
int64_t wait_flush_num =
|
||||||
new_create_list_.checkpoint_list_.get_size()
|
new_create_list_.checkpoint_list_.get_size()
|
||||||
+ active_list_.checkpoint_list_.get_size();
|
+ active_list_.checkpoint_list_.get_size();
|
||||||
@ -787,7 +789,7 @@ int ObDataCheckpoint::freeze_base_on_needs_(share::SCN recycle_scn)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (logstream_freeze) {
|
if (logstream_freeze) {
|
||||||
if (OB_FAIL(ls_->logstream_freeze(true/*is_sync*/))) {
|
if (OB_FAIL(ls_->logstream_freeze(false /* !is_sync */))) {
|
||||||
STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id()));
|
STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id()));
|
||||||
}
|
}
|
||||||
} else if (OB_FAIL(ls_->batch_tablet_freeze(need_flush_tablets, true/*is_sync*/))) {
|
} else if (OB_FAIL(ls_->batch_tablet_freeze(need_flush_tablets, true/*is_sync*/))) {
|
||||||
|
@ -120,6 +120,10 @@ public:
|
|||||||
|
|
||||||
bool is_empty();
|
bool is_empty();
|
||||||
|
|
||||||
|
// Raises the calling thread's is_tenant_freeze_for_flush_ flag.
static void set_tenant_freeze()
{
  is_tenant_freeze_for_flush_ = true;
}
|
||||||
|
// Lowers the calling thread's is_tenant_freeze_for_flush_ flag.
static void reset_tenant_freeze()
{
  is_tenant_freeze_for_flush_ = false;
}
|
||||||
|
// Reports whether the calling thread's is_tenant_freeze_for_flush_ flag is set.
static bool is_tenant_freeze()
{
  return is_tenant_freeze_for_flush_;
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// traversal prepare_list to flush memtable
|
// traversal prepare_list to flush memtable
|
||||||
// case1: some memtable flush failed when ls freeze
|
// case1: some memtable flush failed when ls freeze
|
||||||
@ -179,6 +183,8 @@ private:
|
|||||||
// avoid blocking other list due to traversal ls_frozen_list
|
// avoid blocking other list due to traversal ls_frozen_list
|
||||||
common::ObSpinLock ls_frozen_list_lock_;
|
common::ObSpinLock ls_frozen_list_lock_;
|
||||||
bool ls_freeze_finished_;
|
bool ls_freeze_finished_;
|
||||||
|
|
||||||
|
static __thread bool is_tenant_freeze_for_flush_;
|
||||||
};
|
};
|
||||||
|
|
||||||
static const ObTabletID LS_DATA_CHECKPOINT_TABLET(ObDataCheckpoint::LS_DATA_CHECKPOINT_TABLET_ID);
|
static const ObTabletID LS_DATA_CHECKPOINT_TABLET(ObDataCheckpoint::LS_DATA_CHECKPOINT_TABLET_ID);
|
||||||
|
@ -1505,7 +1505,7 @@ int ObLS::batch_tablet_freeze(const ObIArray<ObTabletID> &tablet_ids,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObLS::advance_checkpoint_by_flush(SCN recycle_scn, const int64_t abs_timeout_ts)
|
int ObLS::advance_checkpoint_by_flush(SCN recycle_scn, const int64_t abs_timeout_ts, const bool is_tennat_freeze)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t read_lock = LSLOCKALL;
|
int64_t read_lock = LSLOCKALL;
|
||||||
@ -1515,7 +1515,12 @@ int ObLS::advance_checkpoint_by_flush(SCN recycle_scn, const int64_t abs_timeout
|
|||||||
ret = OB_TIMEOUT;
|
ret = OB_TIMEOUT;
|
||||||
LOG_WARN("lock failed, please retry later", K(ret), K(ls_meta_));
|
LOG_WARN("lock failed, please retry later", K(ret), K(ls_meta_));
|
||||||
} else {
|
} else {
|
||||||
|
if (is_tennat_freeze) {
|
||||||
|
ObDataCheckpoint::set_tenant_freeze();
|
||||||
|
LOG_INFO("set tenant_freeze", K(ls_meta_.ls_id_));
|
||||||
|
}
|
||||||
ret = checkpoint_executor_.advance_checkpoint_by_flush(recycle_scn);
|
ret = checkpoint_executor_.advance_checkpoint_by_flush(recycle_scn);
|
||||||
|
ObDataCheckpoint::reset_tenant_freeze();
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -694,7 +694,8 @@ public:
|
|||||||
// advance the checkpoint of this ls
|
// advance the checkpoint of this ls
|
||||||
// @param [in] abs_timeout_ts, wait until timeout if lock conflict
|
// @param [in] abs_timeout_ts, wait until timeout if lock conflict
|
||||||
int advance_checkpoint_by_flush(share::SCN recycle_scn,
|
int advance_checkpoint_by_flush(share::SCN recycle_scn,
|
||||||
const int64_t abs_timeout_ts = INT64_MAX);
|
const int64_t abs_timeout_ts = INT64_MAX,
|
||||||
|
const bool is_tenant_freeze = false);
|
||||||
|
|
||||||
// ObDataCheckpoint interface:
|
// ObDataCheckpoint interface:
|
||||||
DELEGATE_WITH_RET(data_checkpoint_, get_freezecheckpoint_info, int);
|
DELEGATE_WITH_RET(data_checkpoint_, get_freezecheckpoint_info, int);
|
||||||
|
@ -245,7 +245,7 @@ int ObTenantFreezer::ls_freeze_all_unit_(ObLS *ls, const int64_t abs_timeout_ts)
|
|||||||
do {
|
do {
|
||||||
need_retry = false;
|
need_retry = false;
|
||||||
retry_times++;
|
retry_times++;
|
||||||
if (OB_SUCC(ls->advance_checkpoint_by_flush(SCN::max_scn(), abs_timeout_ts))) {
|
if (OB_SUCC(ls->advance_checkpoint_by_flush(SCN::max_scn(), abs_timeout_ts, true /* is_tenant_freeze */))) {
|
||||||
} else {
|
} else {
|
||||||
current_ts = ObTimeUtil::current_time();
|
current_ts = ObTimeUtil::current_time();
|
||||||
is_timeout = (current_ts >= abs_timeout_ts);
|
is_timeout = (current_ts >= abs_timeout_ts);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user