Adjust flush logic to avoid frequent freezes caused by clog disk usage
This commit is contained in:
@ -29,9 +29,7 @@ namespace checkpoint
|
|||||||
{
|
{
|
||||||
|
|
||||||
ObCheckpointExecutor::ObCheckpointExecutor()
|
ObCheckpointExecutor::ObCheckpointExecutor()
|
||||||
: wait_advance_checkpoint_(false),
|
: update_checkpoint_enabled_(false)
|
||||||
last_set_wait_advance_checkpoint_time_(0),
|
|
||||||
update_checkpoint_enabled_(false)
|
|
||||||
{
|
{
|
||||||
reset();
|
reset();
|
||||||
}
|
}
|
||||||
@ -185,7 +183,6 @@ int ObCheckpointExecutor::update_clog_checkpoint()
|
|||||||
STORAGE_LOG(ERROR, "set base lsn failed", K(ret), K(clog_checkpoint_lsn), K(ls_id));
|
STORAGE_LOG(ERROR, "set base lsn failed", K(ret), K(clog_checkpoint_lsn), K(ls_id));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ATOMIC_STORE(&wait_advance_checkpoint_, false);
|
|
||||||
FLOG_INFO("[CHECKPOINT] update clog checkpoint successfully",
|
FLOG_INFO("[CHECKPOINT] update clog checkpoint successfully",
|
||||||
K(clog_checkpoint_lsn), K(checkpoint_ts), K(ls_id),
|
K(clog_checkpoint_lsn), K(checkpoint_ts), K(ls_id),
|
||||||
K(service_type));
|
K(service_type));
|
||||||
@ -268,43 +265,6 @@ int ObCheckpointExecutor::get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &chec
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObCheckpointExecutor::need_flush()
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
bool need_flush = false;
|
|
||||||
int64_t end_log_ts = 0;
|
|
||||||
if (OB_FAIL(loghandler_->get_end_ts_ns(end_log_ts))) {
|
|
||||||
STORAGE_LOG(WARN, "get_end_ts_ns failed", K(ret));
|
|
||||||
} else if (end_log_ts -
|
|
||||||
ls_->get_clog_checkpoint_ts() > MAX_NEED_REPLAY_CLOG_INTERVAL) {
|
|
||||||
STORAGE_LOG(INFO, "over max need replay clog interval",
|
|
||||||
K(end_log_ts), K(ls_->get_clog_checkpoint_ts()));
|
|
||||||
need_flush = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return need_flush;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool ObCheckpointExecutor::is_wait_advance_checkpoint()
|
|
||||||
{
|
|
||||||
if (ATOMIC_LOAD(&wait_advance_checkpoint_)) {
|
|
||||||
if (ObTimeUtility::current_time() - last_set_wait_advance_checkpoint_time_ > 10 * 1000 * 1000) {
|
|
||||||
ATOMIC_STORE(&wait_advance_checkpoint_, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ATOMIC_LOAD(&wait_advance_checkpoint_);
|
|
||||||
}
|
|
||||||
|
|
||||||
void ObCheckpointExecutor::set_wait_advance_checkpoint(int64_t checkpoint_log_ts)
|
|
||||||
{
|
|
||||||
ObSpinLockGuard guard(lock_);
|
|
||||||
if (checkpoint_log_ts == ls_->get_clog_checkpoint_ts()) {
|
|
||||||
ATOMIC_STORE(&wait_advance_checkpoint_, true);
|
|
||||||
last_set_wait_advance_checkpoint_time_ = ObTimeUtility::current_time();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t ObCheckpointExecutor::get_cannot_recycle_log_size()
|
int64_t ObCheckpointExecutor::get_cannot_recycle_log_size()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
@ -73,13 +73,6 @@ public:
|
|||||||
// for __all_virtual_checkpoint
|
// for __all_virtual_checkpoint
|
||||||
int get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &checkpoint_array);
|
int get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &checkpoint_array);
|
||||||
|
|
||||||
// avoid needing to replay too many logs
|
|
||||||
bool need_flush();
|
|
||||||
|
|
||||||
bool is_wait_advance_checkpoint();
|
|
||||||
|
|
||||||
void set_wait_advance_checkpoint(int64_t checkpoint_log_ts);
|
|
||||||
|
|
||||||
int64_t get_cannot_recycle_log_size();
|
int64_t get_cannot_recycle_log_size();
|
||||||
|
|
||||||
void get_min_rec_log_ts(int &log_type, int64_t &min_rec_log_ts) const;
|
void get_min_rec_log_ts(int &log_type, int64_t &min_rec_log_ts) const;
|
||||||
@ -88,7 +81,6 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
static const int64_t CLOG_GC_PERCENT = 60;
|
static const int64_t CLOG_GC_PERCENT = 60;
|
||||||
static const int64_t MAX_NEED_REPLAY_CLOG_INTERVAL = (int64_t)60 * 60 * 1000 * 1000 * 1000; //ns
|
|
||||||
|
|
||||||
ObLS *ls_;
|
ObLS *ls_;
|
||||||
logservice::ObILogHandler *loghandler_;
|
logservice::ObILogHandler *loghandler_;
|
||||||
@ -98,9 +90,6 @@ private:
|
|||||||
// when the public interfaces are invoked
|
// when the public interfaces are invoked
|
||||||
mutable common::ObSpinLock lock_;
|
mutable common::ObSpinLock lock_;
|
||||||
|
|
||||||
// avoid frequent freeze when clog_used_over_threshold
|
|
||||||
bool wait_advance_checkpoint_;
|
|
||||||
int64_t last_set_wait_advance_checkpoint_time_;
|
|
||||||
bool update_checkpoint_enabled_;
|
bool update_checkpoint_enabled_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -27,13 +27,13 @@ enum ObCommonCheckpointType
|
|||||||
{
|
{
|
||||||
INVALID_BASE_TYPE = 0,
|
INVALID_BASE_TYPE = 0,
|
||||||
|
|
||||||
DATA_CHECKPOINT_TYPE = 1,
|
TX_CTX_MEMTABLE_TYPE = 1,
|
||||||
|
|
||||||
TX_CTX_MEMTABLE_TYPE = 2,
|
TX_DATA_MEMTABLE_TYPE = 2,
|
||||||
|
|
||||||
TX_DATA_MEMTABLE_TYPE = 3,
|
LOCK_MEMTABLE_TYPE = 3,
|
||||||
|
|
||||||
LOCK_MEMTABLE_TYPE = 4,
|
DATA_CHECKPOINT_TYPE = 4,
|
||||||
|
|
||||||
// for unittest
|
// for unittest
|
||||||
TEST_COMMON_CHECKPOINT = 5,
|
TEST_COMMON_CHECKPOINT = 5,
|
||||||
|
|||||||
@ -224,7 +224,9 @@ int ObDataCheckpoint::flush(int64_t recycle_log_ts, bool need_freeze)
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (need_freeze) {
|
if (need_freeze) {
|
||||||
if (get_rec_log_ts() <= recycle_log_ts) {
|
if (get_rec_log_ts() <= recycle_log_ts) {
|
||||||
if (OB_FAIL(ls_->logstream_freeze())) {
|
if (!is_flushing() &&
|
||||||
|
!has_prepared_flush_checkpoint() &&
|
||||||
|
OB_FAIL(ls_->logstream_freeze())) {
|
||||||
STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id()));
|
STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -486,6 +488,11 @@ int ObDataCheckpoint::unlink_from_prepare(ObFreezeCheckpoint *ob_freeze_checkpoi
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ObDataCheckpoint::has_prepared_flush_checkpoint()
|
||||||
|
{
|
||||||
|
return !prepare_list_.is_empty();
|
||||||
|
}
|
||||||
|
|
||||||
int ObDataCheckpoint::get_freezecheckpoint_info(
|
int ObDataCheckpoint::get_freezecheckpoint_info(
|
||||||
ObIArray<checkpoint::ObFreezeCheckpointVTInfo> &freeze_checkpoint_array)
|
ObIArray<checkpoint::ObFreezeCheckpointVTInfo> &freeze_checkpoint_array)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -116,6 +116,8 @@ public:
|
|||||||
|
|
||||||
bool is_flushing() const;
|
bool is_flushing() const;
|
||||||
|
|
||||||
|
bool has_prepared_flush_checkpoint();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// traverse prepare_list to flush memtable
|
// traverse prepare_list to flush memtable
|
||||||
// case1: some memtable flush failed when ls freeze
|
// case1: some memtable flush failed when ls freeze
|
||||||
|
|||||||
@ -1212,17 +1212,16 @@ int ObLS::flush_if_need_(const bool need_flush)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t clog_checkpoint_ts = get_clog_checkpoint_ts();
|
int64_t clog_checkpoint_ts = get_clog_checkpoint_ts();
|
||||||
if ((!need_flush && !checkpoint_executor_.need_flush()) || checkpoint_executor_.is_wait_advance_checkpoint()) {
|
if (!need_flush) {
|
||||||
STORAGE_LOG(INFO, "the ls no need flush to advance_checkpoint", K(get_ls_id()));
|
STORAGE_LOG(INFO, "the ls no need flush to advance_checkpoint",
|
||||||
|
K(get_ls_id()),
|
||||||
|
K(need_flush));
|
||||||
} else if (OB_FAIL(checkpoint_executor_.advance_checkpoint_by_flush())) {
|
} else if (OB_FAIL(checkpoint_executor_.advance_checkpoint_by_flush())) {
|
||||||
STORAGE_LOG(WARN, "advance_checkpoint_by_flush failed", KR(ret), K(get_ls_id()));
|
STORAGE_LOG(WARN, "advance_checkpoint_by_flush failed", KR(ret), K(get_ls_id()));
|
||||||
} else {
|
|
||||||
checkpoint_executor_.set_wait_advance_checkpoint(clog_checkpoint_ts);
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int ObLS::try_update_uppder_trans_version()
|
int ObLS::try_update_uppder_trans_version()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
@ -171,10 +171,10 @@ void ObCheckPointService::ObCheckpointTask::runTimerTask()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObCheckPointService::clog_disk_usage_over_threshold_(int64_t &threshold)
|
bool ObCheckPointService::get_disk_usage_threshold_(int64_t &threshold)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int clog_disk_usage_over_threshold = false;
|
bool get_disk_usage_threshold_success = false;
|
||||||
// avoid clog disk full
|
// avoid clog disk full
|
||||||
logservice::ObLogService *log_service = nullptr;
|
logservice::ObLogService *log_service = nullptr;
|
||||||
PalfEnv *palf_env = nullptr;
|
PalfEnv *palf_env = nullptr;
|
||||||
@ -189,13 +189,13 @@ bool ObCheckPointService::clog_disk_usage_over_threshold_(int64_t &threshold)
|
|||||||
int64_t total_size = 0;
|
int64_t total_size = 0;
|
||||||
if (OB_FAIL(palf_env->get_disk_usage(used_size, total_size))) {
|
if (OB_FAIL(palf_env->get_disk_usage(used_size, total_size))) {
|
||||||
STORAGE_LOG(WARN, "get_disk_usage failed", K(ret), K(used_size), K(total_size));
|
STORAGE_LOG(WARN, "get_disk_usage failed", K(ret), K(used_size), K(total_size));
|
||||||
} else if (used_size > (threshold = (total_size * NEED_FLUSH_CLOG_DISK_PERCENT / 100))) {
|
} else {
|
||||||
STORAGE_LOG(INFO, "clog disk is not enough",
|
threshold = total_size * NEED_FLUSH_CLOG_DISK_PERCENT / 100;
|
||||||
K(used_size), K(total_size));
|
get_disk_usage_threshold_success = true;
|
||||||
clog_disk_usage_over_threshold = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return clog_disk_usage_over_threshold;
|
|
||||||
|
return get_disk_usage_threshold_success;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObCheckPointService::cannot_recycle_log_over_threshold_(const int64_t threshold)
|
bool ObCheckPointService::cannot_recycle_log_over_threshold_(const int64_t threshold)
|
||||||
@ -327,7 +327,7 @@ void ObCheckPointService::ObCheckClogDiskUsageTask::runTimerTask()
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t threshold_size = INT64_MAX;
|
int64_t threshold_size = INT64_MAX;
|
||||||
bool need_flush = false;
|
bool need_flush = false;
|
||||||
if (checkpoint_service_.clog_disk_usage_over_threshold_(threshold_size)) {
|
if (checkpoint_service_.get_disk_usage_threshold_(threshold_size)) {
|
||||||
if (checkpoint_service_.cannot_recycle_log_over_threshold_(threshold_size)) {
|
if (checkpoint_service_.cannot_recycle_log_over_threshold_(threshold_size)) {
|
||||||
need_flush = true;
|
need_flush = true;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,7 +37,7 @@ public:
|
|||||||
check_clog_disk_usage_task_(*this)
|
check_clog_disk_usage_task_(*this)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
static const int64_t NEED_FLUSH_CLOG_DISK_PERCENT = 60;
|
static const int64_t NEED_FLUSH_CLOG_DISK_PERCENT = 30;
|
||||||
static int mtl_init(ObCheckPointService *&m);
|
static int mtl_init(ObCheckPointService *&m);
|
||||||
int init();
|
int init();
|
||||||
int start();
|
int start();
|
||||||
@ -60,7 +60,7 @@ private:
|
|||||||
// the thread which is used to deal with checkpoint task.
|
// the thread which is used to deal with checkpoint task.
|
||||||
ObLSFreezeThread freeze_thread_;
|
ObLSFreezeThread freeze_thread_;
|
||||||
|
|
||||||
bool clog_disk_usage_over_threshold_(int64_t &threshold);
|
bool get_disk_usage_threshold_(int64_t &threshold);
|
||||||
bool cannot_recycle_log_over_threshold_(const int64_t threshold);
|
bool cannot_recycle_log_over_threshold_(const int64_t threshold);
|
||||||
int flush_if_need_(bool need_flush);
|
int flush_if_need_(bool need_flush);
|
||||||
// reduce the risk of the clog disk filling up due to a long checkpoint interval
|
// reduce the risk of the clog disk filling up due to a long checkpoint interval
|
||||||
|
|||||||
Reference in New Issue
Block a user