replace log_ts by SCN in checkpoint

This commit is contained in:
obdev 2022-11-28 01:52:02 +00:00 committed by ob-robot
parent 92c51091d7
commit c4c13f6296
50 changed files with 285 additions and 328 deletions

View File

@ -430,22 +430,12 @@ int ObGCHandler::replay(const void *buffer,
return ret;
}
void ObGCHandler::get_rec_log_scn(SCN &scn)
palf::SCN ObGCHandler::get_rec_scn()
{
scn.set_max();
return palf::SCN::max_scn();
}
int64_t ObGCHandler::get_rec_log_ts()
{
return INT64_MAX;
}
int ObGCHandler::flush(const SCN &scn)
{
// do nothing
return OB_SUCCESS;
}
int ObGCHandler::flush(int64_t rec_log_ts)
int ObGCHandler::flush(SCN &scn)
{
// do nothing
return OB_SUCCESS;

View File

@ -207,10 +207,8 @@ public:
virtual int resume_leader() override;
// for checkpoint
virtual int64_t get_rec_log_ts() override;
virtual void get_rec_log_scn(palf::SCN &scn);
virtual int flush(int64_t rec_log_ts) override;
virtual int flush(const palf::SCN &scn) ;
virtual palf::SCN get_rec_scn() override;
virtual int flush(palf::SCN &scn) override;
TO_STRING_KV(K(is_inited_),
K(gc_seq_invalid_member_));

View File

@ -158,8 +158,8 @@ public:
class ObICheckpointSubHandler
{
public:
virtual int64_t get_rec_log_ts() = 0;
virtual int flush(int64_t rec_log_ts) = 0;
virtual palf::SCN get_rec_scn() = 0;
virtual int flush(palf::SCN &scn) = 0;
};
#define REGISTER_TO_LOGSERVICE(type, subhandler) \

View File

@ -202,7 +202,7 @@ int ObAllVirtualCheckpointInfo::process_curr_tenant(ObNewRow *&row)
}
case OB_APP_MIN_COLUMN_ID + 5: {
//TODO:SCN
cur_row_.cells_[i].set_uint64(checkpoint.rec_log_ts < 0 ? 0 : checkpoint.rec_log_ts);
cur_row_.cells_[i].set_uint64(checkpoint.rec_scn.is_valid() ? checkpoint.rec_scn.get_val_for_inner_table_field() : 0);
break;
}
default:

View File

@ -189,7 +189,7 @@ int ObAllVirtualTransCheckpointInfo::process_curr_tenant(ObNewRow *&row)
break;
}
case OB_APP_MIN_COLUMN_ID + 5: {
cur_row_.cells_[i].set_uint64((common_checkpoint.rec_log_ts < 0 ? 0 : common_checkpoint.rec_log_ts));
cur_row_.cells_[i].set_uint64((common_checkpoint.rec_scn.is_valid() ? common_checkpoint.rec_scn.get_val_for_inner_table_field() : 0));
break;
}
case OB_APP_MIN_COLUMN_ID + 6:

View File

@ -190,7 +190,7 @@ int ObAllVirtualFreezeCheckpointInfo::process_curr_tenant(ObNewRow *&row)
break;
}
case OB_APP_MIN_COLUMN_ID + 5: {
cur_row_.cells_[i].set_uint64(freeze_checkpoint.rec_log_ts < 0 ? 0 : freeze_checkpoint.rec_log_ts);
cur_row_.cells_[i].set_uint64(freeze_checkpoint.rec_scn.is_valid() ? freeze_checkpoint.rec_scn.get_val_for_inner_table_field() : 0);
break;
}
case OB_APP_MIN_COLUMN_ID + 6:
@ -205,7 +205,7 @@ int ObAllVirtualFreezeCheckpointInfo::process_curr_tenant(ObNewRow *&row)
}
break;
case OB_APP_MIN_COLUMN_ID + 7:
cur_row_.cells_[i].set_int(freeze_checkpoint.rec_log_ts_is_stable ? 1 : 0);
cur_row_.cells_[i].set_int(freeze_checkpoint.rec_scn_is_stable ? 1 : 0);
break;
default:
ret = OB_ERR_UNEXPECTED;

View File

@ -38,21 +38,12 @@ public:
int init(uint64_t tenant_id);
// clog checkpoint, do nothing
int flush(int64_t rec_log_ts)
{
UNUSED(rec_log_ts);
return OB_SUCCESS;
}
int flush(palf::SCN &rec_scn)
{
UNUSED(rec_scn);
return OB_SUCCESS;
}
int64_t get_rec_log_ts() { return INT64_MAX; }
palf::SCN get_rec_log_scn() { return palf::SCN::max_scn(); }
palf::SCN get_rec_scn() override { return palf::SCN::max_scn(); }
// for replay, do nothing
int replay(const void *buffer,

View File

@ -320,11 +320,8 @@ public:
virtual void do_work() override;
public:
//TODO(SCN)yaoying.yyy: get_rec_log_scn() and flush() override
virtual int64_t get_rec_log_ts() override { return INT64_MAX;}
virtual const palf::SCN get_rec_log_scn(){ return palf::SCN::max_scn();}
virtual int flush(int64_t rec_log_ts) override { return OB_SUCCESS; }
virtual int flush(palf::SCN &scn) { return OB_SUCCESS; }
virtual palf::SCN get_rec_scn() override { return palf::SCN::max_scn();}
virtual int flush(palf::SCN &scn) override { return OB_SUCCESS; }
int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const palf::SCN &scn) override
{
UNUSED(buffer);

View File

@ -57,8 +57,8 @@ public:
virtual void do_work() override;
void destroy();
public:
virtual int64_t get_rec_log_ts() override { return INT64_MAX;}
virtual int flush(int64_t rec_log_ts) override { return OB_SUCCESS; }
virtual palf::SCN get_rec_scn() override { return palf::SCN::max_scn();}
virtual int flush(palf::SCN &rec_scn) override { return OB_SUCCESS; }
int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const palf::SCN &scn) override
{
UNUSED(buffer);

View File

@ -153,15 +153,15 @@ public:
}
// for checkpoint, do nothing
int64_t get_rec_log_ts() override final
palf::SCN get_rec_scn() override final
{
return INT64_MAX;
return palf::SCN::max_scn();;
}
int flush(int64_t rec_log_ts) override final
int flush(palf::SCN &scn) override final
{
int ret = OB_SUCCESS;
UNUSED(rec_log_ts);
UNUSED(scn);
return ret;
}

View File

