Split logstream_freeze and tablet_freeze

This commit is contained in:
obdev 2022-12-01 07:38:29 +00:00 committed by ob-robot
parent 8a7d7b401e
commit 2754b76deb
14 changed files with 468 additions and 288 deletions

View File

@ -296,7 +296,7 @@ LATCH_DEF(TENANT_MEM_USAGE_LOCK, 285, "tenant memory usage lock" , LATCH_FIFO, 2
LATCH_DEF(TX_TABLE_LOCK, 286, "tx table lock", LATCH_FIFO, 2000, 0, TX_TABLE_LOCK_WAIT, "tx table lock")
LATCH_DEF(MEMTABLE_STAT_LOCK, 287, "metmable stat lock", LATCH_FIFO, 2000, 0, MEMTABLE_STAT_LOCK_WAIT, "memtable stat lock")
LATCH_DEF(DEADLOCK_DETECT_LOCK, 288, "deadlock detect lock", LATCH_FIFO, 2000, 0, DEADLOCK_DETECT_LOCK_WAIT, "deadlock detect lock")
LATCH_DEF(FREEZE_THREAD_POOL_LOCK, 289, "freeze thread pool lock", LATCH_FIFO, 2000, 0, FREEZE_THREAD_POOL_WAIT, "freeze thread pool lock")
LATCH_DEF(LATCH_END, 99999, "latch end", LATCH_FIFO, 2000, 0, WAIT_EVENT_END, "latch end")
#endif

View File

@ -345,6 +345,7 @@ WAIT_EVENT_DEF(REPLAY_STATUS_WAIT, 16051, "replay status lock wait", "", "", "",
WAIT_EVENT_DEF(REPLAY_STATUS_TASK_WAIT, 16052, "replay status task lock wait", "", "", "", CONCURRENCY, "REPLAY_STATUS_TASK_WAIT", true)
WAIT_EVENT_DEF(MAX_APPLY_SCN_WAIT, 16053, "max apply scn lock wait", "", "", "", CONCURRENCY, "MAX_APPLY_SCN_WAIT", true)
WAIT_EVENT_DEF(GC_HANDLER_WAIT, 16054, "gc handler lock wait", "", "", "", CONCURRENCY, "GC_HANDLER_WAIT", true)
WAIT_EVENT_DEF(FREEZE_THREAD_POOL_WAIT, 16055, "freeze thread pool wait", "", "", "", CONCURRENCY, "FREEZE_THREAD_POOL_WAIT", true)
//replication group
WAIT_EVENT_DEF(RG_TRANSFER_LOCK_WAIT, 17000, "transfer lock wait", "src_rg", "dst_rg", "transfer_pkey", CONCURRENCY, "transfer lock wait", false)

View File

@ -276,6 +276,17 @@ bool ObDataCheckpoint::is_flushing() const
return !ls_freeze_finished_;
}
bool ObDataCheckpoint::is_empty()
{
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObSpinLockGuard guard(lock_);
return new_create_list_.is_empty() &&
active_list_.is_empty() &&
prepare_list_.is_empty() &&
ls_frozen_list_.is_empty();
}
static inline bool task_reach_time_interval(int64_t i, int64_t &last_time)
{
bool bret = false;
@ -355,35 +366,51 @@ void ObDataCheckpoint::ls_frozen_to_active_(int64_t &last_time)
bool ls_frozen_list_is_empty = false;
do {
{
// traversal list once
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObCheckpointIterator iterator;
ls_frozen_list_.get_iterator(iterator);
while (iterator.has_next()) {
int ret = OB_SUCCESS;
auto ob_freeze_checkpoint = iterator.get_next();
if (ob_freeze_checkpoint->is_active_checkpoint()) {
ObSpinLockGuard guard(lock_);
// avoid new active ob_freeze_checkpoint block minor merge
// push back to new_create_list and wait next freeze
if(OB_FAIL(transfer_from_ls_frozen_to_new_created_(ob_freeze_checkpoint))) {
STORAGE_LOG(WARN, "ob_freeze_checkpoint move to new_created_list failed",
K(ret), K(*ob_freeze_checkpoint));
}
} else {
ObSpinLockGuard guard(lock_);
if (OB_FAIL(ob_freeze_checkpoint->check_can_move_to_active(true))) {
STORAGE_LOG(WARN, "check can freeze failed", K(ret), K(*ob_freeze_checkpoint));
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock);
if (OB_UNLIKELY(ls_->is_stopped_)) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_->ls_meta));
} else if (OB_UNLIKELY(!(ls_->get_log_handler()->is_replay_enabled()))) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "log handler not enable replay, should not freeze", K(ret), K_(ls_->ls_meta));
} else {
// traversal list once
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObCheckpointIterator iterator;
ls_frozen_list_.get_iterator(iterator);
while (iterator.has_next()) {
int ret = OB_SUCCESS;
auto ob_freeze_checkpoint = iterator.get_next();
if (ob_freeze_checkpoint->is_active_checkpoint()) {
ObSpinLockGuard guard(lock_);
// avoid new active ob_freeze_checkpoint block minor merge
// push back to new_create_list and wait next freeze
if(OB_FAIL(transfer_from_ls_frozen_to_new_created_(ob_freeze_checkpoint))) {
STORAGE_LOG(WARN, "ob_freeze_checkpoint move to new_created_list failed",
K(ret), K(*ob_freeze_checkpoint));
}
} else {
ObSpinLockGuard guard(lock_);
if (OB_FAIL(ob_freeze_checkpoint->check_can_move_to_active(true))) {
STORAGE_LOG(WARN, "check can freeze failed", K(ret), K(*ob_freeze_checkpoint));
}
}
}
ls_frozen_list_is_empty = ls_frozen_list_.is_empty();
}
if (OB_NOT_RUNNING == ret && is_empty()) {
ls_frozen_list_is_empty = true;
}
ls_frozen_list_is_empty = ls_frozen_list_.is_empty();
}
if (!ls_frozen_list_is_empty) {
ob_usleep(LOOP_TRAVERSAL_INTERVAL_US);
if (task_reach_time_interval(3 * 1000 * 1000, last_time)) {
STORAGE_LOG(WARN, "cost too much time in ls_frozen_list_", K(ls_->get_ls_id()));
STORAGE_LOG(WARN, "cost too much time in ls_frozen_list_", K(ret), K(ls_->get_ls_id()));
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
print_list_(ls_frozen_list_);
}
@ -401,28 +428,44 @@ void ObDataCheckpoint::ls_frozen_to_prepare_(int64_t &last_time)
bool ls_frozen_list_is_empty = false;
do {
{
// traversal list once
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObCheckpointIterator iterator;
ls_frozen_list_.get_iterator(iterator);
while (iterator.has_next()) {
int tmp_ret = OB_SUCCESS;
auto ob_freeze_checkpoint = iterator.get_next();
if (ob_freeze_checkpoint->ready_for_flush()) {
if (OB_FAIL(ob_freeze_checkpoint->finish_freeze())) {
STORAGE_LOG(WARN, "finish freeze failed", K(ret));
}
} else if (ob_freeze_checkpoint->is_active_checkpoint()) {
// avoid active ob_freeze_checkpoint block minor merge
// push back to active_list and wait next freeze
ObSpinLockGuard guard(lock_);
if(OB_SUCCESS != (tmp_ret = (transfer_from_ls_frozen_to_active_(ob_freeze_checkpoint)))) {
STORAGE_LOG(WARN, "active ob_freeze_checkpoint move to active_list failed",
K(tmp_ret), K(*ob_freeze_checkpoint));
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock);
if (OB_UNLIKELY(ls_->is_stopped_)) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_->ls_meta));
} else if (OB_UNLIKELY(!(ls_->get_log_handler()->is_replay_enabled()))) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "log handler not enable replay, should not freeze", K(ret), K_(ls_->ls_meta));
} else {
// traversal list once
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObCheckpointIterator iterator;
ls_frozen_list_.get_iterator(iterator);
while (iterator.has_next()) {
int tmp_ret = OB_SUCCESS;
auto ob_freeze_checkpoint = iterator.get_next();
if (ob_freeze_checkpoint->ready_for_flush()) {
if (OB_FAIL(ob_freeze_checkpoint->finish_freeze())) {
STORAGE_LOG(WARN, "finish freeze failed", K(ret));
}
} else if (ob_freeze_checkpoint->is_active_checkpoint()) {
// avoid active ob_freeze_checkpoint block minor merge
// push back to active_list and wait next freeze
ObSpinLockGuard guard(lock_);
if(OB_SUCCESS != (tmp_ret = (transfer_from_ls_frozen_to_active_(ob_freeze_checkpoint)))) {
STORAGE_LOG(WARN, "active ob_freeze_checkpoint move to active_list failed",
K(tmp_ret), K(*ob_freeze_checkpoint));
}
}
}
ls_frozen_list_is_empty = ls_frozen_list_.is_empty();
}
if (OB_NOT_RUNNING == ret && is_empty()) {
ls_frozen_list_is_empty = true;
}
ls_frozen_list_is_empty = ls_frozen_list_.is_empty();
}
if (!ls_frozen_list_is_empty) {

View File

@ -118,6 +118,8 @@ public:
bool has_prepared_flush_checkpoint();
bool is_empty();
private:
// traversal prepare_list to flush memtable
// case1: some memtable flush failed when ls freeze

View File

@ -22,6 +22,8 @@
#include "storage/tx/ob_trans_service.h"
#include "storage/compaction/ob_compaction_diagnose.h"
#include "logservice/ob_log_service.h"
#include "storage/ls/ob_ls.h"
#include "storage/tx_storage/ob_tenant_freezer.h"
namespace oceanbase
{
@ -195,12 +197,7 @@ ObFreezer::ObFreezer()
: freeze_flag_(0),
freeze_snapshot_version_(),
max_decided_scn_(),
ls_wrs_handler_(nullptr),
ls_tx_svr_(nullptr),
ls_tablet_svr_(nullptr),
data_checkpoint_(nullptr),
loghandler_(nullptr),
ls_id_(),
ls_(nullptr),
stat_(),
empty_memtable_cnt_(0),
high_priority_freeze_cnt_(0),
@ -210,21 +207,11 @@ ObFreezer::ObFreezer()
is_inited_(false)
{}
ObFreezer::ObFreezer(ObLSWRSHandler *ls_loop_worker,
ObLSTxService *ls_tx_svr,
ObLSTabletService *ls_tablet_svr,
checkpoint::ObDataCheckpoint *data_checkpoint,
ObILogHandler *ob_loghandler,
const share::ObLSID &ls_id)
ObFreezer::ObFreezer(ObLS *ls)
: freeze_flag_(0),
freeze_snapshot_version_(),
max_decided_scn_(),
ls_wrs_handler_(ls_loop_worker),
ls_tx_svr_(ls_tx_svr),
ls_tablet_svr_(ls_tablet_svr),
data_checkpoint_(data_checkpoint),
loghandler_(ob_loghandler),
ls_id_(ls_id),
ls_(ls),
stat_(),
empty_memtable_cnt_(0),
high_priority_freeze_cnt_(0),
@ -239,39 +226,12 @@ ObFreezer::~ObFreezer()
reset();
}
void ObFreezer::set(ObLSWRSHandler *ls_loop_worker,
ObLSTxService *ls_tx_svr,
ObLSTabletService *ls_tablet_svr,
checkpoint::ObDataCheckpoint *data_checkpoint,
ObILogHandler *ob_loghandler,
const share::ObLSID &ls_id,
uint32_t freeze_flag)
{
freeze_flag_ = freeze_flag;
freeze_snapshot_version_.reset();
max_decided_scn_.reset();
ls_wrs_handler_ = ls_loop_worker;
ls_tx_svr_ = ls_tx_svr;
ls_tablet_svr_ = ls_tablet_svr;
data_checkpoint_ = data_checkpoint;
loghandler_ = ob_loghandler;
ls_id_ = ls_id;
stat_.reset();
empty_memtable_cnt_ = 0;
need_resubmit_log_ = false;
}
void ObFreezer::reset()
{
freeze_flag_ = 0;
freeze_snapshot_version_.reset();
max_decided_scn_.reset();
ls_wrs_handler_ = nullptr;
ls_tx_svr_ = nullptr;
data_checkpoint_ = nullptr;
ls_tablet_svr_ = nullptr;
loghandler_ = nullptr;
ls_id_.reset();
ls_ = nullptr;
stat_.reset();
empty_memtable_cnt_ = 0;
high_priority_freeze_cnt_ = 0;
@ -281,37 +241,70 @@ void ObFreezer::reset()
is_inited_ = false;
}
int ObFreezer::init(ObLSWRSHandler *ls_loop_worker,
ObLSTxService *ls_tx_svr,
ObLSTabletService *ls_tablet_svr,
checkpoint::ObDataCheckpoint *data_checkpoint,
ObILogHandler *log_handler,
const share::ObLSID &ls_id)
int ObFreezer::init(ObLS *ls)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ls_loop_worker) ||
OB_ISNULL(ls_tx_svr) ||
OB_ISNULL(ls_tablet_svr) ||
OB_ISNULL(data_checkpoint) ||
OB_ISNULL(log_handler) ||
!ls_id.is_valid()) {
if (OB_ISNULL(ls)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "[Freezer] invalid argument", K(ret), K(ls_loop_worker), K(ls_tx_svr),
K(ls_tablet_svr), K(data_checkpoint), K(log_handler), K(ls_id));
TRANS_LOG(WARN, "[Freezer] invalid argument", K(ret));
} else {
set(ls_loop_worker, ls_tx_svr, ls_tablet_svr, data_checkpoint, log_handler, ls_id);
freeze_flag_ = 0;
freeze_snapshot_version_.reset();
max_decided_scn_.reset();
ls_ = ls;
stat_.reset();
empty_memtable_cnt_ = 0;
high_priority_freeze_cnt_ = 0;
low_priority_freeze_cnt_ = 0;
need_resubmit_log_ = false;
enable_ = true;
is_inited_ = true;
}
return ret;
}
/* ls info */
share::ObLSID ObFreezer::get_ls_id()
{
return OB_ISNULL(ls_) ? INVALID_LS : ls_->get_ls_id();
}
checkpoint::ObDataCheckpoint* ObFreezer::get_ls_data_checkpoint()
{
return OB_ISNULL(ls_) ? nullptr : ls_->get_data_checkpoint();
}
ObLSTxService* ObFreezer::get_ls_tx_svr()
{
return OB_ISNULL(ls_) ? nullptr : ls_->get_tx_svr();
}
ObLSTabletService* ObFreezer::get_ls_tablet_svr()
{
return OB_ISNULL(ls_) ? nullptr : ls_->get_tablet_svr();
}
logservice::ObILogHandler* ObFreezer::get_ls_log_handler()
{
return OB_ISNULL(ls_) ? nullptr : ls_->get_log_handler();
}
ObLSWRSHandler* ObFreezer::get_ls_wrs_handler()
{
return OB_ISNULL(ls_) ? nullptr : ls_->get_ls_wrs_handler();
}
/* logstream freeze */
int ObFreezer::logstream_freeze()
{
int ret = OB_SUCCESS;
SCN freeze_snapshot_version;
SCN max_decided_scn;
FLOG_INFO("[Freezer] logstream_freeze start", K(ret), K_(ls_id));
share::ObLSID ls_id = get_ls_id();
FLOG_INFO("[Freezer] logstream_freeze start", K(ret), K(ls_id));
stat_.reset();
stat_.start_time_ = ObTimeUtility::current_time();
stat_.state_ = ObFreezeState::NOT_SET_FREEZE_FLAG;
@ -319,35 +312,27 @@ int ObFreezer::logstream_freeze()
ObLSFreezeGuard guard(*this);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("[Freezer] not inited", K(ret), K_(ls_id));
LOG_WARN("[Freezer] not inited", K(ret), K(ls_id));
} else if (OB_UNLIKELY(!enable_)) {
LOG_WARN("freezer is offline, can not freeze now", K(ret), K_(ls_id));
LOG_WARN("freezer is offline, can not freeze now", K(ret), K(ls_id));
} else if (OB_FAIL(decide_max_decided_scn(max_decided_scn))) {
TRANS_LOG(WARN, "[Freezer] decide max decided log ts failure", K(ret), K_(ls_id));
TRANS_LOG(WARN, "[Freezer] decide max decided log ts failure", K(ret), K(ls_id));
} else if (OB_FAIL(get_ls_weak_read_scn(freeze_snapshot_version))) {
TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K_(ls_id));
TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K(ls_id));
} else if (ObScnRange::MAX_SCN == freeze_snapshot_version
|| ObScnRange::MIN_SCN >= freeze_snapshot_version) {
ret = OB_NOT_INIT;
LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id), K(freeze_snapshot_version));
LOG_WARN("[Freezer] weak read service not inited", K(ret), K(ls_id), K(freeze_snapshot_version));
} else if (OB_FAIL(set_freeze_flag())) {
FLOG_INFO("[Freezer] freeze is running", K(ret), K_(ls_id));
FLOG_INFO("[Freezer] freeze is running", K(ret), K(ls_id));
} else if (FALSE_IT(max_decided_scn_ = max_decided_scn)) {
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
} else if (FALSE_IT(set_need_resubmit_log(false))) {
} else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) {
} else if (OB_FAIL(inner_logstream_freeze())) {
TRANS_LOG(WARN, "[Freezer] logstream_freeze failure", K(ret), K(ls_id));
undo_freeze_();
TRANS_LOG(WARN, "[Freezer] logstream_freeze failure", K(ret), K_(ls_id));
} else {
stat_.add_diagnose_info("logstream_freeze success");
unset_freeze_();
uint32_t freeze_clock = get_freeze_clock();
FLOG_INFO("[Freezer] logstream_freeze success", K(ret), K_(ls_id), K(freeze_clock));
}
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
stat_.ret_code_ = ret;
print_freezer_statistics();
@ -357,33 +342,75 @@ int ObFreezer::logstream_freeze()
int ObFreezer::inner_logstream_freeze()
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
if (OB_FAIL(get_ls_data_checkpoint()->ls_freeze(SCN::max_scn()))) {
// move memtables from active_list to frozen_list
TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K(ls_id));
stat_.add_diagnose_info("data_checkpoint freeze failed");
} else if (FALSE_IT(submit_log_for_freeze())) {
} else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/))) {
TRANS_LOG(WARN, "failed to submit ls_freeze task", K(ret), K(ls_id));
stat_.add_diagnose_info("fail to submit ls_freeze_task");
} else {
TRANS_LOG(INFO, "[Freezer] succeed to start ls_freeze_task", K(ret), K(ls_id));
}
return ret;
}
void ObFreezer::ls_freeze_task()
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
const int64_t start = ObTimeUtility::current_time();
uint32_t freeze_clock = get_freeze_clock();
TRANS_LOG(INFO, "[Freezer] freeze_clock", K(ret), K_(ls_id), K(freeze_clock));
TRANS_LOG(INFO, "[Freezer] freeze_clock", K(ls_id), K(freeze_clock));
if (OB_FAIL(data_checkpoint_->ls_freeze(SCN::max_scn()))) {
// move memtables from active_list to frozen_list
TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K_(ls_id));
stat_.add_diagnose_info("data_checkpoint freeze failed");
} else {
submit_log_for_freeze();
// wait till all memtables are moved from frozen_list to prepare_list
// this means that all memtables can be dumped
while (!data_checkpoint_->ls_freeze_finished()) {
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 5 * 1000 * 1000) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
if (need_resubmit_log()) {
// wait till all memtables are moved from frozen_list to prepare_list
// this means that all memtables can be dumped
while (!get_ls_data_checkpoint()->ls_freeze_finished()) {
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 5 * 1000 * 1000) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
if (need_resubmit_log()) {
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock);
if (OB_FAIL(check_ls_state())) {
} else {
submit_log_for_freeze();
TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K_(ls_id), K(cost_time));
TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K(ls_id), K(cost_time));
}
TRANS_LOG(WARN, "[Freezer] finish ls_freeze costs too much time",
K_(ls_id), K(cost_time));
stat_.add_diagnose_info("finish ls_freeze costs too much time");
}
TRANS_LOG(WARN, "[Freezer] finish ls_freeze costs too much time",
K(ls_id), K(cost_time));
stat_.add_diagnose_info("finish ls_freeze costs too much time");
}
ob_usleep(100);
}
ob_usleep(100);
}
stat_.add_diagnose_info("logstream_freeze success");
FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock));
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
stat_.ret_code_ = ret;
unset_freeze_();
}
// must be used under the protection of ls_lock
int ObFreezer::check_ls_state()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(ls_->is_stopped_)) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_->ls_meta));
} else if (OB_UNLIKELY(!(ls_->get_log_handler()->is_replay_enabled()))) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "log handler not enable replay, should not freeze", K(ret), K_(ls_->ls_meta));
}
return ret;
@ -393,12 +420,13 @@ int ObFreezer::inner_logstream_freeze()
int ObFreezer::tablet_freeze(const ObTabletID &tablet_id)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
ObTabletHandle handle;
ObTablet *tablet = nullptr;
ObTabletMemtableMgr *memtable_mgr = nullptr;
memtable::ObIMemtable *imemtable = nullptr;
SCN freeze_snapshot_version;
FLOG_INFO("[Freezer] tablet_freeze start", K(ret), K_(ls_id), K(tablet_id));
FLOG_INFO("[Freezer] tablet_freeze start", K(ret), K(ls_id), K(tablet_id));
stat_.reset();
stat_.start_time_ = ObTimeUtility::current_time();
stat_.state_ = ObFreezeState::NOT_SET_FREEZE_FLAG;
@ -407,75 +435,74 @@ int ObFreezer::tablet_freeze(const ObTabletID &tablet_id)
ObTabletFreezeGuard guard(*this, true /* try guard */);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K(ls_id), K(tablet_id));
} else if (OB_UNLIKELY(!enable_)) {
LOG_WARN("freezer is offline, can not freeze now", K(ret), K_(ls_id));
LOG_WARN("freezer is offline, can not freeze now", K(ret), K(ls_id));
} else if (OB_FAIL(guard.try_set_tablet_freeze_begin())) {
// no need freeze now, a ls freeze is running or will be running
ret = OB_SUCCESS;
FLOG_INFO("[Freezer] ls freeze is running, no need freeze again", K(ret), K_(ls_id), K(tablet_id));
FLOG_INFO("[Freezer] ls freeze is running, no need freeze again", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(set_freeze_flag_without_inc_freeze_clock())) {
ret = OB_SUCCESS;
FLOG_INFO("[Freezer] freeze is running", K(ret), K_(ls_id), K(tablet_id));
FLOG_INFO("[Freezer] freeze is running", K(ret), K(ls_id), K(tablet_id));
} else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) {
} else {
// succeed to set freeze flag
if (OB_FAIL(get_ls_weak_read_scn(freeze_snapshot_version))) {
TRANS_LOG(WARN, "[Freezer] get ls weak read scn failure", K(ret), K_(ls_id));
TRANS_LOG(WARN, "[Freezer] get ls weak read scn failure", K(ret), K(ls_id));
} else if (ObScnRange::MAX_SCN == freeze_snapshot_version
|| ObScnRange::MIN_SCN >= freeze_snapshot_version) {
ret = OB_NOT_INIT;
LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id));
LOG_WARN("[Freezer] weak read service not inited", K(ret), K(ls_id));
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
} else if (FALSE_IT(set_need_resubmit_log(false))) {
} else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id,
handle,
ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) {
TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K_(ls_id), K(tablet_id));
} else if (OB_FAIL(get_ls_tablet_svr()->get_tablet(tablet_id,
handle,
ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) {
TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to get tablet");
} else if (FALSE_IT(tablet = handle.get_obj())) {
} else if (OB_ISNULL(memtable_mgr = static_cast<ObTabletMemtableMgr*>(tablet->get_memtable_mgr()))) {
TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(imemtable))) {
if (ret == OB_ENTRY_NOT_EXIST) {
ret = OB_SUCCESS;
TRANS_LOG(INFO, "[Freezer] no need to freeze since there is no active memtable", K(ret),
K_(ls_id), K(tablet_id));
K(ls_id), K(tablet_id));
stat_.add_diagnose_info("no need to freeze since there is no active memtable");
} else {
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to set is_tablet_freeze");
}
} else if (OB_FAIL(handle_memtable_for_tablet_freeze(imemtable))) {
TRANS_LOG(WARN, "[Freezer] fail to handle memtable", K(ret), K_(ls_id), K(tablet_id));
} else if (FALSE_IT(submit_log_for_freeze())) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, imemtable))) {
TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to submit tablet_freeze_task");
} else {
stat_.add_diagnose_info("tablet_freeze success");
TRANS_LOG(INFO, "[Freezer] succeed to start tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
}
if (OB_FAIL(ret) || OB_ISNULL(imemtable)) {
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
stat_.ret_code_ = ret;
print_freezer_statistics();
unset_freeze_();
}
unset_freeze_();
}
if (OB_SUCC(ret)) {
FLOG_INFO("[Freezer] tablet_freeze success", K(ret), K_(ls_id), K(tablet_id));
}
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
stat_.ret_code_ = ret;
print_freezer_statistics();
return ret;
}
int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
ObTabletHandle handle;
ObTablet *tablet = nullptr;
ObTabletMemtableMgr *memtable_mgr = nullptr;
memtable::ObIMemtable *imemtable = nullptr;
SCN freeze_snapshot_version;
FLOG_INFO("[Freezer] force_tablet_freeze start", K(ret), K_(ls_id), K(tablet_id));
FLOG_INFO("[Freezer] force_tablet_freeze start", K(ret), K(ls_id), K(tablet_id));
stat_.reset();
stat_.start_time_ = ObTimeUtility::current_time();
stat_.state_ = ObFreezeState::NOT_SET_FREEZE_FLAG;
@ -485,57 +512,128 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id)
ObTabletFreezeGuard guard(*this);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K(ls_id), K(tablet_id));
} else if (OB_UNLIKELY(!enable_)) {
LOG_WARN("freezer is offline, can not freeze now", K(ret), K_(ls_id));
LOG_WARN("freezer is offline, can not freeze now", K(ret), K(ls_id));
} else if (OB_FAIL(loop_set_freeze_flag())) {
TRANS_LOG(WARN, "[Freezer] failed to set freeze_flag", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] failed to set freeze_flag", K(ret), K(ls_id), K(tablet_id));
} else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) {
} else {
// succeed to set freeze flag
if (OB_FAIL(get_ls_weak_read_scn(freeze_snapshot_version))) {
TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K_(ls_id));
TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K(ls_id));
} else if (ObScnRange::MAX_SCN == freeze_snapshot_version
|| ObScnRange::MIN_SCN >= freeze_snapshot_version) {
ret = OB_NOT_INIT;
LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id));
LOG_WARN("[Freezer] weak read service not inited", K(ret), K(ls_id));
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
} else if (FALSE_IT(set_need_resubmit_log(false))) {
} else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id,
handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) {
TRANS_LOG(WARN, "[Freezer] fail to get tablet for freeze", K(ret), K_(ls_id), K(tablet_id));
} else if (OB_FAIL(get_ls_tablet_svr()->get_tablet(tablet_id,
handle,
ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) {
TRANS_LOG(WARN, "[Freezer] fail to get tablet for freeze", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to get tablet");
} else if (FALSE_IT(tablet = handle.get_obj())) {
} else if (OB_FAIL(create_memtable_if_no_active_memtable(tablet))) {
if (OB_NO_NEED_UPDATE == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("[Freezer] fail to create an active memtable for force_tablet_freeze", K(ret), K_(ls_id), K(tablet_id));
LOG_WARN("[Freezer] fail to create an active memtable for force_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to create an active memtable for force_tablet_freeze");
}
} else if (OB_ISNULL(memtable_mgr = static_cast<ObTabletMemtableMgr*>(tablet->get_memtable_mgr()))) {
TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(imemtable, true))) {
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to set is_tablet_freeze");
} else if (OB_FAIL(handle_memtable_for_tablet_freeze(imemtable))) {
TRANS_LOG(WARN, "[Freezer] fail to handle memtable", K(ret), K_(ls_id), K(tablet_id));
} else if (FALSE_IT(submit_log_for_freeze())) {
} else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, imemtable))) {
TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to submit freeze_task");
} else {
stat_.add_diagnose_info("force_tablet_freeze success");
TRANS_LOG(INFO, "[Freezer] succeed to start force_tablet_freeze_task", K(ret), K(ls_id), K(tablet_id));
}
if (OB_FAIL(ret) || OB_ISNULL(imemtable)) {
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
stat_.ret_code_ = ret;
print_freezer_statistics();
unset_freeze_();
}
}
return ret;
}
int ObFreezer::tablet_freeze_task(memtable::ObIMemtable *imemtable)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
memtable::ObMemtable *memtable = nullptr;
if (OB_ISNULL(imemtable)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "memtable cannot be null", K(ret), K(ls_id));
} else {
// succeed to set freeze_flag
if (FALSE_IT(memtable = static_cast<memtable::ObMemtable*>(imemtable))) {
} else if (OB_FAIL(wait_memtable_ready_for_flush_with_ls_lock(memtable))) {
TRANS_LOG(WARN, "[Freezer] fail to wait memtable ready_for_flush", K(ret), K(ls_id));
} else {
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock);
if (OB_FAIL(check_ls_state())) {
} else if (OB_FAIL(memtable->finish_freeze())) {
TRANS_LOG(ERROR, "[Freezer] memtable cannot be flushed",
K(ret), K(ls_id), KPC(memtable));
stat_.add_diagnose_info("memtable cannot be flushed");
} else {
stat_.add_diagnose_info("tablet_freeze success");
FLOG_INFO("[Freezer] tablet_freeze_task success", K(ret), K(ls_id), KPC(memtable));
}
}
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
stat_.ret_code_ = ret;
print_freezer_statistics();
unset_freeze_();
}
if (OB_SUCC(ret)) {
FLOG_INFO("[Freezer] force_tablet_freeze success", K(ret), K_(ls_id), K(tablet_id));
} else {
FLOG_INFO("[Freezer] force_tablet_freeze failed", K(ret), K_(ls_id), K(tablet_id));
}
return ret;
}
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
stat_.ret_code_ = ret;
int ObFreezer::wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *memtable)
{
share::ObLSID ls_id = get_ls_id();
const int64_t start = ObTimeUtility::current_time();
int ret = OB_SUCCESS;
bool ready_for_flush = false;
print_freezer_statistics();
do {
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock);
if (OB_FAIL(check_ls_state())) {
} else if (!memtable->ready_for_flush()) {
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 5 * 1000 * 1000) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
if (need_resubmit_log()) {
submit_log_for_freeze();
TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K(ls_id), K(cost_time));
}
TRANS_LOG(WARN, "[Freezer] ready_for_flush costs too much time",
K(ls_id), K(cost_time), KPC(memtable));
stat_.add_diagnose_info("ready_for_flush costs too much time");
}
}
ob_usleep(100);
} else {
ready_for_flush = true;
}
} while (OB_SUCC(ret) && !ready_for_flush);
return ret;
}
@ -543,12 +641,13 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id)
int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *&imemtable)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
ObTabletHandle handle;
ObTablet *tablet = nullptr;
ObTabletMemtableMgr *memtable_mgr = nullptr;
imemtable = nullptr;
SCN freeze_snapshot_version;
FLOG_INFO("[Freezer] tablet_freeze_for_replace_tablet_meta start", K(ret), K_(ls_id), K(tablet_id));
FLOG_INFO("[Freezer] tablet_freeze_for_replace_tablet_meta start", K(ret), K(ls_id), K(tablet_id));
stat_.reset();
stat_.start_time_ = ObTimeUtility::current_time();
stat_.state_ = ObFreezeState::NOT_SET_FREEZE_FLAG;
@ -557,41 +656,41 @@ int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id
ObTabletFreezeGuard guard(*this, true /* try guard */);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(guard.try_set_tablet_freeze_begin())) {
// no need freeze now, a ls freeze is running or will be running
ret = OB_SUCCESS;
FLOG_INFO("[Freezer] ls freeze is running, no need freeze again", K(ret), K_(ls_id), K(tablet_id));
FLOG_INFO("[Freezer] ls freeze is running, no need freeze again", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(set_freeze_flag_without_inc_freeze_clock())) {
ret = OB_SUCCESS;
FLOG_INFO("[Freezer] freeze is running", K(ret), K_(ls_id), K(tablet_id));
FLOG_INFO("[Freezer] freeze is running", K(ret), K(ls_id), K(tablet_id));
} else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) {
} else {
// succeed to set freeze flag
if (OB_FAIL(get_ls_weak_read_scn(freeze_snapshot_version))) {
TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K_(ls_id));
TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K(ls_id));
} else if (freeze_snapshot_version.is_max()
|| !freeze_snapshot_version.is_valid_and_not_min()) {
ret = OB_NOT_INIT;
LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id));
LOG_WARN("[Freezer] weak read service not inited", K(ret), K(ls_id));
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
} else if (FALSE_IT(set_need_resubmit_log(false))) {
} else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id,
} else if (OB_FAIL(get_ls_tablet_svr()->get_tablet(tablet_id,
handle,
ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) {
TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to get tablet");
} else if (FALSE_IT(tablet = handle.get_obj())) {
} else if (OB_ISNULL(memtable_mgr = static_cast<ObTabletMemtableMgr*>(tablet->get_memtable_mgr()))) {
TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(imemtable))) {
if (ret == OB_ENTRY_NOT_EXIST) {
ret = OB_SUCCESS;
TRANS_LOG(INFO, "[Freezer] no need to freeze since there is no active memtable", K(ret),
K_(ls_id), K(tablet_id));
K(ls_id), K(tablet_id));
stat_.add_diagnose_info("no need to freeze since there is no active memtable");
} else {
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id));
stat_.add_diagnose_info("fail to set is_tablet_freeze");
}
}
@ -611,16 +710,17 @@ int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id
int ObFreezer::handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *imemtable)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
if (OB_ISNULL(imemtable)) {
ret = OB_SUCCESS;
FLOG_INFO("[Freezer] no need to tablet_freeze_for_replace_tablet_meta", K(ret), K_(ls_id), K(tablet_id));
FLOG_INFO("[Freezer] no need to tablet_freeze_for_replace_tablet_meta", K(ret), K(ls_id), K(tablet_id));
} else {
if (OB_FAIL(handle_memtable_for_tablet_freeze(imemtable))) {
TRANS_LOG(WARN, "[Freezer] fail to handle memtable", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] fail to handle memtable", K(ret), K(ls_id), K(tablet_id));
} else {
stat_.add_diagnose_info("tablet_freeze_for_replace_tablet_meta success");
FLOG_INFO("[Freezer] tablet_freeze_for_replace_tablet_meta success", K(ret), K_(ls_id), K(tablet_id));
FLOG_INFO("[Freezer] tablet_freeze_for_replace_tablet_meta success", K(ret), K(ls_id), K(tablet_id));
}
stat_.state_ = ObFreezeState::FINISH;
stat_.end_time_ = ObTimeUtility::current_time();
@ -636,6 +736,7 @@ int ObFreezer::handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID &
int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
if (OB_ISNULL(imemtable)) {
ret = OB_ERR_UNEXPECTED;
@ -645,11 +746,11 @@ int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtabl
wait_memtable_ready_for_flush(memtable);
if (OB_FAIL(memtable->finish_freeze())) {
TRANS_LOG(ERROR, "[Freezer] memtable cannot be flushed",
K(ret), K_(ls_id), K(*memtable));
K(ret), K(ls_id), K(*memtable));
stat_.add_diagnose_info("memtable cannot be flushed");
} else {
TRANS_LOG(INFO, "[Freezer] memtable is ready to be flushed",
K(ret), K_(ls_id), K(*memtable));
K(ret), K(ls_id), K(*memtable));
}
}
@ -659,6 +760,7 @@ int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtabl
int ObFreezer::submit_log_for_freeze()
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
const int64_t start = ObTimeUtility::current_time();
ObTabletID tablet_id(INT64_MAX); // used for diagnose
@ -666,14 +768,14 @@ int ObFreezer::submit_log_for_freeze()
ret = OB_SUCCESS;
transaction::ObTransID fail_tx_id;
if (OB_FAIL(ls_tx_svr_->traverse_trans_to_submit_redo_log(fail_tx_id))) {
if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_redo_log(fail_tx_id))) {
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 1000 * 1000) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
TRANS_LOG(WARN, "[Freezer] failed to traverse trans ctx to submit redo log", K(ret),
K_(ls_id), K(cost_time), K(fail_tx_id));
K(ls_id), K(cost_time), K(fail_tx_id));
ADD_SUSPECT_INFO(MINI_MERGE,
ls_id_, tablet_id,
ls_id, tablet_id,
"traverse_trans_to_submit_redo_log failed",
K(ret),
K(fail_tx_id));
@ -685,10 +787,10 @@ int ObFreezer::submit_log_for_freeze()
ob_usleep(100 * 1000);
}
} while (OB_FAIL(ret));
DEL_SUSPECT_INFO(MINI_MERGE, ls_id_, tablet_id);
DEL_SUSPECT_INFO(MINI_MERGE, ls_id, tablet_id);
if (OB_SUCC(ret)) {
if (OB_FAIL(ls_tx_svr_->traverse_trans_to_submit_next_log())) {
if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_next_log())) {
TRANS_LOG(WARN, "traverse trans ctx to submit next log failed", K(ret));
}
}
@ -698,8 +800,38 @@ int ObFreezer::submit_log_for_freeze()
return ret;
}
int ObFreezer::submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imemtable)
{
int ret = OB_SUCCESS;
ObTenantFreezer *tenant_freezer = nullptr;
share::ObLSID ls_id = get_ls_id();
const int64_t start = ObTimeUtility::current_time();
if (OB_ISNULL(tenant_freezer = MTL(storage::ObTenantFreezer*))) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "ObTenantFreezer is null", K(ret), K(ls_id));
} else {
ObSpinLockGuard freeze_thread_pool(tenant_freezer->freeze_thread_pool_lock_);
do {
ret = is_ls_freeze ? tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this]() { ls_freeze_task(); })
: tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, imemtable]() { tablet_freeze_task(imemtable); });
if (OB_FAIL(ret)) {
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 100 * 1000) {
if (TC_REACH_TIME_INTERVAL(100 * 1000)) {
TRANS_LOG(WARN, "[Freezer] failed to start freeze_task", K(ret), K(ls_id), K(is_ls_freeze));
}
}
}
} while (OB_FAIL(ret));
}
return ret;
}
void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable)
{
share::ObLSID ls_id = get_ls_id();
const int64_t start = ObTimeUtility::current_time();
int ret = OB_SUCCESS;
@ -709,10 +841,10 @@ void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable)
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
if (need_resubmit_log()) {
submit_log_for_freeze();
TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K_(ls_id), K(cost_time));
TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K(ls_id), K(cost_time));
}
TRANS_LOG(WARN, "[Freezer] ready_for_flush costs too much time",
K_(ls_id), K(cost_time), KPC(memtable));
K(ls_id), K(cost_time), KPC(memtable));
stat_.add_diagnose_info("ready_for_flush costs too much time");
memtable->print_ready_for_flush();
}
@ -724,6 +856,7 @@ void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable)
int ObFreezer::create_memtable_if_no_active_memtable(ObTablet *tablet)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
ObTabletMemtableMgr *memtable_mgr = nullptr;
memtable::ObMemtable *last_frozen_memtable = nullptr;
const common::ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_;
@ -733,34 +866,34 @@ int ObFreezer::create_memtable_if_no_active_memtable(ObTablet *tablet)
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("[Freezer] not inited", K(ret), K_(is_inited), K_(ls_id), K(tablet_id));
LOG_WARN("[Freezer] not inited", K(ret), K_(is_inited), K(ls_id), K(tablet_id));
} else if (OB_FAIL(get_max_consequent_callbacked_scn(max_callbacked_scn))) {
LOG_WARN("[Freezer] fail to get max_consequent_callbacked_scn", K(ret), K_(ls_id), K(tablet_id));
LOG_WARN("[Freezer] fail to get max_consequent_callbacked_scn", K(ret), K(ls_id), K(tablet_id));
} else if (max_callbacked_scn < clog_checkpoint_scn) {
ret = OB_NO_NEED_UPDATE;
LOG_WARN("[Freezer] cannot create memtable because max_callbacked_scn < clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id));
} else if (OB_ISNULL(memtable_mgr = static_cast<ObTabletMemtableMgr *>(tablet->get_memtable_mgr()))) {
LOG_WARN("[Freezer] memtable mgr should not be null", K(ret), K_(ls_id), K(tablet_id));
LOG_WARN("[Freezer] memtable mgr should not be null", K(ret), K(ls_id), K(tablet_id));
} else if (memtable_mgr->has_active_memtable()) {
LOG_INFO("[Freezer] no need to create an active memtable", K(ret), K_(ls_id), K(tablet_id));
LOG_INFO("[Freezer] no need to create an active memtable", K(ret), K(ls_id), K(tablet_id));
} else { // create a new memtable since there is no active memtable
// get schema_version
if (OB_NOT_NULL(last_frozen_memtable = memtable_mgr->get_last_frozen_memtable())) {
schema_version = last_frozen_memtable->get_max_schema_version();
} else if (OB_FAIL(tablet->get_schema_version_from_storage_schema(schema_version))) {
LOG_WARN("[Freezer] failed to get schema version", K(ret), K_(ls_id), K(tablet_id));
LOG_WARN("[Freezer] failed to get schema version", K(ret), K(ls_id), K(tablet_id));
} else {
//do nothing
}
// create new memtable
if (OB_SUCC(ret)) {
if (OB_FAIL(ls_tablet_svr_->create_memtable(tablet_id, schema_version))) {
if (OB_FAIL(get_ls_tablet_svr()->create_memtable(tablet_id, schema_version))) {
if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
LOG_WARN("[Freezer] failed to create memtable", K(ret), K_(ls_id), K(tablet_id),
LOG_WARN("[Freezer] failed to create memtable", K(ret), K(ls_id), K(tablet_id),
K(schema_version));
}
} else {
LOG_INFO("[Freezer] succeed to create new active memtable", K(ret), K_(ls_id),
LOG_INFO("[Freezer] succeed to create new active memtable", K(ret), K(ls_id),
K(tablet_id), K(schema_version));
}
}
@ -773,6 +906,7 @@ int ObFreezer::create_memtable_if_no_active_memtable(ObTablet *tablet)
int ObFreezer::loop_set_freeze_flag()
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
const int64_t start = ObTimeUtility::current_time();
do {
@ -780,7 +914,7 @@ int ObFreezer::loop_set_freeze_flag()
if (OB_FAIL(set_freeze_flag_without_inc_freeze_clock())) {
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 3 * 1000 * 1000) {
TRANS_LOG(WARN, "[Freezer] wait the running freeze too long time", K_(ls_id),
TRANS_LOG(WARN, "[Freezer] wait the running freeze too long time", K(ls_id),
K(cost_time));
break;
}
@ -794,6 +928,7 @@ int ObFreezer::loop_set_freeze_flag()
int ObFreezer::set_freeze_flag_without_inc_freeze_clock()
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
uint32_t old_v = 0;
uint32_t new_v = 0;
@ -801,7 +936,7 @@ int ObFreezer::set_freeze_flag_without_inc_freeze_clock()
old_v = ATOMIC_LOAD(&freeze_flag_);
if (is_freeze(old_v)) {
ret = OB_ENTRY_EXIST;
TRANS_LOG(WARN, "[Freezer] freeze is running!", K(ret), K_(ls_id));
TRANS_LOG(WARN, "[Freezer] freeze is running!", K(ret), K(ls_id));
break;
}
new_v = old_v | (1 << 31);
@ -813,6 +948,7 @@ int ObFreezer::set_freeze_flag_without_inc_freeze_clock()
int ObFreezer::set_freeze_flag()
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
uint32_t old_v = 0;
uint32_t new_v = 0;
@ -823,7 +959,7 @@ int ObFreezer::set_freeze_flag()
old_v = ATOMIC_LOAD(&freeze_flag_);
if (is_freeze(old_v)) {
ret = OB_ENTRY_EXIST;
TRANS_LOG(WARN, "[Freezer] freeze is running!", K(ret), K_(ls_id));
TRANS_LOG(WARN, "[Freezer] freeze is running!", K(ret), K(ls_id));
break;
}
new_v = (old_v + 1) | (1 << 31);
@ -835,6 +971,7 @@ int ObFreezer::set_freeze_flag()
int ObFreezer::inc_freeze_clock()
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
uint32_t old_v = 0;
uint32_t new_v = 0;
@ -845,7 +982,7 @@ int ObFreezer::inc_freeze_clock()
if (!is_freeze(old_v)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "[Freezer] cannot inc freeze clock without setting freeze flag!",
K(ret), K_(ls_id));
K(ret), K(ls_id));
break;
}
new_v = old_v + 1;
@ -911,41 +1048,44 @@ bool ObFreezer::is_freeze(uint32_t freeze_flag) const
int ObFreezer::decide_max_decided_scn(SCN &max_decided_scn)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("[Freezer] not inited", K(ret), K_(ls_id));
} else if (OB_FAIL(loghandler_->get_max_decided_scn(max_decided_scn))) {
LOG_WARN("[Freezer] not inited", K(ret), K(ls_id));
} else if (OB_FAIL(get_ls_log_handler()->get_max_decided_scn(max_decided_scn))) {
if (OB_STATE_NOT_MATCH == ret) {
max_decided_scn.reset();
ret = OB_SUCCESS;
} else {
TRANS_LOG(WARN, "[Freezer] fail to get max_decided_scn", K(ret), K_(ls_id),
TRANS_LOG(WARN, "[Freezer] fail to get max_decided_scn", K(ret), K(ls_id),
K(max_decided_scn));
}
}
if (OB_SUCC(ret)) {
TRANS_LOG(TRACE, "[Freezer] decide max decided log ts", K(ret), K_(ls_id), K(max_decided_scn));
TRANS_LOG(TRACE, "[Freezer] decide max decided log ts", K(ret), K(ls_id), K(max_decided_scn));
}
return ret;
}
int ObFreezer::get_max_consequent_callbacked_scn(SCN &max_consequent_callbacked_scn)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("[Freezer] not inited", K(ret), K_(ls_id));
} else if (OB_FAIL(loghandler_->get_max_decided_scn(max_consequent_callbacked_scn))) {
LOG_WARN("[Freezer] not inited", K(ret), K(ls_id));
} else if (OB_FAIL(get_ls_log_handler()->get_max_decided_scn(max_consequent_callbacked_scn))) {
if (OB_STATE_NOT_MATCH == ret) {
max_consequent_callbacked_scn.set_min();
ret = OB_SUCCESS;
} else {
TRANS_LOG(WARN, "[Freezer] fail to get min_unreplay_scn", K(ret), K_(ls_id), K(max_consequent_callbacked_scn));
TRANS_LOG(WARN, "[Freezer] fail to get min_unreplay_scn", K(ret), K(ls_id), K(max_consequent_callbacked_scn));
}
} else {
TRANS_LOG(TRACE, "[Freezer] get_max_decided_scn", K(ret), K_(ls_id), K(max_consequent_callbacked_scn));
TRANS_LOG(TRACE, "[Freezer] get_max_decided_scn", K(ret), K(ls_id), K(max_consequent_callbacked_scn));
}
return ret;
}
@ -953,14 +1093,15 @@ int ObFreezer::get_max_consequent_callbacked_scn(SCN &max_consequent_callbacked_
int ObFreezer::get_ls_weak_read_scn(SCN &weak_read_scn)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
weak_read_scn.reset();
if (OB_ISNULL(ls_wrs_handler_)) {
ret = OB_ENTRY_NOT_EXIST;
TRANS_LOG(WARN, "[Freezer] service should not be null", K(ret), K_(ls_id));
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "[Freezer] not inited", K(ret));
} else {
weak_read_scn = ls_wrs_handler_->get_ls_weak_read_ts();
TRANS_LOG(TRACE, "[Freezer] get_ls_weak_read_scn", K(ret), K_(ls_id), K(weak_read_scn));
weak_read_scn = get_ls_wrs_handler()->get_ls_weak_read_ts();
TRANS_LOG(TRACE, "[Freezer] get_ls_weak_read_scn", K(ret), K(ls_id), K(weak_read_scn));
}
return ret;
@ -970,18 +1111,19 @@ int ObFreezer::get_newest_clog_checkpoint_scn(const ObTabletID &tablet_id,
SCN &clog_checkpoint_scn)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
ObTabletHandle handle;
clog_checkpoint_scn.reset();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "[Freezer] not inited", K(ret));
} else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id,
} else if (OB_FAIL(get_ls_tablet_svr()->get_tablet(tablet_id,
handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) {
TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K(ls_id), K(tablet_id));
} else {
clog_checkpoint_scn = handle.get_obj()->get_tablet_meta().clog_checkpoint_scn_;
TRANS_LOG(TRACE, "[Freezer] get_newest_clog_checkpoint_scn", K(ret), K_(ls_id), K(tablet_id),
TRANS_LOG(TRACE, "[Freezer] get_newest_clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id),
K(clog_checkpoint_scn));
}
@ -992,19 +1134,20 @@ int ObFreezer::get_newest_snapshot_version(const ObTabletID &tablet_id,
SCN &snapshot_version)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
ObTabletHandle handle;
snapshot_version.reset();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "[Freezer] not inited", K(ret));
} else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id,
} else if (OB_FAIL(get_ls_tablet_svr()->get_tablet(tablet_id,
handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) {
TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(snapshot_version.convert_for_tx(handle.get_obj()->get_snapshot_version()))) {
TRANS_LOG(WARN, "[Freezer] fail to convert from ts", K(ret), K_(ls_id), K(tablet_id));
TRANS_LOG(WARN, "[Freezer] fail to convert from ts", K(ret), K(ls_id), K(tablet_id));
} else {
TRANS_LOG(TRACE, "[Freezer] get_snapshot_version", K(ret), K_(ls_id), K(tablet_id), K(snapshot_version));
TRANS_LOG(TRACE, "[Freezer] get_snapshot_version", K(ret), K(ls_id), K(tablet_id), K(snapshot_version));
}
return ret;
@ -1029,7 +1172,7 @@ void ObFreezer::print_freezer_statistics()
{
// print every 10s
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
TRANS_LOG(INFO, "[Freezer] empty table statistics: ", K_(ls_id), K(get_empty_memtable_cnt()));
TRANS_LOG(INFO, "[Freezer] empty table statistics: ", K(ls_id), K(get_empty_memtable_cnt()));
clear_empty_memtable_cnt();
}
}

View File

@ -22,6 +22,7 @@
#include "storage/checkpoint/ob_freeze_checkpoint.h"
#include "logservice/ob_log_handler.h"
#include "lib/container/ob_array_serialization.h"
#include "share/ob_occam_thread_pool.h"
namespace oceanbase
{
@ -183,27 +184,10 @@ public:
public:
ObFreezer();
ObFreezer(ObLSWRSHandler *ls_loop_worker,
ObLSTxService *ls_tx_svr,
ObLSTabletService *ls_tablet_svr,
checkpoint::ObDataCheckpoint *data_checkpoint,
logservice::ObILogHandler *ob_loghandler,
const share::ObLSID &ls_id);
ObFreezer(ObLS *ls);
~ObFreezer();
int init(ObLSWRSHandler *ls_loop_worker,
ObLSTxService *ls_tx_svr,
ObLSTabletService *ls_tablet_svr,
checkpoint::ObDataCheckpoint *data_checkpoint,
logservice::ObILogHandler *ob_loghandler,
const share::ObLSID &ls_id);
void set(ObLSWRSHandler *ls_loop_worker,
ObLSTxService *ls_tx_svr,
ObLSTabletService *ls_tablet_svr,
checkpoint::ObDataCheckpoint *data_checkpoint,
logservice::ObILogHandler *ob_loghandler,
const share::ObLSID &ls_id,
uint32_t freeze_flag = 0);
int init(ObLS *ls);
void reset();
void offline() { enable_ = false; }
void online() { enable_ = true; }
@ -222,10 +206,12 @@ public:
uint32_t get_freeze_clock() { return ATOMIC_LOAD(&freeze_flag_) & (~(1 << 31)); }
/* ls info */
share::ObLSID &get_ls_id() { return ls_id_; }
checkpoint::ObDataCheckpoint *get_data_checkpoint() { return data_checkpoint_; }
ObLSTxService *get_ls_tx_svr() { return ls_tx_svr_; }
ObLSTabletService *get_ls_tablet_svr() { return ls_tablet_svr_; }
share::ObLSID get_ls_id();
checkpoint::ObDataCheckpoint *get_ls_data_checkpoint();
ObLSTxService *get_ls_tx_svr();
ObLSTabletService *get_ls_tablet_svr();
logservice::ObILogHandler *get_ls_log_handler();
ObLSWRSHandler *get_ls_wrs_handler();
/* freeze_snapshot_version */
share::SCN get_freeze_snapshot_version() { return freeze_snapshot_version_; }
@ -285,15 +271,19 @@ private:
/* inner subfunctions for freeze process */
int inner_logstream_freeze();
int submit_log_for_freeze();
void ls_freeze_task();
int tablet_freeze_task(memtable::ObIMemtable *imemtable);
int submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imemtable = nullptr);
void wait_memtable_ready_for_flush(memtable::ObMemtable *memtable);
int wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *memtable);
int handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable);
int create_memtable_if_no_active_memtable(ObTablet *tablet);
int try_set_tablet_freeze_begin_();
void set_tablet_freeze_begin_();
void set_tablet_freeze_end_();
void set_ls_freeze_begin_();
void set_ls_freeze_end_();
int check_ls_state(); // must be used under the protection of ls_lock
private:
// flag whether the logsteram is freezing
// the first bit: 1, freeze; 0, not freeze
@ -307,14 +297,8 @@ private:
// log ts before which will be smaller than the log ts in the latter memtables
share::SCN max_decided_scn_;
ObLSWRSHandler *ls_wrs_handler_;
ObLSTxService *ls_tx_svr_;
ObLSTabletService *ls_tablet_svr_;
checkpoint::ObDataCheckpoint *data_checkpoint_;
logservice::ObILogHandler *loghandler_;
share::ObLSID ls_id_;
ObLS *ls_;
ObFreezerStat stat_;
int64_t empty_memtable_cnt_;
// make sure ls freeze has higher priority than tablet freeze

