make retain ctx GC more aggressive
This commit is contained in:
parent
4c4c68977b
commit
098c6384d4
@ -79,8 +79,9 @@ void ObTxLoopWorker::destroy()
|
||||
|
||||
void ObTxLoopWorker::reset()
|
||||
{
|
||||
last_tx_gc_ts_ = false;
|
||||
last_retain_ctx_gc_ts_ = 0;
|
||||
tx_gc_.reset();
|
||||
retain_tx_gc_.reset();
|
||||
advance_checkpoint_.reset();
|
||||
}
|
||||
|
||||
void ObTxLoopWorker::run1()
|
||||
@ -89,45 +90,27 @@ void ObTxLoopWorker::run1()
|
||||
int64_t start_time_us = 0;
|
||||
int64_t time_used = 0;
|
||||
lib::set_thread_name("TxLoopWorker");
|
||||
bool can_gc_tx = false;
|
||||
bool can_gc_retain_ctx = false;
|
||||
|
||||
while (!has_set_stop()) {
|
||||
start_time_us = ObTimeUtility::current_time();
|
||||
|
||||
// tx gc, interval = 5s
|
||||
if (common::ObClockGenerator::getClock() - last_tx_gc_ts_ > TX_GC_INTERVAL) {
|
||||
TRANS_LOG(INFO, "tx gc loop thread is running", K(MTL_ID()));
|
||||
last_tx_gc_ts_ = common::ObClockGenerator::getClock();
|
||||
can_gc_tx = true;
|
||||
}
|
||||
|
||||
//retain ctx gc, interval = 5s
|
||||
if (common::ObClockGenerator::getClock() - last_retain_ctx_gc_ts_ > TX_RETAIN_CTX_GC_INTERVAL) {
|
||||
TRANS_LOG(INFO, "try gc retain ctx");
|
||||
last_retain_ctx_gc_ts_ = common::ObClockGenerator::getClock();
|
||||
can_gc_retain_ctx = true;
|
||||
}
|
||||
|
||||
(void)scan_all_ls_(can_gc_tx, can_gc_retain_ctx);
|
||||
(void)scan_all_ls_();
|
||||
|
||||
// TODO shanyan.g
|
||||
// 1) We use max(max_commit_ts, gts_cache) as read snapshot,
|
||||
// but now we adopt updating max_commit_ts periodly to avoid getting gts cache cost
|
||||
// 2) Some time later, we will revert current modification when performance problem solved;
|
||||
update_max_commit_ts_();
|
||||
update_max_commit_ts_();
|
||||
|
||||
time_used = ObTimeUtility::current_time() - start_time_us;
|
||||
|
||||
if (time_used < LOOP_INTERVAL) {
|
||||
ob_usleep(LOOP_INTERVAL- time_used);
|
||||
}
|
||||
can_gc_tx = false;
|
||||
can_gc_retain_ctx = false;
|
||||
}
|
||||
}
|
||||
|
||||
int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
|
||||
int ObTxLoopWorker::scan_all_ls_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int iter_ret = OB_SUCCESS;
|
||||
@ -165,8 +148,7 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
|
||||
status = MinStartScnStatus::UNKOWN;
|
||||
}
|
||||
|
||||
// tx gc, interval = 15s
|
||||
if (can_tx_gc) {
|
||||
if (tx_gc_.reach()) {
|
||||
// TODO shanyan.g close ctx gc temporarily because of logical bug
|
||||
//
|
||||
do_tx_gc_(cur_ls_ptr, min_start_scn, status);
|
||||
@ -192,9 +174,12 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
|
||||
// keep alive, interval = 100ms
|
||||
do_keep_alive_(cur_ls_ptr, min_start_scn, status);
|
||||
|
||||
if (can_gc_retain_ctx) {
|
||||
if (retain_tx_gc_.reach()) {
|
||||
do_retain_ctx_gc_(cur_ls_ptr);
|
||||
}
|
||||
if (advance_checkpoint_.reach()) {
|
||||
do_advance_retain_ctx_gc_(cur_ls_ptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -261,23 +246,37 @@ void ObTxLoopWorker::update_max_commit_ts_()
|
||||
void ObTxLoopWorker::do_retain_ctx_gc_(ObLS *ls_ptr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
ObTxRetainCtxMgr *retain_ctx_mgr = ls_ptr->get_tx_svr()->get_retain_ctx_mgr();
|
||||
if (OB_ISNULL(retain_ctx_mgr)) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "[Tx Loop Worker] retain_ctx_mgr is not inited", K(ret), K(MTL_ID()),
|
||||
K(*ls_ptr));
|
||||
|
||||
} else if (retain_ctx_mgr->try_gc_retain_ctx(ls_ptr)) {
|
||||
TRANS_LOG(WARN, "[Tx Loop Worker] retain_ctx_mgr try to gc retain ctx failed", K(ret),
|
||||
K(MTL_ID()), K(*ls_ptr));
|
||||
} else {
|
||||
TRANS_LOG(DEBUG, "[Tx Loop Worker] retain_ctx_mgr try to gc retain ctx success", K(ret),
|
||||
K(MTL_ID()), K(*ls_ptr));
|
||||
if (OB_FAIL(retain_ctx_mgr->try_gc_retain_ctx(ls_ptr))) {
|
||||
TRANS_LOG(WARN, "[Tx Loop Worker] retain_ctx_mgr try to gc retain ctx failed", K(ret),
|
||||
K(MTL_ID()), K(*ls_ptr));
|
||||
} else {
|
||||
TRANS_LOG(DEBUG, "[Tx Loop Worker] retain_ctx_mgr try to gc retain ctx success", K(ret),
|
||||
K(MTL_ID()), K(*ls_ptr));
|
||||
}
|
||||
retain_ctx_mgr->print_retain_ctx_info(ls_ptr->get_ls_id());
|
||||
}
|
||||
UNUSED(ret);
|
||||
}
|
||||
|
||||
retain_ctx_mgr->print_retain_ctx_info(ls_ptr->get_ls_id());
|
||||
retain_ctx_mgr->try_advance_retain_ctx_gc(ls_ptr->get_ls_id());
|
||||
void ObTxLoopWorker::do_advance_retain_ctx_gc_(ObLS *ls_ptr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTxRetainCtxMgr *retain_ctx_mgr = ls_ptr->get_tx_svr()->get_retain_ctx_mgr();
|
||||
if (OB_ISNULL(retain_ctx_mgr)) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "[Tx Loop Worker] retain_ctx_mgr is not inited", K(ret), K(MTL_ID()),
|
||||
K(*ls_ptr));
|
||||
|
||||
} else {
|
||||
retain_ctx_mgr->try_advance_retain_ctx_gc(ls_ptr->get_ls_id());
|
||||
}
|
||||
UNUSED(ret);
|
||||
}
|
||||
|
||||
|
@ -39,9 +39,12 @@ public:
|
||||
const static int64_t LOOP_INTERVAL = 100 * 1000; // 100ms
|
||||
const static int64_t KEEP_ALIVE_PRINT_INFO_INTERVAL = 5 * 60 * 1000 * 1000; // 5min
|
||||
const static int64_t TX_GC_INTERVAL = 5 * 1000 * 1000; // 5s
|
||||
const static int64_t TX_RETAIN_CTX_GC_INTERVAL = 5 * 1000 * 1000; // 5s
|
||||
const static int64_t TX_RETAIN_CTX_GC_INTERVAL = 1 * 1000 * 1000; // 1s
|
||||
const static int64_t TX_RETAIN_CTX_ADVANCE_CHECKPOINT_INTERVAL = 5 * 1000 * 1000; // 5s
|
||||
public:
|
||||
ObTxLoopWorker() { reset(); }
|
||||
ObTxLoopWorker() : tx_gc_(TX_GC_INTERVAL),
|
||||
retain_tx_gc_(TX_RETAIN_CTX_GC_INTERVAL),
|
||||
advance_checkpoint_(TX_RETAIN_CTX_ADVANCE_CHECKPOINT_INTERVAL) {}
|
||||
~ObTxLoopWorker() {}
|
||||
static int mtl_init(ObTxLoopWorker *&ka);
|
||||
int init();
|
||||
@ -55,15 +58,17 @@ public:
|
||||
virtual void run1();
|
||||
|
||||
private:
|
||||
int scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx);
|
||||
int scan_all_ls_();
|
||||
void do_keep_alive_(ObLS *ls, const share::SCN &min_start_scn, MinStartScnStatus status); // 100ms
|
||||
void do_tx_gc_(ObLS *ls, share::SCN &min_start_scn, MinStartScnStatus &status); // 15s
|
||||
void update_max_commit_ts_();
|
||||
void do_retain_ctx_gc_(ObLS * ls); // 15s
|
||||
void do_retain_ctx_gc_(ObLS *ls);
|
||||
void do_advance_retain_ctx_gc_(ObLS *ls);
|
||||
|
||||
private:
|
||||
int64_t last_tx_gc_ts_;
|
||||
int64_t last_retain_ctx_gc_ts_;
|
||||
ObTimeInterval tx_gc_;
|
||||
ObTimeInterval retain_tx_gc_;
|
||||
ObTimeInterval advance_checkpoint_;
|
||||
};
|
||||
|
||||
|
||||
|
@ -225,7 +225,7 @@ int ObTxRetainCtxMgr::push_retain_ctx(ObIRetainCtxCheckFunctor *retain_func, int
|
||||
int ObTxRetainCtxMgr::try_gc_retain_ctx(storage::ObLS *ls)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
static const int64_t MAX_RUN_US = 500 * 1000;
|
||||
static const int64_t MAX_RUN_US = 300 * 1000;
|
||||
ObTimeGuard tg(__func__, 1 * 1000 * 1000);
|
||||
SpinWLockGuard guard(retain_ctx_lock_);
|
||||
tg.click();
|
||||
|
Loading…
x
Reference in New Issue
Block a user