@ -83,12 +83,14 @@ static int advance_checkpoint_by_flush(const uint64_t tenant_id, const share::Ob
ObLSMetaPackage ls_meta_package;
int64_t i = 0;
const int64_t start_ts = ObTimeUtility::current_time();
palf::SCN tmp;
tmp.convert_for_lsn_allocator(start_scn);
do {
const int64_t cur_ts = ObTimeUtility::current_time();
if (cur_ts - start_ts > advance_checkpoint_timeout) {
ret = OB_BACKUP_ADVANCE_CHECKPOINT_TIMEOUT;
LOG_WARN("backup advance checkpoint by flush timeout", K(ret), K(tenant_id), K(ls_id), K(start_scn));
} else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(start_scn))) {
} else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(tmp))) {
if (OB_NO_NEED_UPDATE == ret) {
// clog checkpoint ts has passed start log ts
ret = OB_SUCCESS;

View File

@ -106,11 +106,11 @@ void ObCheckpointExecutor::offline()
update_checkpoint_enabled_ = false;
}
inline void get_min_rec_log_ts_service_type_by_index_(int index, char* service_type)
inline void get_min_rec_scn_service_type_by_index_(int index, char* service_type)
{
int ret = OB_SUCCESS;
if (index == 0) {
strncpy(service_type ,"MAX_DECIDED_LOG_TS", common::MAX_SERVICE_TYPE_BUF_LENGTH);
strncpy(service_type ,"MAX_DECIDED_SCN", common::MAX_SERVICE_TYPE_BUF_LENGTH);
} else if (OB_FAIL(log_base_type_to_string(ObLogBaseType(index),
service_type,
common::MAX_SERVICE_TYPE_BUF_LENGTH))) {
@ -126,51 +126,52 @@ int ObCheckpointExecutor::update_clog_checkpoint()
if (update_checkpoint_enabled_) {
ObFreezer *freezer = ls_->get_freezer();
if (OB_NOT_NULL(freezer)) {
int64_t checkpoint_ts = INT64_MAX;
if (OB_FAIL(freezer->get_max_consequent_callbacked_log_ts(checkpoint_ts))) {
STORAGE_LOG(WARN, "get_max_consequent_callbacked_log_ts failed", K(ret), K(freezer->get_ls_id()));
SCN checkpoint_scn;
checkpoint_scn.set_max();
if (OB_FAIL(freezer->get_max_consequent_callbacked_scn(checkpoint_scn))) {
STORAGE_LOG(WARN, "get_max_consequent_callbacked_scn failed", K(ret), K(freezer->get_ls_id()));
} else {
// used to record which handler provide the smallest rec_log_ts
int min_rec_log_ts_service_type_index = 0;
// used to record which handler provide the smallest rec_scn
int min_rec_scn_service_type_index = 0;
char service_type[common::MAX_SERVICE_TYPE_BUF_LENGTH];
for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) {
if (OB_NOT_NULL(handlers_[i])) {
int64_t rec_log_ts = handlers_[i]->get_rec_log_ts();
if (rec_log_ts > 0 && rec_log_ts < checkpoint_ts) {
checkpoint_ts = rec_log_ts;
min_rec_log_ts_service_type_index = i;
SCN rec_scn = handlers_[i]->get_rec_scn();
if (rec_scn.is_valid() && rec_scn < checkpoint_scn) {
checkpoint_scn = rec_scn;
min_rec_scn_service_type_index = i;
}
}
}
get_min_rec_log_ts_service_type_by_index_(min_rec_log_ts_service_type_index, service_type);
get_min_rec_scn_service_type_by_index_(min_rec_scn_service_type_index, service_type);
const int64_t checkpoint_ts_in_ls_meta = ls_->get_clog_checkpoint_ts();
const SCN checkpoint_scn_in_ls_meta = ls_->get_clog_checkpoint_scn();
const share::ObLSID ls_id = ls_->get_ls_id();
LSN clog_checkpoint_lsn;
if (checkpoint_ts == checkpoint_ts_in_ls_meta) {
STORAGE_LOG(INFO, "[CHECKPOINT] clog checkpoint no change", K(checkpoint_ts),
K(checkpoint_ts_in_ls_meta), K(ls_id), K(service_type));
} else if (checkpoint_ts < checkpoint_ts_in_ls_meta) {
if (min_rec_log_ts_service_type_index == 0) {
if (checkpoint_scn == checkpoint_scn_in_ls_meta) {
STORAGE_LOG(INFO, "[CHECKPOINT] clog checkpoint no change", K(checkpoint_scn),
K(checkpoint_scn_in_ls_meta), K(ls_id), K(service_type));
} else if (checkpoint_scn < checkpoint_scn_in_ls_meta) {
if (min_rec_scn_service_type_index == 0) {
STORAGE_LOG(INFO, "[CHECKPOINT] expexted when no log callbacked or replayed",
K(checkpoint_ts), K(checkpoint_ts_in_ls_meta), K(ls_id));
K(checkpoint_scn), K(checkpoint_scn_in_ls_meta), K(ls_id));
} else {
STORAGE_LOG(ERROR, "[CHECKPOINT] can not advance clog checkpoint", K(checkpoint_ts),
K(checkpoint_ts_in_ls_meta), K(ls_id), K(service_type));
STORAGE_LOG(ERROR, "[CHECKPOINT] can not advance clog checkpoint", K(checkpoint_scn),
K(checkpoint_scn_in_ls_meta), K(ls_id), K(service_type));
}
} else if (OB_FAIL(loghandler_->locate_by_ts_ns_coarsely(checkpoint_ts, clog_checkpoint_lsn))) {
} else if (OB_FAIL(loghandler_->locate_by_scn_coarsely(checkpoint_scn, clog_checkpoint_lsn))) {
if (OB_ENTRY_NOT_EXIST == ret) {
STORAGE_LOG(WARN, "no file in disk", K(ret), K(ls_id), K(checkpoint_ts));
STORAGE_LOG(WARN, "no file in disk", K(ret), K(ls_id), K(checkpoint_scn));
ret = OB_SUCCESS;
} else if (OB_NOT_INIT == ret) {
STORAGE_LOG(WARN, "palf has been disabled", K(ret), K(checkpoint_ts), K(ls_->get_ls_id()));
STORAGE_LOG(WARN, "palf has been disabled", K(ret), K(checkpoint_scn), K(ls_->get_ls_id()));
ret = OB_SUCCESS;
} else {
STORAGE_LOG(ERROR, "locate lsn by logts failed", K(ret), K(ls_id),
K(checkpoint_ts), K(checkpoint_ts_in_ls_meta));
K(checkpoint_scn), K(checkpoint_scn_in_ls_meta));
}
} else if (OB_FAIL(ls_->set_clog_checkpoint_without_lock(clog_checkpoint_lsn, checkpoint_ts))) {
STORAGE_LOG(WARN, "set clog checkpoint failed", K(ret), K(clog_checkpoint_lsn), K(checkpoint_ts), K(ls_id));
} else if (OB_FAIL(ls_->set_clog_checkpoint_without_lock(clog_checkpoint_lsn, checkpoint_scn))) {
STORAGE_LOG(WARN, "set clog checkpoint failed", K(ret), K(clog_checkpoint_lsn), K(checkpoint_scn), K(ls_id));
} else if (OB_FAIL(loghandler_->advance_base_lsn(clog_checkpoint_lsn))) {
if (OB_NOT_INIT == ret) {
STORAGE_LOG(WARN, "palf has been disabled", K(ret), K(clog_checkpoint_lsn), K(ls_->get_ls_id()));
@ -181,7 +182,7 @@ int ObCheckpointExecutor::update_clog_checkpoint()
} else {
ATOMIC_STORE(&wait_advance_checkpoint_, false);
FLOG_INFO("[CHECKPOINT] update clog checkpoint successfully",
K(clog_checkpoint_lsn), K(checkpoint_ts), K(ls_id),
K(clog_checkpoint_lsn), K(checkpoint_scn), K(ls_id),
K(service_type));
}
}
@ -195,49 +196,48 @@ int ObCheckpointExecutor::update_clog_checkpoint()
return ret;
}
int ObCheckpointExecutor::advance_checkpoint_by_flush(int64_t recycle_ts) {
int ObCheckpointExecutor::advance_checkpoint_by_flush(SCN recycle_scn) {
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
// calcu recycle_ts according to clog disk situation
if (recycle_ts == 0) {
// calcu recycle_ according to clog disk situation
if (!recycle_scn.is_valid()) {
LSN end_lsn;
int64_t calcu_recycle_ts = INT64_MAX;
if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) {
STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id()));
} else {
LSN clog_checkpoint_lsn = ls_->get_clog_base_lsn();
LSN calcu_recycle_lsn = clog_checkpoint_lsn
+ ((end_lsn - clog_checkpoint_lsn) * CLOG_GC_PERCENT / 100);
if (OB_FAIL(loghandler_->locate_by_lsn_coarsely(calcu_recycle_lsn, recycle_ts))) {
STORAGE_LOG(WARN, "locate_by_lsn_coarsely failed", K(calcu_recycle_ts), K(calcu_recycle_lsn),
K(recycle_ts), K(ls_->get_ls_id()));
if (OB_FAIL(loghandler_->locate_by_lsn_coarsely(calcu_recycle_lsn, recycle_scn))) {
STORAGE_LOG(WARN, "locate_by_lsn_coarsely failed", K(calcu_recycle_lsn),
K(recycle_scn), K(ls_->get_ls_id()));
} else {
STORAGE_LOG(INFO, "advance checkpoint by flush to avoid clog disk full",
K(recycle_ts), K(end_lsn), K(clog_checkpoint_lsn),
K(recycle_scn), K(end_lsn), K(clog_checkpoint_lsn),
K(calcu_recycle_lsn), K(ls_->get_ls_id()));
}
}
// the log of end_log_lsn and the log of clog_checkpoint_lsn may be in a block
if (recycle_ts < ls_->get_clog_checkpoint_ts()) {
recycle_ts = INT64_MAX;
if (recycle_scn < ls_->get_clog_checkpoint_scn()) {
recycle_scn.set_max();
}
}
if (OB_SUCC(ret)) {
if (recycle_ts < ls_->get_clog_checkpoint_ts()) {
if (recycle_scn < ls_->get_clog_checkpoint_scn()) {
ret = OB_NO_NEED_UPDATE;
STORAGE_LOG(WARN, "recycle_ts should not smaller than checkpoint_log_ts",
K(recycle_ts), K(ls_->get_clog_checkpoint_ts()), K(ls_->get_ls_id()));
STORAGE_LOG(WARN, "recycle_scn should not smaller than checkpoint_scn",
K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id()));
} else {
STORAGE_LOG(INFO, "start flush",
K(recycle_ts),
K(ls_->get_clog_checkpoint_ts()),
K(recycle_scn),
K(ls_->get_clog_checkpoint_scn()),
K(ls_->get_ls_id()));
for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) {
if (OB_NOT_NULL(handlers_[i])
&& OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_ts)))) {
STORAGE_LOG(WARN, "handler flush failed", K(recycle_ts), K(tmp_ret),
&& OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_scn)))) {
STORAGE_LOG(WARN, "handler flush failed", K(recycle_scn), K(tmp_ret),
K(i), K(ls_->get_ls_id()));
}
}
@ -254,7 +254,7 @@ int ObCheckpointExecutor::get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &chec
for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) {
if (OB_NOT_NULL(handlers_[i])) {
ObCheckpointVTInfo info;
info.rec_log_ts = handlers_[i]->get_rec_log_ts();
info.rec_scn = handlers_[i]->get_rec_scn();
info.service_type = i;
checkpoint_array.push_back(info);
}
@ -266,13 +266,13 @@ bool ObCheckpointExecutor::need_flush()
{
int ret = OB_SUCCESS;
bool need_flush = false;
int64_t end_log_ts = 0;
if (OB_FAIL(loghandler_->get_end_ts_ns(end_log_ts))) {
STORAGE_LOG(WARN, "get_end_ts_ns failed", K(ret));
} else if (end_log_ts -
ls_->get_clog_checkpoint_ts() > MAX_NEED_REPLAY_CLOG_INTERVAL) {
palf::SCN end_scn;
if (OB_FAIL(loghandler_->get_end_scn(end_scn))) {
STORAGE_LOG(WARN, "get_end_scn failed", K(ret));
} else if (end_scn.convert_to_ts() - ls_->get_clog_checkpoint_scn().convert_to_ts()
> MAX_NEED_REPLAY_CLOG_INTERVAL) {
STORAGE_LOG(INFO, "over max need replay clog interval",
K(end_log_ts), K(ls_->get_clog_checkpoint_ts()));
K(end_scn), K(ls_->get_clog_checkpoint_scn()));
need_flush = true;
}
@ -290,10 +290,10 @@ bool ObCheckpointExecutor::is_wait_advance_checkpoint()
return ATOMIC_LOAD(&wait_advance_checkpoint_);
}
void ObCheckpointExecutor::set_wait_advance_checkpoint(int64_t checkpoint_log_ts)
void ObCheckpointExecutor::set_wait_advance_checkpoint(palf::SCN &checkpoint_scn)
{
ObSpinLockGuard guard(lock_);
if (checkpoint_log_ts == ls_->get_clog_checkpoint_ts()) {
if (checkpoint_scn == ls_->get_clog_checkpoint_scn()) {
ATOMIC_STORE(&wait_advance_checkpoint_, true);
last_set_wait_advance_checkpoint_time_ = ObTimeUtility::current_time();
}

View File

@ -16,9 +16,15 @@
#include "lib/lock/ob_spin_lock.h"
#include "logservice/ob_log_base_type.h"
#include "logservice/ob_log_handler.h"
#include "logservice/palf/scn.h"
namespace oceanbase
{
namespace palf
{
class SCN;
}
namespace storage
{
class ObLS;
@ -27,11 +33,11 @@ namespace checkpoint
struct ObCheckpointVTInfo
{
int64_t rec_log_ts;
palf::SCN rec_scn;
int service_type;
TO_STRING_KV(
K(rec_log_ts),
K(rec_scn),
K(service_type)
);
};
@ -56,8 +62,8 @@ public:
int update_clog_checkpoint();
// the service will flush and advance checkpoint
// after flush, checkpoint_log_ts will be equal or greater than recycle_ts
int advance_checkpoint_by_flush(int64_t recycle_ts = 0);
// after flush, checkpoint_scn will be equal or greater than recycle_scn
int advance_checkpoint_by_flush(palf::SCN recycle_scn);
// for __all_virtual_checkpoint
int get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &checkpoint_array);
@ -67,13 +73,13 @@ public:
bool is_wait_advance_checkpoint();
void set_wait_advance_checkpoint(int64_t checkpoint_log_ts);
void set_wait_advance_checkpoint(palf::SCN &checkpoint_scn);
int64_t get_cannot_recycle_log_size();
private:
static const int64_t CLOG_GC_PERCENT = 60;
static const int64_t MAX_NEED_REPLAY_CLOG_INTERVAL = (int64_t)60 * 60 * 1000 * 1000 * 1000; //ns
static const int64_t MAX_NEED_REPLAY_CLOG_INTERVAL = (int64_t)60 * 60 * 1000 * 1000; //us
ObLS *ls_;
logservice::ObILogHandler *loghandler_;

View File

@ -66,13 +66,13 @@ int common_checkpoint_type_to_string(const ObCommonCheckpointType common_checkpo
struct ObCommonCheckpointVTInfo
{
ObTabletID tablet_id;
int64_t rec_log_ts;
palf::SCN rec_scn;
int checkpoint_type;
bool is_flushing;
TO_STRING_KV(
K(tablet_id),
K(rec_log_ts),
K(rec_scn),
K(checkpoint_type),
K(is_flushing)
);
@ -87,14 +87,12 @@ inline bool is_valid_log_base_type(const ObCommonCheckpointType &type)
// and register into ls_tx_service's common_list
// the checkpoint units:
// 1. write TRANS_SERVICE_LOG_BASE_TYPE clog
// 2. have no freeze operation and rec_log_ts can't become smaller
// 2. have no freeze operation and rec_scn can't become smaller
class ObCommonCheckpoint
{
public:
virtual int64_t get_rec_log_ts() = 0;
virtual palf::SCN get_rec_scn() { return palf::SCN::min_scn(); }
virtual int flush(int64_t recycle_log_ts, bool need_freeze = true) = 0;
virtual int flush(palf::SCN recycle_scn, bool need_freeze = true) { return OB_NOT_SUPPORTED; }
virtual palf::SCN get_rec_scn() = 0;
virtual int flush(palf::SCN recycle_scn, bool need_freeze = true) = 0;
virtual ObTabletID get_tablet_id() const = 0;
virtual bool is_flushing() const = 0;

View File

@ -16,6 +16,7 @@
namespace oceanbase
{
using namespace palf;
namespace storage
{
namespace checkpoint
@ -57,7 +58,7 @@ int ObCheckpointDList::insert(ObFreezeCheckpoint *ob_freeze_checkpoint, bool ord
{
int ret = OB_SUCCESS;
if (ordered) {
ObFreezeCheckpoint *next = get_first_greater(ob_freeze_checkpoint->get_rec_log_ts());
ObFreezeCheckpoint *next = get_first_greater(ob_freeze_checkpoint->get_rec_scn());
if (!checkpoint_list_.add_before(next, ob_freeze_checkpoint)) {
STORAGE_LOG(ERROR, "add_before failed");
ret = OB_ERR_UNEXPECTED;
@ -81,39 +82,39 @@ void ObCheckpointDList::get_iterator(ObCheckpointIterator &iterator)
iterator.init(this);
}
int64_t ObCheckpointDList::get_min_rec_log_ts_in_list(bool ordered)
SCN ObCheckpointDList::get_min_rec_scn_in_list(bool ordered)
{
int64_t min_rec_log_ts = INT64_MAX;
SCN min_rec_scn = SCN::max_scn();
if (!checkpoint_list_.is_empty()) {
ObFreezeCheckpoint *freeze_checkpoint = nullptr;
if (ordered) {
min_rec_log_ts = checkpoint_list_.get_first()->get_rec_log_ts();
min_rec_scn = checkpoint_list_.get_first()->get_rec_scn();
freeze_checkpoint = checkpoint_list_.get_first();
} else {
auto *header = checkpoint_list_.get_header();
auto *cur = header->get_next();
while (cur != header) {
if (cur->get_rec_log_ts() < min_rec_log_ts) {
min_rec_log_ts = cur->get_rec_log_ts();
if (cur->get_rec_scn() < min_rec_scn) {
min_rec_scn = cur->get_rec_scn();
freeze_checkpoint = cur;
}
cur = cur->get_next();
}
}
if (OB_NOT_NULL(freeze_checkpoint)) {
STORAGE_LOG(DEBUG, "[CHECKPOINT] get_min_rec_log_ts_in_list", K(min_rec_log_ts),
K(*freeze_checkpoint));
STORAGE_LOG(DEBUG, "[CHECKPOINT] get_min_rec_scn_in_list", K(min_rec_scn),
K(*freeze_checkpoint));
}
}
return min_rec_log_ts;
return min_rec_scn;
}
ObFreezeCheckpoint *ObCheckpointDList::get_first_greater(const int64_t rec_log_ts)
ObFreezeCheckpoint *ObCheckpointDList::get_first_greater(const SCN rec_scn)
{
auto *cur = checkpoint_list_.get_header();
if (!checkpoint_list_.is_empty()) {
auto *prev = cur->get_prev();
while (prev != checkpoint_list_.get_header() && prev->get_rec_log_ts() > rec_log_ts) {
while (prev != checkpoint_list_.get_header() && prev->get_rec_scn() > rec_scn) {
cur = prev;
prev = cur->get_prev();
}
@ -131,8 +132,8 @@ int ObCheckpointDList::get_freezecheckpoint_info(
auto ob_freeze_checkpoint = iterator.get_next();
ObFreezeCheckpointVTInfo info;
info.tablet_id = ob_freeze_checkpoint->get_tablet_id().id();
info.rec_log_ts = ob_freeze_checkpoint->get_rec_log_ts();
info.rec_log_ts_is_stable = ob_freeze_checkpoint->rec_log_ts_is_stable();
info.rec_scn = ob_freeze_checkpoint->get_rec_scn();
info.rec_scn_is_stable = ob_freeze_checkpoint->rec_scn_is_stable();
info.location = ob_freeze_checkpoint->location_;
freeze_checkpoint_array.push_back(info);
}
@ -194,36 +195,35 @@ int ObDataCheckpoint::safe_to_destroy()
return ret;
}
int64_t ObDataCheckpoint::get_rec_log_ts()
SCN ObDataCheckpoint::get_rec_scn()
{
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObSpinLockGuard guard(lock_);
int ret = OB_SUCCESS;
int64_t min_rec_log_ts = INT64_MAX;
int64_t tmp = INT64_MAX;
SCN min_rec_scn = SCN::max_scn();
SCN tmp = SCN::max_scn();
// memtable and tx_data_memtable
if ((tmp = new_create_list_.get_min_rec_log_ts_in_list(false)) < min_rec_log_ts && tmp != 0) {
min_rec_log_ts = tmp;
if ((tmp = new_create_list_.get_min_rec_scn_in_list(false)) < min_rec_scn) {
min_rec_scn = tmp;
}
if ((tmp = active_list_.get_min_rec_log_ts_in_list()) < min_rec_log_ts && tmp != 0) {
min_rec_log_ts = tmp;
if ((tmp = active_list_.get_min_rec_scn_in_list()) < min_rec_scn) {
min_rec_scn = tmp;
}
if ((tmp = ls_frozen_list_.get_min_rec_log_ts_in_list()) < min_rec_log_ts && tmp != 0) {
min_rec_log_ts = tmp;
if ((tmp = ls_frozen_list_.get_min_rec_scn_in_list()) < min_rec_scn) {
min_rec_scn = tmp;
}
if ((tmp = prepare_list_.get_min_rec_log_ts_in_list()) < min_rec_log_ts && tmp != 0) {
min_rec_log_ts = tmp;
if ((tmp = prepare_list_.get_min_rec_scn_in_list()) < min_rec_scn) {
min_rec_scn = tmp;
}
return min_rec_log_ts;
return min_rec_scn;
}
int ObDataCheckpoint::flush(int64_t recycle_log_ts, bool need_freeze)
int ObDataCheckpoint::flush(SCN recycle_scn, bool need_freeze)
{
int ret = OB_SUCCESS;
if (need_freeze) {
if (get_rec_log_ts() <= recycle_log_ts) {
if (get_rec_scn() <= recycle_scn) {
if (OB_FAIL(ls_->logstream_freeze())) {
STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id()));
}
@ -235,12 +235,12 @@ int ObDataCheckpoint::flush(int64_t recycle_log_ts, bool need_freeze)
return ret;
}
int ObDataCheckpoint::ls_freeze(int64_t rec_log_ts)
int ObDataCheckpoint::ls_freeze(palf::SCN rec_scn)
{
int ret = OB_SUCCESS;
ObCheckPointService *checkpoint_srv = MTL(ObCheckPointService *);
set_ls_freeze_finished_(false);
if (OB_FAIL(checkpoint_srv->add_ls_freeze_task(this, rec_log_ts))) {
if (OB_FAIL(checkpoint_srv->add_ls_freeze_task(this, rec_scn))) {
STORAGE_LOG(ERROR, "ls_freeze add task failed", K(ret));
set_ls_freeze_finished_(true);
}
@ -290,7 +290,7 @@ void ObDataCheckpoint::print_list_(ObCheckpointDList &list)
}
}
void ObDataCheckpoint::road_to_flush(int64_t rec_log_ts)
void ObDataCheckpoint::road_to_flush(palf::SCN rec_scn)
{
if (OB_UNLIKELY(!is_inited_)) {
STORAGE_LOG(WARN, "ObDataCheckpoint not init", K(is_inited_));
@ -312,7 +312,7 @@ void ObDataCheckpoint::road_to_flush(int64_t rec_log_ts)
ObFreezeCheckpoint *last = nullptr;
{
ObSpinLockGuard guard(lock_);
last = active_list_.get_first_greater(rec_log_ts);
last = active_list_.get_first_greater(rec_scn);
}
pop_range_to_ls_frozen_(last, active_list_);
last_time = common::ObTimeUtility::fast_current_time();
@ -518,8 +518,8 @@ int ObDataCheckpoint::traversal_flush_()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
// Because prepare list is ordered based on rec_log_ts and we want to flush
// based on the order of rec_log_ts. So we should can simply use a small
// Because prepare list is ordered based on rec_scn and we want to flush
// based on the order of rec_scn. So we should can simply use a small
// number for flush tasks.
const int MAX_DATA_CHECKPOINT_FLUSH_COUNT = 10000;
ObSEArray<ObTableHandleV2, 64> flush_tasks;

View File

@ -17,6 +17,7 @@
#include "storage/checkpoint/ob_common_checkpoint.h"
#include "lib/lock/ob_spin_lock.h"
#include "storage/checkpoint/ob_freeze_checkpoint.h"
#include "logservice/palf/scn.h"
namespace oceanbase
{
@ -39,8 +40,8 @@ struct ObCheckpointDList
int unlink(ObFreezeCheckpoint *ob_freeze_checkpoint);
int insert(ObFreezeCheckpoint *ob_freeze_checkpoint, bool ordered = true);
void get_iterator(ObCheckpointIterator &iterator);
int64_t get_min_rec_log_ts_in_list(bool ordered = true);
ObFreezeCheckpoint *get_first_greater(const int64_t rec_log_ts);
palf::SCN get_min_rec_scn_in_list(bool ordered = true);
ObFreezeCheckpoint *get_first_greater(const palf::SCN rec_scn);
int get_freezecheckpoint_info(
ObIArray<checkpoint::ObFreezeCheckpointVTInfo> &freeze_checkpoint_array);
@ -65,7 +66,6 @@ private:
};
// responsible for maintenance transaction checkpoint unit
// including data_memtable, tx_data_memtable
class ObDataCheckpoint : public ObCommonCheckpoint
{
friend class ObFreezeCheckpoint;
@ -88,21 +88,21 @@ public:
static const uint64_t LS_DATA_CHECKPOINT_TABLET_ID = 40000;
int init(ObLS *ls);
int safe_to_destroy();
int64_t get_rec_log_ts() override;
// if min_rec_log_ts <= the input rec_log_ts
palf::SCN get_rec_scn();
// if min_rec_scn <= the input rec_scn
// logstream freeze
int flush(int64_t recycle_log_ts, bool need_freeze = true) override;
// if min_rec_log_ts <= the input rec_log_ts
int flush(palf::SCN recycle_scn, bool need_freeze = true);
// if min_rec_scn <= the input rec_scn
// add ls_freeze task
// logstream freeze optimization
int ls_freeze(int64_t rec_log_ts);
int ls_freeze(palf::SCN rec_scn);
// logstream_freeze schedule and minor merge schedule
void road_to_flush(int64_t rec_log_ts);
void road_to_flush(palf::SCN rec_scn);
// ObFreezeCheckpoint register into ObDataCheckpoint
int add_to_new_create(ObFreezeCheckpoint *ob_freeze_checkpoint);
// remove from prepare_list when finish minor_merge
int unlink_from_prepare(ObFreezeCheckpoint *ob_freeze_checkpoint);
// timer to tranfer freeze_checkpoint that rec_log_ts is stable from new_create_list to
// timer to tranfer freeze_checkpoint that rec_scn is stable from new_create_list to
// active_list
int check_can_move_to_active_in_newcreate();

View File

@ -100,8 +100,8 @@ int ObFreezeCheckpoint::check_can_move_to_active(bool is_ls_freeze)
{
int ret = OB_SUCCESS;
if (location_ != ACTIVE) {
// only when the unit rec_log_ts is stable that can be moved to ordered_active_list
if (rec_log_ts_is_stable()) {
// only when the unit rec_scn is stable that can be moved to ordered_active_list
if (rec_scn_is_stable()) {
if (OB_FAIL(move_to_active_(is_ls_freeze))) {
STORAGE_LOG(ERROR, "transfer to active failed", K(ret));
}

View File

@ -17,6 +17,7 @@
#include "lib/utility/ob_print_utils.h"
#include "share/ob_ls_id.h"
#include "common/ob_tablet_id.h"
#include "logservice/palf/scn.h"
namespace oceanbase
{
@ -60,21 +61,21 @@ int freeze_checkpoint_location_to_string(const ObFreezeCheckpointLocation locati
struct ObFreezeCheckpointVTInfo
{
ObTabletID tablet_id;
int64_t rec_log_ts;
palf::SCN rec_scn;
ObFreezeCheckpointLocation location;
bool rec_log_ts_is_stable;
bool rec_scn_is_stable;
TO_STRING_KV(
K(tablet_id),
K(rec_log_ts),
K(rec_scn),
K(location),
K(rec_log_ts_is_stable)
K(rec_scn_is_stable)
);
};
// checkpoint unit like data_memtable and memtable that
// 1. write TRANS_SERVICE_LOG_BASE_TYPE clog
// 2. have freeze operation and rec_log_ts can become smaller
// 2. have freeze operation and rec_scn can become smaller
// inherit from ObFreezeCheckpoint
// register into ObDataCheckpoint
class ObFreezeCheckpoint : public common::ObDLinkBase<ObFreezeCheckpoint>
@ -86,12 +87,12 @@ public:
ObFreezeCheckpoint() : location_(OUT), data_checkpoint_(nullptr) {}
virtual ~ObFreezeCheckpoint() {}
virtual void remove_from_data_checkpoint(bool need_lock_data_checkpoint = true);
virtual int64_t get_rec_log_ts() = 0;
virtual palf::SCN get_rec_scn() = 0;
virtual int flush(share::ObLSID ls_id) = 0;
// judge rec_log_ts of the checkpoint unit won't get smaller
// by comparing with max_consequent_callbacked_log_ts
// a unit will only be moved once by rec_log_ts_stable_
virtual bool rec_log_ts_is_stable() = 0;
// judge rec_scn of the checkpoint unit won't get smaller
// by comparing with max_consequent_callbacked_scn
// a unit will only be moved once by rec_scn_stable_
virtual bool rec_scn_is_stable() = 0;
// Whether the dump conditions are met
virtual bool ready_for_flush() = 0;
// avoid active checkpoint block minor merge
@ -104,7 +105,7 @@ public:
int add_to_data_checkpoint(ObDataCheckpoint *data_checkpoint);
bool is_in_prepare_list_of_data_checkpoint();
// transfer to active_list in ObDataCheckpoint
// when the checkpoint unit rec_log_ts_is_stable
// when the checkpoint unit rec_scn_is_stable
// @param[in] is_ls_freeze, whether the process is triggered by logstream_freeze
int check_can_move_to_active(bool is_ls_freeze = false);
// after checkpoint ready_for_flush

View File

@ -1026,6 +1026,8 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_()
LOG_WARN("checkpoint executor should not be NULL", K(ret), KPC(ctx_), KP(checkpoint_executor));
} else {
const int64_t wait_checkpoint_push_start_ts = ObTimeUtility::current_time();
palf::SCN tmp;
tmp.convert_for_lsn_allocator(ctx_->log_sync_scn_);
while (OB_SUCC(ret)) {
if (ctx_->is_failed()) {
ret = OB_CANCELED;
@ -1041,7 +1043,7 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_()
const int64_t cost_ts = ObTimeUtility::current_time() - wait_checkpoint_push_start_ts;
LOG_INFO("succeed wait clog checkpoint ts push", "cost", cost_ts, "ls_id", ctx_->arg_.ls_id_);
break;
} else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(ctx_->log_sync_scn_))) {
} else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(tmp))) {
if (OB_NO_NEED_UPDATE == ret) {
ret = OB_SUCCESS;
} else {

View File

@ -343,7 +343,7 @@ int ObFreezer::inner_logstream_freeze()
uint32_t freeze_clock = get_freeze_clock();
TRANS_LOG(INFO, "[Freezer] freeze_clock", K(ret), K_(ls_id), K(freeze_clock));
if (OB_FAIL(data_checkpoint_->ls_freeze(INT64_MAX))) {
if (OB_FAIL(data_checkpoint_->ls_freeze(palf::SCN::max_scn()))) {
// move memtables from active_list to frozen_list
TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K_(ls_id));
stat_.add_diagnose_info("data_checkpoint freeze failed");

View File

@ -1208,13 +1208,14 @@ int ObLS::flush_if_need(const bool need_flush)
int ObLS::flush_if_need_(const bool need_flush)
{
int ret = OB_SUCCESS;
int64_t clog_checkpoint_ts = get_clog_checkpoint_ts();
palf::SCN clog_checkpoint_scn = get_clog_checkpoint_scn();
palf::SCN invalid_scn;
if ((!need_flush && !checkpoint_executor_.need_flush()) || checkpoint_executor_.is_wait_advance_checkpoint()) {
STORAGE_LOG(INFO, "the ls no need flush to advance_checkpoint", K(get_ls_id()));
} else if (OB_FAIL(checkpoint_executor_.advance_checkpoint_by_flush())) {
} else if (OB_FAIL(checkpoint_executor_.advance_checkpoint_by_flush(invalid_scn))) {
STORAGE_LOG(WARN, "advance_checkpoint_by_flush failed", KR(ret), K(get_ls_id()));
} else {
checkpoint_executor_.set_wait_advance_checkpoint(clog_checkpoint_ts);
checkpoint_executor_.set_wait_advance_checkpoint(clog_checkpoint_scn);
}
return ret;
}

View File

@ -306,6 +306,7 @@ public:
UPDATE_LSMETA_WITH_LOCK(ls_meta_, set_clog_checkpoint);
UPDATE_LSMETA_WITHOUT_LOCK(ls_meta_, set_clog_checkpoint);
CONST_DELEGATE_WITH_RET(ls_meta_, get_clog_checkpoint_ts, int64_t);
CONST_DELEGATE_WITH_RET(ls_meta_, get_clog_checkpoint_scn, palf::SCN);
DELEGATE_WITH_RET(ls_meta_, get_clog_base_lsn, palf::LSN &);
DELEGATE_WITH_RET(ls_meta_, get_saved_info, int);
// int build_saved_info();

View File

@ -20,6 +20,7 @@
#include "storage/tablet/ob_tablet_iterator.h"
#include "storage/ddl/ob_tablet_ddl_kv_mgr.h"
#include "logservice/ob_log_base_header.h"
#include "logservice/palf/scn.h"
namespace oceanbase
{
@ -211,7 +212,7 @@ int ObLSDDLLogHandler::resume_leader()
return ret;
}
int ObLSDDLLogHandler::flush(int64_t rec_log_ts)
int ObLSDDLLogHandler::flush(palf::SCN &rec_scn)
{
int ret = OB_SUCCESS;
ObLSTabletIterator tablet_iter(ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US);
@ -242,7 +243,7 @@ int ObLSDDLLogHandler::flush(int64_t rec_log_ts)
ObDDLTableMergeDagParam param;
param.ls_id_ = ls_->get_ls_id();
param.tablet_id_ = tablet_handle.get_obj()->get_tablet_meta().tablet_id_;
param.rec_log_ts_ = rec_log_ts;
param.rec_log_ts_ = rec_scn.convert_to_ts();
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
LOG_WARN("failed to schedule ddl kv merge dag", K(ret));
@ -255,6 +256,13 @@ int ObLSDDLLogHandler::flush(int64_t rec_log_ts)
return OB_SUCCESS;
}
palf::SCN ObLSDDLLogHandler::get_rec_scn()
{
palf::SCN tmp;
tmp.convert_for_lsn_allocator(get_rec_log_ts());
return tmp;
}
int64_t ObLSDDLLogHandler::get_rec_log_ts()
{
int ret = OB_SUCCESS;

View File

@ -53,8 +53,9 @@ public:
int resume_leader() override final;
// for checkpoint
int flush(int64_t rec_log_ts) override final;
int64_t get_rec_log_ts() override final;
int flush(palf::SCN &rec_scn) override final;
int64_t get_rec_log_ts();
palf::SCN get_rec_scn() override final;
private:
int replay_ddl_redo_log_(const char *log_buf, const int64_t buf_size, int64_t pos, const int64_t log_ts);
int replay_ddl_prepare_log_(const char *log_buf, const int64_t buf_size, int64_t pos, const int64_t log_ts);

View File

@ -49,7 +49,7 @@ ObLSMeta::ObLSMeta()
ls_id_(),
replica_type_(REPLICA_TYPE_MAX),
ls_create_status_(ObInnerLSStatus::CREATING),
clog_checkpoint_ts_(0),
clog_checkpoint_scn_(),
clog_base_lsn_(PALF_INITIAL_LSN_VAL),
rebuild_seq_(0),
migration_status_(ObMigrationStatus::OB_MIGRATION_STATUS_MAX),
@ -69,7 +69,7 @@ ObLSMeta::ObLSMeta(const ObLSMeta &ls_meta)
ls_id_(ls_meta.ls_id_),
replica_type_(ls_meta.replica_type_),
ls_create_status_(ls_meta.ls_create_status_),
clog_checkpoint_ts_(ls_meta.clog_checkpoint_ts_),
clog_checkpoint_scn_(ls_meta.clog_checkpoint_scn_),
clog_base_lsn_(ls_meta.clog_base_lsn_),
rebuild_seq_(ls_meta.rebuild_seq_),
migration_status_(ls_meta.migration_status_),
@ -94,7 +94,7 @@ ObLSMeta &ObLSMeta::operator=(const ObLSMeta &other)
rebuild_seq_ = other.rebuild_seq_;
migration_status_ = other.migration_status_;
clog_base_lsn_ = other.clog_base_lsn_;
clog_checkpoint_ts_ = other.clog_checkpoint_ts_;
clog_checkpoint_scn_ = other.clog_checkpoint_scn_;
gc_state_ = other.gc_state_;
offline_scn_ = other.offline_scn_;
restore_status_ = other.restore_status_;
@ -113,7 +113,7 @@ void ObLSMeta::reset()
ls_id_.reset();
replica_type_ = REPLICA_TYPE_MAX;
clog_base_lsn_.reset();
clog_checkpoint_ts_ = 0;
clog_checkpoint_scn_.reset();
rebuild_seq_ = 0;
migration_status_ = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
gc_state_ = LSGCState::INVALID_LS_GC_STATE;
@ -130,20 +130,26 @@ LSN &ObLSMeta::get_clog_base_lsn()
return clog_base_lsn_;
}
SCN ObLSMeta::get_clog_checkpoint_scn() const
{
ObSpinLockTimeGuard guard(lock_);
return clog_checkpoint_scn_;
}
int64_t ObLSMeta::get_clog_checkpoint_ts() const
{
ObSpinLockTimeGuard guard(lock_);
return clog_checkpoint_ts_;
return clog_checkpoint_scn_.get_val_for_lsn_allocator();
}
int ObLSMeta::set_clog_checkpoint(const LSN &clog_checkpoint_lsn,
const int64_t clog_checkpoint_ts,
const SCN clog_checkpoint_scn,
const bool write_slog)
{
ObSpinLockTimeGuard guard(lock_);
ObLSMeta tmp(*this);
tmp.clog_base_lsn_ = clog_checkpoint_lsn;
tmp.clog_checkpoint_ts_ = clog_checkpoint_ts;
tmp.clog_checkpoint_scn_ = clog_checkpoint_scn;
int ret = OB_SUCCESS;
if (write_slog) {
@ -153,7 +159,7 @@ int ObLSMeta::set_clog_checkpoint(const LSN &clog_checkpoint_lsn,
}
clog_base_lsn_ = clog_checkpoint_lsn;
clog_checkpoint_ts_ = clog_checkpoint_ts;
clog_checkpoint_scn_ = clog_checkpoint_scn;
return ret;
}
@ -401,7 +407,7 @@ int ObLSMeta::update_ls_meta(
ObSpinLockTimeGuard guard(lock_);
ObLSMeta tmp(*this);
tmp.clog_base_lsn_ = src_ls_meta.clog_base_lsn_;
tmp.clog_checkpoint_ts_ = src_ls_meta.clog_checkpoint_ts_;
tmp.clog_checkpoint_scn_ = src_ls_meta.clog_checkpoint_scn_;
tmp.replayable_point_ = src_ls_meta.replayable_point_;
tmp.tablet_change_checkpoint_scn_ = src_ls_meta.tablet_change_checkpoint_scn_;
if (update_restore_status) {
@ -409,7 +415,7 @@ int ObLSMeta::update_ls_meta(
}
guard.click();
tmp.all_id_meta_.update_all_id_meta(src_ls_meta.all_id_meta_);
if (tmp.clog_checkpoint_ts_ < clog_checkpoint_ts_) {
if (tmp.clog_checkpoint_scn_ < clog_checkpoint_scn_) {
// TODO: now do not allow clog checkpoint ts rollback, may support it in 4.1
ret = OB_ERR_UNEXPECTED;
LOG_WARN("do not allow clog checkpoint ts rollback", K(ret), K(src_ls_meta), KPC(this));
@ -418,7 +424,7 @@ int ObLSMeta::update_ls_meta(
} else {
guard.click();
clog_base_lsn_ = src_ls_meta.clog_base_lsn_;
clog_checkpoint_ts_ = src_ls_meta.clog_checkpoint_ts_;
clog_checkpoint_scn_ = src_ls_meta.clog_checkpoint_scn_;
replayable_point_ = src_ls_meta.replayable_point_;
tablet_change_checkpoint_scn_ = src_ls_meta.tablet_change_checkpoint_scn_;
@ -535,7 +541,7 @@ int ObLSMeta::build_saved_info()
ret = OB_ERR_UNEXPECTED;
LOG_WARN("saved info is not empty, can not build saved info", K(ret), K(*this));
} else {
saved_info.clog_checkpoint_ts_ = clog_checkpoint_ts_;
saved_info.clog_checkpoint_ts_ = clog_checkpoint_scn_.convert_to_ts();
saved_info.clog_base_lsn_ = clog_base_lsn_;
saved_info.tablet_change_checkpoint_scn_ = tablet_change_checkpoint_scn_;
ObLSMeta tmp(*this);
@ -592,7 +598,7 @@ int ObLSMeta::init(
ls_id_ = ls_id;
replica_type_ = replica_type;
ls_create_status_ = ObInnerLSStatus::CREATING;
clog_checkpoint_ts_ = create_scn;
clog_checkpoint_scn_.convert_for_lsn_allocator(create_scn);
clog_base_lsn_.val_ = PALF_INITIAL_LSN_VAL;
rebuild_seq_ = 0;
migration_status_ = migration_status;
@ -641,7 +647,7 @@ OB_SERIALIZE_MEMBER(ObLSMeta,
ls_id_,
replica_type_,
ls_create_status_,
clog_checkpoint_ts_,
clog_checkpoint_scn_,
clog_base_lsn_,
rebuild_seq_,
migration_status_,

View File

@ -46,10 +46,11 @@ public:
ObLSMeta(const ObLSMeta &ls_meta);
~ObLSMeta() {}
ObLSMeta &operator=(const ObLSMeta &other);
palf::SCN get_clog_checkpoint_scn() const;
int64_t get_clog_checkpoint_ts() const;
palf::LSN &get_clog_base_lsn();
int set_clog_checkpoint(const palf::LSN &clog_checkpoint_lsn,
const int64_t clog_checkpoint_ts,
const palf::SCN clog_checkpoint_scn,
const bool write_slog = true);
void reset();
bool is_valid() const;
@ -108,7 +109,7 @@ public:
ObSpinLockGuard lock_guard_;
};
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(replica_type), K_(ls_create_status),
K_(clog_checkpoint_ts), K_(clog_base_lsn),
K_(clog_checkpoint_scn), K_(clog_base_lsn),
K_(rebuild_seq), K_(migration_status), K(gc_state_), K(offline_scn_),
K_(restore_status), K_(replayable_point), K_(tablet_change_checkpoint_scn),
K_(all_id_meta));
@ -125,10 +126,10 @@ private:
void set_write_slog_func_(WriteSlog write_slog);
static WriteSlog write_slog_;
// clog_checkpoint_ts_, meaning:
// 1. dump points of all modules have exceeded clog_checkpoint_ts_
// 2. all clog entries which log_ts are smaller than clog_checkpoint_ts_ can be recycled
int64_t clog_checkpoint_ts_;
// clog_checkpoint_scn_, meaning:
// 1. dump points of all modules have exceeded clog_checkpoint_scn_
// 2. all clog entries which log_scn are smaller than clog_checkpoint_scn_ can be recycled
palf::SCN clog_checkpoint_scn_;
// clog_base_lsn_, meaning:
// 1. all clog entries which lsn are smaller than clog_base_lsn_ have been recycled
// 2. log_ts of log entry that clog_base_lsn_ points to is smaller than/equal to clog_checkpoint_ts_

View File

@ -112,17 +112,16 @@ int ObLSSyncTabletSeqHandler::resume_leader()
return ret;
}
int ObLSSyncTabletSeqHandler::flush(int64_t rec_log_ts)
int ObLSSyncTabletSeqHandler::flush(palf::SCN &scn)
{
// TODO
UNUSED(rec_log_ts);
UNUSED(scn);
return OB_SUCCESS;
}
int64_t ObLSSyncTabletSeqHandler::get_rec_log_ts()
palf::SCN ObLSSyncTabletSeqHandler::get_rec_scn()
{
// TODO
return INT64_MAX;
return palf::SCN::max_scn();
}
}

View File

@ -49,8 +49,8 @@ public:
int resume_leader() override final;
// for checkpoint
int flush(int64_t rec_log_ts) override final;
int64_t get_rec_log_ts() override final;
int flush(palf::SCN &scn) override final;
palf::SCN get_rec_scn() override final;
private:
bool is_inited_;

View File

@ -21,6 +21,7 @@
#include "logservice/ob_log_base_header.h"
#include "logservice/ob_log_base_type.h"
#include "logservice/ob_log_service.h"
#include "logservice/palf/scn.h"
#include "observer/report/ob_i_meta_report.h"
#include "share/ob_disk_usage_table_operator.h"
#include "share/ob_rpc_struct.h"
@ -197,15 +198,15 @@ int ObLSTabletService::resume_leader()
return ret;
}
int ObLSTabletService::flush(int64_t rec_log_ts)
int ObLSTabletService::flush(palf::SCN &recycle_scn)
{
UNUSED(rec_log_ts);
UNUSED(recycle_scn);
return OB_SUCCESS;
}
int64_t ObLSTabletService::get_rec_log_ts()
palf::SCN ObLSTabletService::get_rec_scn()
{
return INT64_MAX;
return palf::SCN::max_scn();
}
int ObLSTabletService::prepare_for_safe_destroy()

View File

@ -122,8 +122,9 @@ private:
virtual int resume_leader() override;
// for checkpoint
virtual int flush(int64_t rec_log_ts) override;
virtual int64_t get_rec_log_ts() override;
virtual int flush(palf::SCN &recycle_scn) override;
virtual palf::SCN get_rec_scn() override;
public:
int prepare_for_safe_destroy();
int safe_to_destroy(bool &is_safe);

View File

@ -399,8 +399,8 @@ int ObLSTxService::resume_leader()
}
inline
void get_min_rec_log_ts_common_checkpoint_type_by_index_(int index,
char *common_checkpoint_type)
void get_min_rec_scn_common_checkpoint_type_by_index_(int index,
char *common_checkpoint_type)
{
int ret = OB_SUCCESS;
if (index == 0) {
@ -415,39 +415,39 @@ void get_min_rec_log_ts_common_checkpoint_type_by_index_(int index,
}
}
int64_t ObLSTxService::get_rec_log_ts()
palf::SCN ObLSTxService::get_rec_scn()
{
int64_t min_rec_log_ts = INT64_MAX;
int min_rec_log_ts_common_checkpoint_type_index = 0;
palf::SCN min_rec_scn = palf::SCN::max_scn();
int min_rec_scn_common_checkpoint_type_index = 0;
char common_checkpoint_type[common::MAX_CHECKPOINT_TYPE_BUF_LENGTH];
for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) {
if (OB_NOT_NULL(common_checkpoints_[i])) {
int64_t rec_log_ts = common_checkpoints_[i]->get_rec_log_ts();
if (rec_log_ts > 0 && rec_log_ts < min_rec_log_ts) {
min_rec_log_ts = rec_log_ts;
min_rec_log_ts_common_checkpoint_type_index = i;
palf::SCN rec_scn = common_checkpoints_[i]->get_rec_scn();
if (rec_scn.is_valid() && rec_scn < min_rec_scn) {
min_rec_scn = rec_scn;
min_rec_scn_common_checkpoint_type_index = i;
}
}
}
get_min_rec_log_ts_common_checkpoint_type_by_index_(min_rec_log_ts_common_checkpoint_type_index,
common_checkpoint_type);
get_min_rec_scn_common_checkpoint_type_by_index_(min_rec_scn_common_checkpoint_type_index,
common_checkpoint_type);
TRANS_LOG(INFO, "[CHECKPOINT] ObLSTxService::get_rec_log_ts",
TRANS_LOG(INFO, "[CHECKPOINT] ObLSTxService::get_rec_scn",
K(common_checkpoint_type),
KPC(common_checkpoints_[min_rec_log_ts_common_checkpoint_type_index]),
K(min_rec_log_ts), K(ls_id_));
KPC(common_checkpoints_[min_rec_scn_common_checkpoint_type_index]),
K(min_rec_scn), K(ls_id_));
return min_rec_log_ts;
return min_rec_scn;
}
int ObLSTxService::flush(int64_t rec_log_ts)
int ObLSTxService::flush(palf::SCN &recycle_scn)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) {
// only flush the common_checkpoint that whose clog need recycle
if (OB_NOT_NULL(common_checkpoints_[i]) && rec_log_ts >= common_checkpoints_[i]->get_rec_log_ts()) {
if (OB_SUCCESS != (tmp_ret = common_checkpoints_[i]->flush(rec_log_ts))) {
if (OB_NOT_NULL(common_checkpoints_[i]) && recycle_scn >= common_checkpoints_[i]->get_rec_scn()) {
if (OB_SUCCESS != (tmp_ret = common_checkpoints_[i]->flush(recycle_scn))) {
TRANS_LOG(WARN, "obCommonCheckpoint flush failed", K(tmp_ret), K(common_checkpoints_[i]));
}
}
@ -467,7 +467,7 @@ int ObLSTxService::get_common_checkpoint_info(
} else {
ObCommonCheckpointVTInfo info;
info.tablet_id = common_checkpoint->get_tablet_id();
info.rec_log_ts = common_checkpoint->get_rec_log_ts();
info.rec_scn = common_checkpoint->get_rec_scn();
info.checkpoint_type = i;
info.is_flushing = common_checkpoint->is_flushing();
common_checkpoint_array.push_back(info);
@ -540,7 +540,7 @@ int ObLSTxService::traversal_flush()
int tmp_ret = OB_SUCCESS;
for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) {
if (OB_NOT_NULL(common_checkpoints_[i]) &&
OB_SUCCESS != (tmp_ret = common_checkpoints_[i]->flush(INT64_MAX, false))) {
OB_SUCCESS != (tmp_ret = common_checkpoints_[i]->flush(palf::SCN::max_scn(), false))) {
TRANS_LOG(WARN, "obCommonCheckpoint flush failed", K(tmp_ret), KP(common_checkpoints_[i]));
}
}

View File

@ -136,8 +136,8 @@ public:
int switch_to_follower_gracefully();
int resume_leader();
int64_t get_rec_log_ts();
int flush(int64_t rec_log_ts);
palf::SCN get_rec_scn() override;
int flush(palf::SCN &recycle_scn) override;
int get_common_checkpoint_info(
ObIArray<checkpoint::ObCommonCheckpointVTInfo> &common_checkpoint_array);

View File

@ -1578,7 +1578,7 @@ int ObMemtable::set_max_end_log_ts(const int64_t log_ts)
return ret;
}
bool ObMemtable::rec_log_ts_is_stable()
bool ObMemtable::rec_scn_is_stable()
{
int ret = OB_SUCCESS;
bool rec_log_ts_is_stable = false;
@ -1890,6 +1890,13 @@ int ObMemtable::flush(share::ObLSID ls_id)
return ret;
}
palf::SCN ObMemtable::get_rec_scn()
{
palf::SCN tmp;
tmp.convert_for_lsn_allocator(get_rec_log_ts());
return tmp;
}
bool ObMemtable::is_active_memtable() const
{
return !is_frozen_memtable();

View File

@ -370,11 +370,12 @@ public:
/* freeze */
virtual int set_frozen() override { local_allocator_.set_frozen(); return OB_SUCCESS; }
virtual bool rec_log_ts_is_stable() override;
virtual bool rec_scn_is_stable() override;
virtual bool ready_for_flush() override;
void print_ready_for_flush();
virtual int flush(share::ObLSID ls_id) override;
virtual int64_t get_rec_log_ts() override { return ATOMIC_LOAD(&rec_log_ts_); }
virtual int64_t get_rec_log_ts() { return ATOMIC_LOAD(&rec_log_ts_); }
virtual palf::SCN get_rec_scn();
virtual bool is_frozen_checkpoint() const override { return is_frozen_memtable();}
virtual bool is_active_checkpoint() const override { return is_active_memtable();}

View File

@ -735,17 +735,6 @@ int ObLockMemtable::get_frozen_schema_version(int64_t &schema_version) const
return OB_NOT_SUPPORTED;
}
int64_t ObLockMemtable::get_rec_log_ts()
{
// no need lock because rec_scn_ aesc except INT64_MAX
// TODO: cxf remove this
palf::SCN tmp;
int64_t tmp_rec_log_ts = INT64_MAX;
tmp = get_rec_scn();
tmp_rec_log_ts = tmp == palf::SCN::max_scn() ? INT64_MAX : tmp.get_val_for_lsn_allocator();
return tmp_rec_log_ts;
}
palf::SCN ObLockMemtable::get_rec_scn()
{
// no need lock because rec_scn_ aesc except INT64_MAX
@ -805,17 +794,8 @@ bool ObLockMemtable::is_active_memtable() const
return !ATOMIC_LOAD(&is_frozen_);
}
int ObLockMemtable::flush(int64_t recycle_log_ts, bool need_freeze)
{
int ret = OB_SUCCESS;
palf::SCN tmp;
tmp.convert_for_lsn_allocator(recycle_log_ts);
ret = flush(tmp, need_freeze);
return ret;
}
int ObLockMemtable::flush(const palf::SCN &recycle_scn,
const bool need_freeze)
int ObLockMemtable::flush(palf::SCN recycle_scn,
bool need_freeze)
{
int ret = OB_SUCCESS;
UNUSED(need_freeze);

View File

@ -118,11 +118,8 @@ public:
bool is_active_memtable() const override;
// =========== INHERITED FROM ObCommonCheckPoint ==========
virtual int64_t get_rec_log_ts() override;
virtual palf::SCN get_rec_scn();
virtual int flush(int64_t recycle_log_ts, bool need_freeze = true) override;
virtual int flush(const palf::SCN &recycle_scn,
const bool need_freeze = true);
virtual int flush(palf::SCN recycle_scn, bool need_freeze = true);
virtual ObTabletID get_tablet_id() const;

View File

@ -252,12 +252,12 @@ int ObIDService::update_ls_id_meta(const bool write_slog)
return ret;
}
int ObIDService::flush(int64_t rec_log_ts)
int ObIDService::flush(palf::SCN &rec_scn)
{
int ret = OB_SUCCESS;
WLockGuard guard(rwlock_);
palf::SCN latest_rec_log_ts = rec_log_ts_.atomic_get();
if (latest_rec_log_ts.get_val_for_lsn_allocator() <= rec_log_ts) {
if (latest_rec_log_ts <= rec_scn) {
latest_rec_log_ts = rec_log_ts_.atomic_get();
if (OB_FAIL(update_ls_id_meta(true))) {
TRANS_LOG(WARN, "update id meta of ls meta fail", K(ret), K(service_type_));
@ -289,11 +289,11 @@ int ObIDService::check_leader(bool &leader)
return ret;
}
int64_t ObIDService::get_rec_log_ts()
palf::SCN ObIDService::get_rec_scn()
{
const palf::SCN rec_log_ts = rec_log_ts_.atomic_get();
TRANS_LOG(INFO, "get rec log ts", K(service_type_), K(rec_log_ts));
return rec_log_ts.get_val_for_lsn_allocator();
TRANS_LOG(INFO, "get rec log scn", K(service_type_), K(rec_log_ts));
return rec_log_ts;
}
int ObIDService::switch_to_follower_gracefully()

View File

@ -18,6 +18,7 @@
#include "logservice/ob_append_callback.h"
#include "logservice/ob_log_base_type.h"
#include "logservice/ob_log_handler.h"
#include "logservice/palf/scn.h"
namespace oceanbase
{
@ -112,7 +113,8 @@ public:
int handle_replay_result(const int64_t last_id, const int64_t limited_id, const palf::SCN log_ts);
// clog checkpoint
int flush(int64_t rec_log_ts);
int flush(palf::SCN &scn);
palf::SCN get_rec_scn();
int64_t get_rec_log_ts();
// for clog replay

View File

@ -114,8 +114,8 @@ public:
int switch_to_leader() { ATOMIC_STORE(&is_master_,true); return OB_SUCCESS;}
int switch_to_follower_gracefully() { ATOMIC_STORE(&is_master_,false); return OB_SUCCESS;}
int resume_leader() { ATOMIC_STORE(&is_master_,true);return OB_SUCCESS; }
int64_t get_rec_log_ts() { return INT64_MAX; }
int flush(int64_t rec_log_ts) { return OB_SUCCESS;}
palf::SCN get_rec_scn() { return palf::SCN::max_scn(); }
int flush(palf::SCN &rec_scn) { return OB_SUCCESS;}
private:
bool check_gts_();

View File

@ -42,7 +42,6 @@ int ObAdvanceLSCkptTask::try_advance_ls_ckpt_ts()
int ret = OB_SUCCESS;
storage::ObLSHandle ls_handle;
int64_t target_ckpt_ts = 0;
if (OB_ISNULL(MTL(ObLSService *))
|| OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, storage::ObLSGetMod::TRANS_MOD))
@ -52,10 +51,8 @@ int ObAdvanceLSCkptTask::try_advance_ls_ckpt_ts()
}
TRANS_LOG(WARN, "get ls faild", K(ret), K(MTL(ObLSService *)));
} else if (ls_handle.get_ls()->get_checkpoint_executor()->advance_checkpoint_by_flush(
target_ckpt_ts)) {
TRANS_LOG(WARN, "advance checkpoint ts failed", K(ret), K(ls_id_), K(target_ckpt_ts));
} else if (OB_FAIL(target_ckpt_ts_.convert_for_lsn_allocator(target_ckpt_ts))) {
TRANS_LOG(WARN, "convert for lsn fail", K(target_ckpt_ts));
target_ckpt_ts_)) {
TRANS_LOG(WARN, "advance checkpoint ts failed", K(ret), K(ls_id_), K(target_ckpt_ts_));
}
if (OB_SUCC(ret)) {

View File

@ -106,10 +106,10 @@ void ObCheckPointService::wait()
int ObCheckPointService::add_ls_freeze_task(
ObDataCheckpoint *data_checkpoint,
int64_t rec_log_ts)
palf::SCN rec_scn)
{
int ret = OB_SUCCESS;
if (OB_FAIL(freeze_thread_.add_task(data_checkpoint, rec_log_ts))) {
if (OB_FAIL(freeze_thread_.add_task(data_checkpoint, rec_scn))) {
STORAGE_LOG(WARN, "logstream freeze task failed", K(ret));
}
return ret;
@ -358,7 +358,7 @@ int ObCheckPointService::do_minor_freeze()
ObCheckpointExecutor *checkpoint_executor = nullptr;
if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) {
STORAGE_LOG(WARN, "checkpoint_executor should not be null", K(ls->get_ls_id()));
} else if (OB_SUCCESS != (tmp_ret = (checkpoint_executor->advance_checkpoint_by_flush(INT64_MAX)))) {
} else if (OB_SUCCESS != (tmp_ret = (checkpoint_executor->advance_checkpoint_by_flush(palf::SCN::max_scn())))) {
STORAGE_LOG(WARN, "advance_checkpoint_by_flush failed", K(tmp_ret), K(ls->get_ls_id()));
}
}

View File

@ -15,6 +15,7 @@
#include "storage/tx_storage/ob_ls_freeze_thread.h"
#include "lib/lock/ob_spin_lock.h"
#include "lib/task/ob_timer.h"
#include "logservice/palf/scn.h"
namespace oceanbase
{
@ -46,11 +47,11 @@ public:
void destroy();
// add ls checkpoint freeze task
// @param [in] data_checkpoint, the data_checkpoint of this task.
// @param [in] rec_log_ts, freeze all the memtable whose rec_log_ts is
// @param [in] rec_scn, freeze all the memtable whose rec_scn is
// smaller than this one.
int add_ls_freeze_task(
ObDataCheckpoint *data_checkpoint,
int64_t rec_log_ts);
palf::SCN rec_scn);
int do_minor_freeze();

View File

@ -27,10 +27,10 @@ using namespace checkpoint;
void ObLSFreezeTask::set_task(ObLSFreezeThread *host,
ObDataCheckpoint *data_checkpoint,
int64_t rec_log_ts)
palf::SCN rec_scn)
{
host_ = host;
rec_log_ts_ = rec_log_ts;
rec_scn_ = rec_scn;
data_checkpoint_ = data_checkpoint;
}
@ -38,7 +38,7 @@ void ObLSFreezeTask::handle()
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(data_checkpoint_)) {
data_checkpoint_->road_to_flush(rec_log_ts_);
data_checkpoint_->road_to_flush(rec_scn_);
}
if (OB_NOT_NULL(host_)) {
if (OB_FAIL(host_->push_back_(this))) {
@ -109,7 +109,7 @@ int ObLSFreezeThread::init(int tg_id)
}
int ObLSFreezeThread::add_task(ObDataCheckpoint *data_checkpoint,
int64_t rec_log_ts)
palf::SCN rec_scn)
{
int ret = OB_SUCCESS;
ObLSFreezeTask *task = NULL;
@ -124,7 +124,7 @@ int ObLSFreezeThread::add_task(ObDataCheckpoint *data_checkpoint,
}
}
if (OB_SUCC(ret)) {
task->set_task(this, data_checkpoint, rec_log_ts);
task->set_task(this, data_checkpoint, rec_scn);
if (OB_FAIL(TG_PUSH_TASK(tg_id_, task))) {
STORAGE_LOG(WARN, "schedule timer task failed", K(ret));
}

View File

@ -15,6 +15,7 @@
#include "lib/lock/ob_spin_lock.h"
#include "lib/thread/thread_mgr_interface.h"
#include "logservice/palf/scn.h"
namespace oceanbase
{
@ -34,12 +35,12 @@ class ObLSFreezeTask
public:
void set_task(ObLSFreezeThread *host,
checkpoint::ObDataCheckpoint *data_checkpoint,
int64_t rec_log_ts);
palf::SCN rec_scn);
void handle();
private:
int64_t rec_log_ts_;
palf::SCN rec_scn_;
ObLSFreezeThread *host_;
checkpoint::ObDataCheckpoint *data_checkpoint_;
};
@ -59,7 +60,7 @@ public:
int init(int tg_id);
void destroy();
int add_task(checkpoint::ObDataCheckpoint *data_checkpoint, int64_t rec_log_ts);
int add_task(checkpoint::ObDataCheckpoint *data_checkpoint, palf::SCN rec_scn);
void handle(void *task);
int get_tg_id() { return tg_id_; }

View File

@ -734,7 +734,7 @@ int ObLSService::restore_update_ls_(const ObLSMetaPackage &meta_package)
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls is null", K(meta_package));
} else if (OB_FAIL(ls->set_clog_checkpoint(ls_meta.get_clog_base_lsn(), ls_meta.get_clog_checkpoint_ts()))) {
} else if (OB_FAIL(ls->set_clog_checkpoint(ls_meta.get_clog_base_lsn(), ls_meta.get_clog_checkpoint_scn()))) {
LOG_WARN("failed to set clog checkpoint", K(meta_package));
} else if (OB_FAIL(ls->advance_base_info(meta_package.palf_meta_, is_rebuild))) {
LOG_WARN("failed to advance base lsn", K(meta_package));

View File

@ -219,19 +219,6 @@ transaction::ObLSTxCtxMgr *ObTxCtxMemtable::get_ls_tx_ctx_mgr()
return ls_ctx_mgr_guard_.get_ls_tx_ctx_mgr();
}
// TODO: handle exception
int64_t ObTxCtxMemtable::get_rec_log_ts()
{
int ret = OB_SUCCESS;
// TODO: cxf remove this
palf::SCN tmp;
int64_t tmp_rec_log_ts = INT64_MAX;
tmp = get_rec_scn();
tmp_rec_log_ts = tmp == palf::SCN::max_scn() ? INT64_MAX : tmp.get_val_for_lsn_allocator();
return tmp_rec_log_ts;
}
palf::SCN ObTxCtxMemtable::get_rec_scn()
{
int ret = OB_SUCCESS;
@ -272,15 +259,15 @@ bool ObTxCtxMemtable::is_active_memtable() const
return !ATOMIC_LOAD(&is_frozen_);
}
int ObTxCtxMemtable::flush(int64_t recycle_log_ts, bool need_freeze)
int ObTxCtxMemtable::flush(palf::SCN recycle_scn, bool need_freeze)
{
int ret = OB_SUCCESS;
ObSpinLockGuard guard(flush_lock_);
if (need_freeze) {
int64_t rec_log_ts = get_rec_log_ts();
if (rec_log_ts >= recycle_log_ts) {
TRANS_LOG(INFO, "no need to freeze", K(rec_log_ts), K(recycle_log_ts));
palf::SCN rec_scn = get_rec_scn();
if (rec_scn >= recycle_scn) {
TRANS_LOG(INFO, "no need to freeze", K(rec_scn), K(recycle_scn));
} else if (is_active_memtable()) {
int64_t cur_ts = common::ObClockGenerator::getClock();
ObScnRange scn_range;

View File

@ -66,9 +66,8 @@ public:
virtual int64_t get_occupied_size() const override { return 0; }
// ================ INHERITED FROM ObCommonCheckpoint ===============
virtual int64_t get_rec_log_ts() override;
virtual palf::SCN get_rec_scn();
virtual int flush(int64_t recycle_log_ts, bool need_freeze = true) override;
virtual int flush(palf::SCN recycle_scn, bool need_freeze = true);
virtual ObTabletID get_tablet_id() const override;

View File

@ -369,11 +369,6 @@ int ObTxDataMemtableMgr::get_all_memtables_for_write(ObTxDataMemtableWriteGuard
return ret;
}
int64_t ObTxDataMemtableMgr::get_rec_log_ts()
{
return get_rec_scn().get_val_for_lsn_allocator();
}
palf::SCN ObTxDataMemtableMgr::get_rec_scn()
{
int ret = OB_SUCCESS;
@ -420,7 +415,6 @@ int ObTxDataMemtableMgr::flush_all_frozen_memtables_(ObTableHdlArray &memtable_h
return ret;
}
int ObTxDataMemtableMgr::flush(palf::SCN recycle_scn, bool need_freeze)
{
int ret = OB_SUCCESS;
@ -459,17 +453,6 @@ int ObTxDataMemtableMgr::flush(palf::SCN recycle_scn, bool need_freeze)
return ret;
}
int ObTxDataMemtableMgr::flush(int64_t recycle_log_ts, bool need_freeze)
{
palf::SCN recycle_scn;
if (INT64_MAX == recycle_log_ts) {
recycle_scn.set_max();
} else {
recycle_scn.convert_for_lsn_allocator(recycle_log_ts);
}
return flush(recycle_scn, need_freeze);
}
ObTabletID ObTxDataMemtableMgr::get_tablet_id() const
{
return LS_TX_DATA_TABLET;

View File

@ -94,10 +94,8 @@ public: // ObTxDataMemtableMgr
int get_memtable_range(int64_t &memtable_head, int64_t &memtable_tail);
// ================ INHERITED FROM ObCommonCheckpoint ===============
virtual int64_t get_rec_log_ts() override;
virtual palf::SCN get_rec_scn() override;
virtual int flush(int64_t recycle_log_ts, bool need_freeze = true) override;
virtual int flush(palf::SCN recycle_scn, bool need_freeze = true) override;
virtual ObTabletID get_tablet_id() const override;

View File

@ -783,7 +783,7 @@ int ObTxDataTable::self_freeze_task()
STORAGE_LOG(INFO, "start tx data table self freeze task", K(get_ls_id()));
if (OB_FAIL(memtable_mgr_->flush(INT64_MAX, true))) {
if (OB_FAIL(memtable_mgr_->flush(palf::SCN::max_scn(), true))) {
share::ObLSID ls_id = get_ls_id();
STORAGE_LOG(WARN, "self freeze of tx data memtable failed.", KR(ret), K(ls_id), KPC(memtable_mgr_));
}