View File

@ -59,7 +59,7 @@ const uint64_t ObLS::INNER_TABLET_ID_LIST[TOTAL_INNER_TABLET_NUM] = {
ObLS::ObLS()
: ls_tx_svr_(this),
replay_handler_(this),
ls_freezer_(&ls_wrs_handler_, &ls_tx_svr_, &ls_tablet_svr_, &data_checkpoint_, &log_handler_, ls_meta_.ls_id_),
ls_freezer_(this),
ls_sync_tablet_seq_handler_(),
ls_ddl_log_handler_(),
is_inited_(false),
@ -113,7 +113,7 @@ int ObLS::init(const share::ObLSID &ls_id,
LOG_WARN("failed to init ls meta", K(ret), K(tenant_id), K(ls_id), K(replica_type));
} else {
rs_reporter_ = reporter;
ls_freezer_.init(&ls_wrs_handler_, &ls_tx_svr_, &ls_tablet_svr_, &data_checkpoint_, &log_handler_, ls_meta_.ls_id_);
ls_freezer_.init(this);
transaction::ObTxPalfParam tx_palf_param(get_log_handler());
// tx_table_.init() should after ls_table_svr.init()

View File

@ -118,6 +118,8 @@ class ObLS : public common::ObLink
{
public:
friend ObLSLockGuard;
friend class ObFreezer;
friend class checkpoint::ObDataCheckpoint;
public:
static constexpr int64_t TOTAL_INNER_TABLET_NUM = 3;
static const uint64_t INNER_TABLET_ID_LIST[TOTAL_INNER_TABLET_NUM];

View File

@ -241,7 +241,7 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
if (OB_FAIL(add_memtable_(memtable_handle))) {
LOG_WARN("failed to add memtable", K(ret), K(ls_id), K(tablet_id_), K(memtable_handle));
} else if (FALSE_IT(time_guard.click("add memtable"))) {
} else if (OB_FAIL(memtable->add_to_data_checkpoint(freezer_->get_data_checkpoint()))) {
} else if (OB_FAIL(memtable->add_to_data_checkpoint(freezer_->get_ls_data_checkpoint()))) {
LOG_WARN("add to data_checkpoint failed", K(ret), K(ls_id), KPC(memtable));
clean_tail_memtable_();
} else if (FALSE_IT(time_guard.click("add to data_checkpoint"))) {

View File

@ -41,6 +41,8 @@ ObTenantFreezer::ObTenantFreezer()
rs_mgr_(nullptr),
config_(nullptr),
allocator_mgr_(nullptr),
freeze_thread_pool_(),
freeze_thread_pool_lock_(common::ObLatchIds::FREEZE_THREAD_POOL_LOCK),
exist_ls_freezing_(false),
last_update_ts_(0)
{}
@ -87,6 +89,8 @@ int ObTenantFreezer::init()
K(GCONF.self_addr_));
} else if (OB_FAIL(freeze_trigger_pool_.init_and_start(FREEZE_TRIGGER_THREAD_NUM))) {
LOG_WARN("[TenantFreezer] fail to initialize freeze trigger pool", KR(ret));
} else if (OB_FAIL(freeze_thread_pool_.init_and_start(FREEZE_THREAD_NUM))) {
LOG_WARN("[TenantFreezer] fail to initialize freeze thread pool", KR(ret));
} else if (OB_FAIL(freeze_trigger_timer_.init_and_start(freeze_trigger_pool_,
TIME_WHEEL_PRECISION,
"FrzTrigger"))) {

View File

@ -37,9 +37,11 @@ class ObTenantTxDataFreezeGuard;
class ObTenantFreezer
{
friend ObTenantTxDataFreezeGuard;
friend class ObFreezer;
const static int64_t TIME_WHEEL_PRECISION = 100_ms;
const static int64_t SLOW_FREEZE_INTERVAL = 30_s;
const static int FREEZE_TRIGGER_THREAD_NUM= 1;
const static int FREEZE_THREAD_NUM= 5;
const static int64_t FREEZE_TRIGGER_INTERVAL = 2_s;
const static int64_t UPDATE_INTERVAL = 100_ms;
// replay use 1G/s
@ -172,6 +174,8 @@ private:
common::ObOccamThreadPool freeze_trigger_pool_;
common::ObOccamTimer freeze_trigger_timer_;
common::ObOccamTimerTaskRAIIHandle timer_handle_;
common::ObOccamThreadPool freeze_thread_pool_;
ObSpinLock freeze_thread_pool_lock_;
bool exist_ls_freezing_;
int64_t last_update_ts_;
};

View File

@ -29,6 +29,7 @@
#include "storage/tx/ob_trans_define_v4.h"
#include "storage/memtable/mvcc/ob_mvcc_row.h"
#include "share/scn.h"
#include "storage/ls/ob_ls.h"
namespace oceanbase
{
@ -98,7 +99,7 @@ int ObMvccRow::check_double_insert_(const share::SCN ,
class TestMemtable : public testing::Test
{
public:
TestMemtable() : tenant_base_(1001),tablet_id_(1000),rowkey_cnt_(1) {}
TestMemtable() : tenant_base_(1001),tablet_id_(1000),rowkey_cnt_(1) { freezer_.init(&ls_); }
void SetUp() override {
share::ObTenantEnv::set_tenant(&tenant_base_);
// mock columns
@ -177,6 +178,7 @@ public:
read_info_.reset();
}
public:
ObLS ls_;
share::ObTenantBase tenant_base_;
storage::ObFreezer freezer_;
storage::ObTabletMemtableMgr memtable_mgr_;

View File

@ -302,7 +302,7 @@ int TestCompactionPolicy::mock_memtable(
LOG_WARN("failed to init memtable", K(ret));
} else if (OB_FAIL(mt_mgr->add_memtable_(table_handle))) {
LOG_WARN("failed to add memtable to mgr", K(ret));
} else if (OB_FAIL(memtable->add_to_data_checkpoint(mt_mgr->freezer_->get_data_checkpoint()))) {
} else if (OB_FAIL(memtable->add_to_data_checkpoint(mt_mgr->freezer_->get_ls_data_checkpoint()))) {
LOG_WARN("add to data_checkpoint failed", K(ret), KPC(memtable));
mt_mgr->clean_tail_memtable_();
} else if (OB_MAX_SCN_TS_NS != end_border) { // frozen memtable

View File

@ -65,12 +65,7 @@ public:
tablet_id_(LS_TX_DATA_TABLET),
ls_id_(1),
tenant_id_(1001),
freezer_((ObLSWRSHandler *)(0x1),
(ObLSTxService *)(0x1),
(ObLSTabletService *)(0x1),
(checkpoint::ObDataCheckpoint *)(0x1),
(logservice::ObILogHandler *)(0x1),
ls_id_),
freezer_(&ls_),
t3m_(common::OB_SERVER_TENANT_ID),
mt_mgr_(nullptr),
ctx_mt_mgr_(nullptr),
@ -95,7 +90,7 @@ protected:
virtual void SetUp() override
{
ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01));
freezer_.init(&ls_loop_worker_, &ls_tx_service_, &ls_tablet_service_, &ls_data_checkpoint_, &log_handler_, ls_id_);
freezer_.init(&ls_);
EXPECT_EQ(OB_SUCCESS, t3m_.init());
EXPECT_EQ(OB_SUCCESS,
ls_tx_ctx_mgr_.init(tenant_id_, /*tenant_id*/