diff --git a/src/logservice/ob_garbage_collector.cpp b/src/logservice/ob_garbage_collector.cpp index dd6ceff15..fb92bd3fe 100644 --- a/src/logservice/ob_garbage_collector.cpp +++ b/src/logservice/ob_garbage_collector.cpp @@ -430,22 +430,12 @@ int ObGCHandler::replay(const void *buffer, return ret; } -void ObGCHandler::get_rec_log_scn(SCN &scn) +palf::SCN ObGCHandler::get_rec_scn() { - scn.set_max(); + return palf::SCN::max_scn(); } -int64_t ObGCHandler::get_rec_log_ts() -{ - return INT64_MAX; -} - -int ObGCHandler::flush(const SCN &scn) -{ - // do nothing - return OB_SUCCESS; -} -int ObGCHandler::flush(int64_t rec_log_ts) +int ObGCHandler::flush(SCN &scn) { // do nothing return OB_SUCCESS; diff --git a/src/logservice/ob_garbage_collector.h b/src/logservice/ob_garbage_collector.h index 47d2e94fd..0a2da8447 100644 --- a/src/logservice/ob_garbage_collector.h +++ b/src/logservice/ob_garbage_collector.h @@ -207,10 +207,8 @@ public: virtual int resume_leader() override; // for checkpoint - virtual int64_t get_rec_log_ts() override; - virtual void get_rec_log_scn(palf::SCN &scn); - virtual int flush(int64_t rec_log_ts) override; - virtual int flush(const palf::SCN &scn) ; + virtual palf::SCN get_rec_scn() override; + virtual int flush(palf::SCN &scn) override; TO_STRING_KV(K(is_inited_), K(gc_seq_invalid_member_)); diff --git a/src/logservice/ob_log_base_type.h b/src/logservice/ob_log_base_type.h index 9ca6937e2..2596346c5 100644 --- a/src/logservice/ob_log_base_type.h +++ b/src/logservice/ob_log_base_type.h @@ -158,8 +158,8 @@ public: class ObICheckpointSubHandler { public: - virtual int64_t get_rec_log_ts() = 0; - virtual int flush(int64_t rec_log_ts) = 0; + virtual palf::SCN get_rec_scn() = 0; + virtual int flush(palf::SCN &scn) = 0; }; #define REGISTER_TO_LOGSERVICE(type, subhandler) \ diff --git a/src/observer/virtual_table/ob_all_virtual_checkpoint.cpp b/src/observer/virtual_table/ob_all_virtual_checkpoint.cpp index 92e17ad91..d505c423a 100644 --- a/src/observer/virtual_table/ob_all_virtual_checkpoint.cpp +++ b/src/observer/virtual_table/ob_all_virtual_checkpoint.cpp @@ -202,7 +202,7 @@ int ObAllVirtualCheckpointInfo::process_curr_tenant(ObNewRow *&row) } case OB_APP_MIN_COLUMN_ID + 5: { //TODO:SCN - cur_row_.cells_[i].set_uint64(checkpoint.rec_log_ts < 0 ? 0 : checkpoint.rec_log_ts); + cur_row_.cells_[i].set_uint64(checkpoint.rec_scn.is_valid() ? checkpoint.rec_scn.get_val_for_inner_table_field() : 0); break; } default: diff --git a/src/observer/virtual_table/ob_all_virtual_transaction_checkpoint.cpp b/src/observer/virtual_table/ob_all_virtual_transaction_checkpoint.cpp index 70517dcce..d4b92bcd8 100644 --- a/src/observer/virtual_table/ob_all_virtual_transaction_checkpoint.cpp +++ b/src/observer/virtual_table/ob_all_virtual_transaction_checkpoint.cpp @@ -189,7 +189,7 @@ int ObAllVirtualTransCheckpointInfo::process_curr_tenant(ObNewRow *&row) break; } case OB_APP_MIN_COLUMN_ID + 5: { - cur_row_.cells_[i].set_uint64((common_checkpoint.rec_log_ts < 0 ? 0 : common_checkpoint.rec_log_ts)); + cur_row_.cells_[i].set_uint64((common_checkpoint.rec_scn.is_valid() ? common_checkpoint.rec_scn.get_val_for_inner_table_field() : 0)); break; } case OB_APP_MIN_COLUMN_ID + 6: diff --git a/src/observer/virtual_table/ob_all_virtual_transaction_freeze_checkpoint.cpp b/src/observer/virtual_table/ob_all_virtual_transaction_freeze_checkpoint.cpp index 14c1f370b..0a2aa34c8 100644 --- a/src/observer/virtual_table/ob_all_virtual_transaction_freeze_checkpoint.cpp +++ b/src/observer/virtual_table/ob_all_virtual_transaction_freeze_checkpoint.cpp @@ -190,7 +190,7 @@ int ObAllVirtualFreezeCheckpointInfo::process_curr_tenant(ObNewRow *&row) break; } case OB_APP_MIN_COLUMN_ID + 5: { - cur_row_.cells_[i].set_uint64(freeze_checkpoint.rec_log_ts < 0 ? 0 : freeze_checkpoint.rec_log_ts); + cur_row_.cells_[i].set_uint64(freeze_checkpoint.rec_scn.is_valid() ? freeze_checkpoint.rec_scn.get_val_for_inner_table_field() : 0); break; } case OB_APP_MIN_COLUMN_ID + 6: @@ -205,7 +205,7 @@ int ObAllVirtualFreezeCheckpointInfo::process_curr_tenant(ObNewRow *&row) } break; case OB_APP_MIN_COLUMN_ID + 7: - cur_row_.cells_[i].set_int(freeze_checkpoint.rec_log_ts_is_stable ? 1 : 0); + cur_row_.cells_[i].set_int(freeze_checkpoint.rec_scn_is_stable ? 1 : 0); break; default: ret = OB_ERR_UNEXPECTED; diff --git a/src/rootserver/freeze/ob_major_freeze_service.h b/src/rootserver/freeze/ob_major_freeze_service.h index 6aa66fcf3..69237a29c 100644 --- a/src/rootserver/freeze/ob_major_freeze_service.h +++ b/src/rootserver/freeze/ob_major_freeze_service.h @@ -38,21 +38,12 @@ public: int init(uint64_t tenant_id); - // clog checkpoint, do nothing - int flush(int64_t rec_log_ts) - { - UNUSED(rec_log_ts); - return OB_SUCCESS; - } - int flush(palf::SCN &rec_scn) { UNUSED(rec_scn); return OB_SUCCESS; } - int64_t get_rec_log_ts() { return INT64_MAX; } - - palf::SCN get_rec_log_scn() { return palf::SCN::max_scn(); } + palf::SCN get_rec_scn() override { return palf::SCN::max_scn(); } // for replay, do nothing int replay(const void *buffer, diff --git a/src/rootserver/ob_primary_ls_service.h b/src/rootserver/ob_primary_ls_service.h index f781c3183..c99c33fd8 100644 --- a/src/rootserver/ob_primary_ls_service.h +++ b/src/rootserver/ob_primary_ls_service.h @@ -320,11 +320,8 @@ public: virtual void do_work() override; public: - //TODO(SCN)yaoying.yyy: get_rec_log_scn() and flush() override - virtual int64_t get_rec_log_ts() override { return INT64_MAX;} - virtual const palf::SCN get_rec_log_scn(){ return palf::SCN::max_scn();} - virtual int flush(int64_t rec_log_ts) override { return OB_SUCCESS; } - virtual int flush(palf::SCN &scn) { return OB_SUCCESS; } + virtual palf::SCN get_rec_scn() override { return palf::SCN::max_scn();} + virtual int flush(palf::SCN &scn) override { return OB_SUCCESS; } int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const palf::SCN &scn) override { UNUSED(buffer); diff --git a/src/rootserver/restore/ob_restore_scheduler.h b/src/rootserver/restore/ob_restore_scheduler.h index c95ec98bc..62263f579 100644 --- a/src/rootserver/restore/ob_restore_scheduler.h +++ b/src/rootserver/restore/ob_restore_scheduler.h @@ -57,8 +57,8 @@ public: virtual void do_work() override; void destroy(); public: - virtual int64_t get_rec_log_ts() override { return INT64_MAX;} - virtual int flush(int64_t rec_log_ts) override { return OB_SUCCESS; } + virtual palf::SCN get_rec_scn() override { return palf::SCN::max_scn();} + virtual int flush(palf::SCN &rec_scn) override { return OB_SUCCESS; } int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const palf::SCN &scn) override { UNUSED(buffer); diff --git a/src/share/ob_global_autoinc_service.h b/src/share/ob_global_autoinc_service.h index d7a90b25a..c06a48598 100644 --- a/src/share/ob_global_autoinc_service.h +++ b/src/share/ob_global_autoinc_service.h @@ -153,15 +153,15 @@ public: } // for checkpoint, do nothing - int64_t get_rec_log_ts() override final + palf::SCN get_rec_scn() override final { - return INT64_MAX; + return palf::SCN::max_scn();; } - int flush(int64_t rec_log_ts) override final + int flush(palf::SCN &scn) override final { int ret = OB_SUCCESS; - UNUSED(rec_log_ts); + UNUSED(scn); return ret; } diff --git a/src/storage/backup/ob_backup_task.cpp b/src/storage/backup/ob_backup_task.cpp index 1c98e8532..c2dbd25ab 100644 --- a/src/storage/backup/ob_backup_task.cpp +++ b/src/storage/backup/ob_backup_task.cpp @@ -83,12 +83,14 @@ static int advance_checkpoint_by_flush(const uint64_t tenant_id, const share::Ob ObLSMetaPackage ls_meta_package; int64_t i = 0; const int64_t start_ts = ObTimeUtility::current_time(); + palf::SCN tmp; + tmp.convert_for_lsn_allocator(start_scn); do { const int64_t cur_ts = ObTimeUtility::current_time(); if (cur_ts - start_ts > advance_checkpoint_timeout) { ret = OB_BACKUP_ADVANCE_CHECKPOINT_TIMEOUT; LOG_WARN("backup advance checkpoint by flush timeout", K(ret), K(tenant_id), K(ls_id), K(start_scn)); - } else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(start_scn))) { + } else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(tmp))) { if (OB_NO_NEED_UPDATE == ret) { // clog checkpoint ts has passed start log ts ret = OB_SUCCESS; diff --git a/src/storage/checkpoint/ob_checkpoint_executor.cpp b/src/storage/checkpoint/ob_checkpoint_executor.cpp index 6ecd3d2e5..730b54325 100644 --- a/src/storage/checkpoint/ob_checkpoint_executor.cpp +++ b/src/storage/checkpoint/ob_checkpoint_executor.cpp @@ -106,11 +106,11 @@ void ObCheckpointExecutor::offline() update_checkpoint_enabled_ = false; } -inline void get_min_rec_log_ts_service_type_by_index_(int index, char* service_type) +inline void get_min_rec_scn_service_type_by_index_(int index, char* service_type) { int ret = OB_SUCCESS; if (index == 0) { - strncpy(service_type ,"MAX_DECIDED_LOG_TS", common::MAX_SERVICE_TYPE_BUF_LENGTH); + strncpy(service_type ,"MAX_DECIDED_SCN", common::MAX_SERVICE_TYPE_BUF_LENGTH); } else if (OB_FAIL(log_base_type_to_string(ObLogBaseType(index), service_type, common::MAX_SERVICE_TYPE_BUF_LENGTH))) { @@ -126,51 +126,52 @@ int ObCheckpointExecutor::update_clog_checkpoint() if (update_checkpoint_enabled_) { ObFreezer *freezer = ls_->get_freezer(); if (OB_NOT_NULL(freezer)) { - int64_t checkpoint_ts = INT64_MAX; - if (OB_FAIL(freezer->get_max_consequent_callbacked_log_ts(checkpoint_ts))) { - STORAGE_LOG(WARN, "get_max_consequent_callbacked_log_ts failed", K(ret), K(freezer->get_ls_id())); + SCN checkpoint_scn; + checkpoint_scn.set_max(); + if (OB_FAIL(freezer->get_max_consequent_callbacked_scn(checkpoint_scn))) { + STORAGE_LOG(WARN, "get_max_consequent_callbacked_scn failed", K(ret), K(freezer->get_ls_id())); } else { - // used to record which handler provide the smallest rec_log_ts - int min_rec_log_ts_service_type_index = 0; + // used to record which handler provide the smallest rec_scn + int min_rec_scn_service_type_index = 0; char service_type[common::MAX_SERVICE_TYPE_BUF_LENGTH]; for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) { if (OB_NOT_NULL(handlers_[i])) { - int64_t rec_log_ts = handlers_[i]->get_rec_log_ts(); - if (rec_log_ts > 0 && rec_log_ts < checkpoint_ts) { - checkpoint_ts = rec_log_ts; - min_rec_log_ts_service_type_index = i; + SCN rec_scn = handlers_[i]->get_rec_scn(); + if (rec_scn.is_valid() && rec_scn < checkpoint_scn) { + checkpoint_scn = rec_scn; + min_rec_scn_service_type_index = i; } } } - get_min_rec_log_ts_service_type_by_index_(min_rec_log_ts_service_type_index, service_type); + get_min_rec_scn_service_type_by_index_(min_rec_scn_service_type_index, service_type); - const int64_t checkpoint_ts_in_ls_meta = ls_->get_clog_checkpoint_ts(); + const SCN checkpoint_scn_in_ls_meta = ls_->get_clog_checkpoint_scn(); const share::ObLSID ls_id = ls_->get_ls_id(); LSN clog_checkpoint_lsn; - if (checkpoint_ts == checkpoint_ts_in_ls_meta) { - STORAGE_LOG(INFO, "[CHECKPOINT] clog checkpoint no change", K(checkpoint_ts), - K(checkpoint_ts_in_ls_meta), K(ls_id), K(service_type)); - } else if (checkpoint_ts < checkpoint_ts_in_ls_meta) { - if (min_rec_log_ts_service_type_index == 0) { + if (checkpoint_scn == checkpoint_scn_in_ls_meta) { + STORAGE_LOG(INFO, "[CHECKPOINT] clog checkpoint no change", K(checkpoint_scn), + K(checkpoint_scn_in_ls_meta), K(ls_id), K(service_type)); + } else if (checkpoint_scn < checkpoint_scn_in_ls_meta) { + if (min_rec_scn_service_type_index == 0) { STORAGE_LOG(INFO, "[CHECKPOINT] expexted when no log callbacked or replayed", - K(checkpoint_ts), K(checkpoint_ts_in_ls_meta), K(ls_id)); + K(checkpoint_scn), K(checkpoint_scn_in_ls_meta), K(ls_id)); } else { - STORAGE_LOG(ERROR, "[CHECKPOINT] can not advance clog checkpoint", K(checkpoint_ts), - K(checkpoint_ts_in_ls_meta), K(ls_id), K(service_type)); + STORAGE_LOG(ERROR, "[CHECKPOINT] can not advance clog checkpoint", K(checkpoint_scn), + K(checkpoint_scn_in_ls_meta), K(ls_id), K(service_type)); } - } else if (OB_FAIL(loghandler_->locate_by_ts_ns_coarsely(checkpoint_ts, clog_checkpoint_lsn))) { + } else if (OB_FAIL(loghandler_->locate_by_scn_coarsely(checkpoint_scn, clog_checkpoint_lsn))) { if (OB_ENTRY_NOT_EXIST == ret) { - STORAGE_LOG(WARN, "no file in disk", K(ret), K(ls_id), K(checkpoint_ts)); + STORAGE_LOG(WARN, "no file in disk", K(ret), K(ls_id), K(checkpoint_scn)); ret = OB_SUCCESS; } else if (OB_NOT_INIT == ret) { - STORAGE_LOG(WARN, "palf has been disabled", K(ret), K(checkpoint_ts), K(ls_->get_ls_id())); + STORAGE_LOG(WARN, "palf has been disabled", K(ret), K(checkpoint_scn), K(ls_->get_ls_id())); ret = OB_SUCCESS; } else { STORAGE_LOG(ERROR, "locate lsn by logts failed", K(ret), K(ls_id), - K(checkpoint_ts), K(checkpoint_ts_in_ls_meta)); + K(checkpoint_scn), K(checkpoint_scn_in_ls_meta)); } - } else if (OB_FAIL(ls_->set_clog_checkpoint_without_lock(clog_checkpoint_lsn, checkpoint_ts))) { - STORAGE_LOG(WARN, "set clog checkpoint failed", K(ret), K(clog_checkpoint_lsn), K(checkpoint_ts), K(ls_id)); + } else if (OB_FAIL(ls_->set_clog_checkpoint_without_lock(clog_checkpoint_lsn, checkpoint_scn))) { + STORAGE_LOG(WARN, "set clog checkpoint failed", K(ret), K(clog_checkpoint_lsn), K(checkpoint_scn), K(ls_id)); } else if (OB_FAIL(loghandler_->advance_base_lsn(clog_checkpoint_lsn))) { if (OB_NOT_INIT == ret) { STORAGE_LOG(WARN, "palf has been disabled", K(ret), K(clog_checkpoint_lsn), K(ls_->get_ls_id())); @@ -181,7 +182,7 @@ int ObCheckpointExecutor::update_clog_checkpoint() } else { ATOMIC_STORE(&wait_advance_checkpoint_, false); FLOG_INFO("[CHECKPOINT] update clog checkpoint successfully", - K(clog_checkpoint_lsn), K(checkpoint_ts), K(ls_id), + K(clog_checkpoint_lsn), K(checkpoint_scn), K(ls_id), K(service_type)); } } @@ -195,49 +196,48 @@ int ObCheckpointExecutor::update_clog_checkpoint() return ret; } -int ObCheckpointExecutor::advance_checkpoint_by_flush(int64_t recycle_ts) { +int ObCheckpointExecutor::advance_checkpoint_by_flush(SCN recycle_scn) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - // calcu recycle_ts according to clog disk situation - if (recycle_ts == 0) { + // calcu recycle_ according to clog disk situation + if (!recycle_scn.is_valid()) { LSN end_lsn; - int64_t calcu_recycle_ts = INT64_MAX; if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) { STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id())); } else { LSN clog_checkpoint_lsn = ls_->get_clog_base_lsn(); LSN calcu_recycle_lsn = clog_checkpoint_lsn + ((end_lsn - clog_checkpoint_lsn) * CLOG_GC_PERCENT / 100); - if (OB_FAIL(loghandler_->locate_by_lsn_coarsely(calcu_recycle_lsn, recycle_ts))) { - STORAGE_LOG(WARN, "locate_by_lsn_coarsely failed", K(calcu_recycle_ts), K(calcu_recycle_lsn), - K(recycle_ts), K(ls_->get_ls_id())); + if (OB_FAIL(loghandler_->locate_by_lsn_coarsely(calcu_recycle_lsn, recycle_scn))) { + STORAGE_LOG(WARN, "locate_by_lsn_coarsely failed", K(calcu_recycle_lsn), + K(recycle_scn), K(ls_->get_ls_id())); } else { STORAGE_LOG(INFO, "advance checkpoint by flush to avoid clog disk full", - K(recycle_ts), K(end_lsn), K(clog_checkpoint_lsn), + K(recycle_scn), K(end_lsn), K(clog_checkpoint_lsn), K(calcu_recycle_lsn), K(ls_->get_ls_id())); } } // the log of end_log_lsn and the log of clog_checkpoint_lsn may be in a block - if (recycle_ts < ls_->get_clog_checkpoint_ts()) { - recycle_ts = INT64_MAX; + if (recycle_scn < ls_->get_clog_checkpoint_scn()) { + recycle_scn.set_max(); } } if (OB_SUCC(ret)) { - if (recycle_ts < ls_->get_clog_checkpoint_ts()) { + if (recycle_scn < ls_->get_clog_checkpoint_scn()) { ret = OB_NO_NEED_UPDATE; - STORAGE_LOG(WARN, "recycle_ts should not smaller than checkpoint_log_ts", - K(recycle_ts), K(ls_->get_clog_checkpoint_ts()), K(ls_->get_ls_id())); + STORAGE_LOG(WARN, "recycle_scn should not smaller than checkpoint_scn", + K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id())); } else { STORAGE_LOG(INFO, "start flush", - K(recycle_ts), - K(ls_->get_clog_checkpoint_ts()), + K(recycle_scn), + K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id())); for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) { if (OB_NOT_NULL(handlers_[i]) - && OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_ts)))) { - STORAGE_LOG(WARN, "handler flush failed", K(recycle_ts), K(tmp_ret), + && OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_scn)))) { + STORAGE_LOG(WARN, "handler flush failed", K(recycle_scn), K(tmp_ret), K(i), K(ls_->get_ls_id())); } } @@ -254,7 +254,7 @@ int ObCheckpointExecutor::get_checkpoint_info(ObIArray &chec for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) { if (OB_NOT_NULL(handlers_[i])) { ObCheckpointVTInfo info; - info.rec_log_ts = handlers_[i]->get_rec_log_ts(); + info.rec_scn = handlers_[i]->get_rec_scn(); info.service_type = i; checkpoint_array.push_back(info); } @@ -266,13 +266,13 @@ bool ObCheckpointExecutor::need_flush() { int ret = OB_SUCCESS; bool need_flush = false; - int64_t end_log_ts = 0; - if (OB_FAIL(loghandler_->get_end_ts_ns(end_log_ts))) { - STORAGE_LOG(WARN, "get_end_ts_ns failed", K(ret)); - } else if (end_log_ts - - ls_->get_clog_checkpoint_ts() > MAX_NEED_REPLAY_CLOG_INTERVAL) { + palf::SCN end_scn; + if (OB_FAIL(loghandler_->get_end_scn(end_scn))) { + STORAGE_LOG(WARN, "get_end_scn failed", K(ret)); + } else if (end_scn.convert_to_ts() - ls_->get_clog_checkpoint_scn().convert_to_ts() + > MAX_NEED_REPLAY_CLOG_INTERVAL) { STORAGE_LOG(INFO, "over max need replay clog interval", - K(end_log_ts), K(ls_->get_clog_checkpoint_ts())); + K(end_scn), K(ls_->get_clog_checkpoint_scn())); need_flush = true; } @@ -290,10 +290,10 @@ bool ObCheckpointExecutor::is_wait_advance_checkpoint() return ATOMIC_LOAD(&wait_advance_checkpoint_); } -void ObCheckpointExecutor::set_wait_advance_checkpoint(int64_t checkpoint_log_ts) +void ObCheckpointExecutor::set_wait_advance_checkpoint(palf::SCN &checkpoint_scn) { ObSpinLockGuard guard(lock_); - if (checkpoint_log_ts == ls_->get_clog_checkpoint_ts()) { + if (checkpoint_scn == ls_->get_clog_checkpoint_scn()) { ATOMIC_STORE(&wait_advance_checkpoint_, true); last_set_wait_advance_checkpoint_time_ = ObTimeUtility::current_time(); } diff --git a/src/storage/checkpoint/ob_checkpoint_executor.h b/src/storage/checkpoint/ob_checkpoint_executor.h index 9450b8784..58fd13a4a 100644 --- a/src/storage/checkpoint/ob_checkpoint_executor.h +++ b/src/storage/checkpoint/ob_checkpoint_executor.h @@ -16,9 +16,15 @@ #include "lib/lock/ob_spin_lock.h" #include "logservice/ob_log_base_type.h" #include "logservice/ob_log_handler.h" +#include "logservice/palf/scn.h" namespace oceanbase { + +namespace palf +{ +class SCN; +} namespace storage { class ObLS; @@ -27,11 +33,11 @@ namespace checkpoint struct ObCheckpointVTInfo { - int64_t rec_log_ts; + palf::SCN rec_scn; int service_type; TO_STRING_KV( - K(rec_log_ts), + K(rec_scn), K(service_type) ); }; @@ -56,8 +62,8 @@ public: int update_clog_checkpoint(); // the service will flush and advance checkpoint - // after flush, checkpoint_log_ts will be equal or greater than recycle_ts - int advance_checkpoint_by_flush(int64_t recycle_ts = 0); + // after flush, checkpoint_scn will be equal or greater than recycle_scn + int advance_checkpoint_by_flush(palf::SCN recycle_scn); // for __all_virtual_checkpoint int get_checkpoint_info(ObIArray &checkpoint_array); @@ -67,13 +73,13 @@ public: bool is_wait_advance_checkpoint(); - void set_wait_advance_checkpoint(int64_t checkpoint_log_ts); + void set_wait_advance_checkpoint(palf::SCN &checkpoint_scn); int64_t get_cannot_recycle_log_size(); private: static const int64_t CLOG_GC_PERCENT = 60; - static const int64_t MAX_NEED_REPLAY_CLOG_INTERVAL = (int64_t)60 * 60 * 1000 * 1000 * 1000; //ns + static const int64_t MAX_NEED_REPLAY_CLOG_INTERVAL = (int64_t)60 * 60 * 1000 * 1000; //us ObLS *ls_; logservice::ObILogHandler *loghandler_; diff --git a/src/storage/checkpoint/ob_common_checkpoint.h b/src/storage/checkpoint/ob_common_checkpoint.h index 6a8c11e69..e36682e02 100644 --- a/src/storage/checkpoint/ob_common_checkpoint.h +++ b/src/storage/checkpoint/ob_common_checkpoint.h @@ -66,13 +66,13 @@ int common_checkpoint_type_to_string(const ObCommonCheckpointType common_checkpo struct ObCommonCheckpointVTInfo { ObTabletID tablet_id; - int64_t rec_log_ts; + palf::SCN rec_scn; int checkpoint_type; bool is_flushing; TO_STRING_KV( K(tablet_id), - K(rec_log_ts), + K(rec_scn), K(checkpoint_type), K(is_flushing) ); @@ -87,14 +87,12 @@ inline bool is_valid_log_base_type(const ObCommonCheckpointType &type) // and register into ls_tx_service's common_list // the checkpoint units: // 1. write TRANS_SERVICE_LOG_BASE_TYPE clog -// 2. have no freeze operation and rec_log_ts can't become smaller +// 2. have no freeze operation and rec_scn can't become smaller class ObCommonCheckpoint { public: - virtual int64_t get_rec_log_ts() = 0; - virtual palf::SCN get_rec_scn() { return palf::SCN::min_scn(); } - virtual int flush(int64_t recycle_log_ts, bool need_freeze = true) = 0; - virtual int flush(palf::SCN recycle_scn, bool need_freeze = true) { return OB_NOT_SUPPORTED; } + virtual palf::SCN get_rec_scn() = 0; + virtual int flush(palf::SCN recycle_scn, bool need_freeze = true) = 0; virtual ObTabletID get_tablet_id() const = 0; virtual bool is_flushing() const = 0; diff --git a/src/storage/checkpoint/ob_data_checkpoint.cpp b/src/storage/checkpoint/ob_data_checkpoint.cpp index e5ccebc31..35bdda86f 100644 --- a/src/storage/checkpoint/ob_data_checkpoint.cpp +++ b/src/storage/checkpoint/ob_data_checkpoint.cpp @@ -16,6 +16,7 @@ namespace oceanbase { +using namespace palf; namespace storage { namespace checkpoint @@ -57,7 +58,7 @@ int ObCheckpointDList::insert(ObFreezeCheckpoint *ob_freeze_checkpoint, bool ord { int ret = OB_SUCCESS; if (ordered) { - ObFreezeCheckpoint *next = get_first_greater(ob_freeze_checkpoint->get_rec_log_ts()); + ObFreezeCheckpoint *next = get_first_greater(ob_freeze_checkpoint->get_rec_scn()); if (!checkpoint_list_.add_before(next, ob_freeze_checkpoint)) { STORAGE_LOG(ERROR, "add_before failed"); ret = OB_ERR_UNEXPECTED; @@ -81,39 +82,39 @@ void ObCheckpointDList::get_iterator(ObCheckpointIterator &iterator) iterator.init(this); } -int64_t ObCheckpointDList::get_min_rec_log_ts_in_list(bool ordered) +SCN ObCheckpointDList::get_min_rec_scn_in_list(bool ordered) { - int64_t min_rec_log_ts = INT64_MAX; + SCN min_rec_scn = SCN::max_scn(); if (!checkpoint_list_.is_empty()) { ObFreezeCheckpoint *freeze_checkpoint = nullptr; if (ordered) { - min_rec_log_ts = checkpoint_list_.get_first()->get_rec_log_ts(); + min_rec_scn = checkpoint_list_.get_first()->get_rec_scn(); freeze_checkpoint = checkpoint_list_.get_first(); } else { auto *header = checkpoint_list_.get_header(); auto *cur = header->get_next(); while (cur != header) { - if (cur->get_rec_log_ts() < min_rec_log_ts) { - min_rec_log_ts = cur->get_rec_log_ts(); + if (cur->get_rec_scn() < min_rec_scn) { + min_rec_scn = cur->get_rec_scn(); freeze_checkpoint = cur; } cur = cur->get_next(); } } if (OB_NOT_NULL(freeze_checkpoint)) { - STORAGE_LOG(DEBUG, "[CHECKPOINT] get_min_rec_log_ts_in_list", K(min_rec_log_ts), - K(*freeze_checkpoint)); + STORAGE_LOG(DEBUG, "[CHECKPOINT] get_min_rec_scn_in_list", K(min_rec_scn), + K(*freeze_checkpoint)); } } - return min_rec_log_ts; + return min_rec_scn; } -ObFreezeCheckpoint *ObCheckpointDList::get_first_greater(const int64_t rec_log_ts) +ObFreezeCheckpoint *ObCheckpointDList::get_first_greater(const SCN rec_scn) { auto *cur = checkpoint_list_.get_header(); if (!checkpoint_list_.is_empty()) { auto *prev = cur->get_prev(); - while (prev != checkpoint_list_.get_header() && prev->get_rec_log_ts() > rec_log_ts) { + while (prev != checkpoint_list_.get_header() && prev->get_rec_scn() > rec_scn) { cur = prev; prev = cur->get_prev(); } @@ -131,8 +132,8 @@ int ObCheckpointDList::get_freezecheckpoint_info( auto ob_freeze_checkpoint = iterator.get_next(); ObFreezeCheckpointVTInfo info; info.tablet_id = ob_freeze_checkpoint->get_tablet_id().id(); - info.rec_log_ts = ob_freeze_checkpoint->get_rec_log_ts(); - info.rec_log_ts_is_stable = ob_freeze_checkpoint->rec_log_ts_is_stable(); + info.rec_scn = ob_freeze_checkpoint->get_rec_scn(); + info.rec_scn_is_stable = ob_freeze_checkpoint->rec_scn_is_stable(); info.location = ob_freeze_checkpoint->location_; freeze_checkpoint_array.push_back(info); } @@ -194,36 +195,35 @@ int ObDataCheckpoint::safe_to_destroy() return ret; } -int64_t ObDataCheckpoint::get_rec_log_ts() +SCN ObDataCheckpoint::get_rec_scn() { ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_); ObSpinLockGuard guard(lock_); int ret = OB_SUCCESS; - int64_t min_rec_log_ts = INT64_MAX; - int64_t tmp = INT64_MAX; + SCN min_rec_scn = SCN::max_scn(); + SCN tmp = SCN::max_scn(); - // memtable and tx_data_memtable - if ((tmp = new_create_list_.get_min_rec_log_ts_in_list(false)) < min_rec_log_ts && tmp != 0) { - min_rec_log_ts = tmp; + if ((tmp = new_create_list_.get_min_rec_scn_in_list(false)) < min_rec_scn) { + min_rec_scn = tmp; } - if ((tmp = active_list_.get_min_rec_log_ts_in_list()) < min_rec_log_ts && tmp != 0) { - min_rec_log_ts = tmp; + if ((tmp = active_list_.get_min_rec_scn_in_list()) < min_rec_scn) { + min_rec_scn = tmp; } - if ((tmp = ls_frozen_list_.get_min_rec_log_ts_in_list()) < min_rec_log_ts && tmp != 0) { - min_rec_log_ts = tmp; + if ((tmp = ls_frozen_list_.get_min_rec_scn_in_list()) < min_rec_scn) { + min_rec_scn = tmp; } - if ((tmp = prepare_list_.get_min_rec_log_ts_in_list()) < min_rec_log_ts && tmp != 0) { - min_rec_log_ts = tmp; + if ((tmp = prepare_list_.get_min_rec_scn_in_list()) < min_rec_scn) { + min_rec_scn = tmp; } - return min_rec_log_ts; + return min_rec_scn; } -int ObDataCheckpoint::flush(int64_t recycle_log_ts, bool need_freeze) +int ObDataCheckpoint::flush(SCN recycle_scn, bool need_freeze) { int ret = OB_SUCCESS; if (need_freeze) { - if (get_rec_log_ts() <= recycle_log_ts) { + if (get_rec_scn() <= recycle_scn) { if (OB_FAIL(ls_->logstream_freeze())) { STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id())); } @@ -235,12 +235,12 @@ int ObDataCheckpoint::flush(int64_t recycle_log_ts, bool need_freeze) return ret; } -int ObDataCheckpoint::ls_freeze(int64_t rec_log_ts) +int ObDataCheckpoint::ls_freeze(palf::SCN rec_scn) { int ret = OB_SUCCESS; ObCheckPointService *checkpoint_srv = MTL(ObCheckPointService *); set_ls_freeze_finished_(false); - if (OB_FAIL(checkpoint_srv->add_ls_freeze_task(this, rec_log_ts))) { + if (OB_FAIL(checkpoint_srv->add_ls_freeze_task(this, rec_scn))) { STORAGE_LOG(ERROR, "ls_freeze add task failed", K(ret)); set_ls_freeze_finished_(true); } @@ -290,7 +290,7 @@ void ObDataCheckpoint::print_list_(ObCheckpointDList &list) } } -void ObDataCheckpoint::road_to_flush(int64_t rec_log_ts) +void ObDataCheckpoint::road_to_flush(palf::SCN rec_scn) { if (OB_UNLIKELY(!is_inited_)) { STORAGE_LOG(WARN, "ObDataCheckpoint not init", K(is_inited_)); @@ -312,7 +312,7 @@ void ObDataCheckpoint::road_to_flush(int64_t rec_log_ts) ObFreezeCheckpoint *last = nullptr; { ObSpinLockGuard guard(lock_); - last = active_list_.get_first_greater(rec_log_ts); + last = active_list_.get_first_greater(rec_scn); } pop_range_to_ls_frozen_(last, active_list_); last_time = common::ObTimeUtility::fast_current_time(); @@ -518,8 +518,8 @@ int ObDataCheckpoint::traversal_flush_() { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - // Because prepare list is ordered based on rec_log_ts and we want to flush - // based on the order of rec_log_ts. So we should can simply use a small + // Because prepare list is ordered based on rec_scn and we want to flush + // based on the order of rec_scn. So we should can simply use a small // number for flush tasks. const int MAX_DATA_CHECKPOINT_FLUSH_COUNT = 10000; ObSEArray flush_tasks; diff --git a/src/storage/checkpoint/ob_data_checkpoint.h b/src/storage/checkpoint/ob_data_checkpoint.h index ec74129cb..82ce8df97 100644 --- a/src/storage/checkpoint/ob_data_checkpoint.h +++ b/src/storage/checkpoint/ob_data_checkpoint.h @@ -17,6 +17,7 @@ #include "storage/checkpoint/ob_common_checkpoint.h" #include "lib/lock/ob_spin_lock.h" #include "storage/checkpoint/ob_freeze_checkpoint.h" +#include "logservice/palf/scn.h" namespace oceanbase { @@ -39,8 +40,8 @@ struct ObCheckpointDList int unlink(ObFreezeCheckpoint *ob_freeze_checkpoint); int insert(ObFreezeCheckpoint *ob_freeze_checkpoint, bool ordered = true); void get_iterator(ObCheckpointIterator &iterator); - int64_t get_min_rec_log_ts_in_list(bool ordered = true); - ObFreezeCheckpoint *get_first_greater(const int64_t rec_log_ts); + palf::SCN get_min_rec_scn_in_list(bool ordered = true); + ObFreezeCheckpoint *get_first_greater(const palf::SCN rec_scn); int get_freezecheckpoint_info( ObIArray &freeze_checkpoint_array); @@ -65,7 +66,6 @@ private: }; // responsible for maintenance transaction checkpoint unit -// including data_memtable, tx_data_memtable class ObDataCheckpoint : public ObCommonCheckpoint { friend class ObFreezeCheckpoint; @@ -88,21 +88,21 @@ public: static const uint64_t LS_DATA_CHECKPOINT_TABLET_ID = 40000; int init(ObLS *ls); int safe_to_destroy(); - int64_t get_rec_log_ts() override; - // if min_rec_log_ts <= the input rec_log_ts + palf::SCN get_rec_scn(); + // if min_rec_scn <= the input rec_scn // logstream freeze - int flush(int64_t recycle_log_ts, bool need_freeze = true) override; - // if min_rec_log_ts <= the input rec_log_ts + int flush(palf::SCN recycle_scn, bool need_freeze = true); + // if min_rec_scn <= the input rec_scn // add ls_freeze task // logstream freeze optimization - int ls_freeze(int64_t rec_log_ts); + int ls_freeze(palf::SCN rec_scn); // logstream_freeze schedule and minor merge schedule - void road_to_flush(int64_t rec_log_ts); + void road_to_flush(palf::SCN rec_scn); // ObFreezeCheckpoint register into ObDataCheckpoint int add_to_new_create(ObFreezeCheckpoint *ob_freeze_checkpoint); // remove from prepare_list when finish minor_merge int unlink_from_prepare(ObFreezeCheckpoint *ob_freeze_checkpoint); - // timer to tranfer freeze_checkpoint that rec_log_ts is stable from new_create_list to + // timer to tranfer freeze_checkpoint that rec_scn is stable from new_create_list to // active_list int check_can_move_to_active_in_newcreate(); diff --git a/src/storage/checkpoint/ob_freeze_checkpoint.cpp b/src/storage/checkpoint/ob_freeze_checkpoint.cpp index bd29938ea..8b21af511 100644 --- a/src/storage/checkpoint/ob_freeze_checkpoint.cpp +++ b/src/storage/checkpoint/ob_freeze_checkpoint.cpp @@ -100,8 +100,8 @@ int ObFreezeCheckpoint::check_can_move_to_active(bool is_ls_freeze) { int ret = OB_SUCCESS; if (location_ != ACTIVE) { - // only when the unit rec_log_ts is stable that can be moved to ordered_active_list - if (rec_log_ts_is_stable()) { + // only when the unit rec_scn is stable that can be moved to ordered_active_list + if (rec_scn_is_stable()) { if (OB_FAIL(move_to_active_(is_ls_freeze))) { STORAGE_LOG(ERROR, "transfer to active failed", K(ret)); } diff --git a/src/storage/checkpoint/ob_freeze_checkpoint.h b/src/storage/checkpoint/ob_freeze_checkpoint.h index 0dd1695fe..2e33f3d38 100644 --- a/src/storage/checkpoint/ob_freeze_checkpoint.h +++ b/src/storage/checkpoint/ob_freeze_checkpoint.h @@ -17,6 +17,7 @@ #include "lib/utility/ob_print_utils.h" #include "share/ob_ls_id.h" #include "common/ob_tablet_id.h" +#include "logservice/palf/scn.h" namespace oceanbase { @@ -60,21 +61,21 @@ int freeze_checkpoint_location_to_string(const ObFreezeCheckpointLocation locati struct ObFreezeCheckpointVTInfo { ObTabletID tablet_id; - int64_t rec_log_ts; + palf::SCN rec_scn; ObFreezeCheckpointLocation location; - bool rec_log_ts_is_stable; + bool rec_scn_is_stable; TO_STRING_KV( K(tablet_id), - K(rec_log_ts), + K(rec_scn), K(location), - K(rec_log_ts_is_stable) + K(rec_scn_is_stable) ); }; // checkpoint unit like data_memtable and memtable that // 1. write TRANS_SERVICE_LOG_BASE_TYPE clog -// 2. have freeze operation and rec_log_ts can become smaller +// 2. have freeze operation and rec_scn can become smaller // inherit from ObFreezeCheckpoint // register into ObDataCheckpoint class ObFreezeCheckpoint : public common::ObDLinkBase @@ -86,12 +87,12 @@ public: ObFreezeCheckpoint() : location_(OUT), data_checkpoint_(nullptr) {} virtual ~ObFreezeCheckpoint() {} virtual void remove_from_data_checkpoint(bool need_lock_data_checkpoint = true); - virtual int64_t get_rec_log_ts() = 0; + virtual palf::SCN get_rec_scn() = 0; virtual int flush(share::ObLSID ls_id) = 0; - // judge rec_log_ts of the checkpoint unit won't get smaller - // by comparing with max_consequent_callbacked_log_ts - // a unit will only be moved once by rec_log_ts_stable_ - virtual bool rec_log_ts_is_stable() = 0; + // judge rec_scn of the checkpoint unit won't get smaller + // by comparing with max_consequent_callbacked_scn + // a unit will only be moved once by rec_scn_stable_ + virtual bool rec_scn_is_stable() = 0; // Whether the dump conditions are met virtual bool ready_for_flush() = 0; // avoid active checkpoint block minor merge @@ -104,7 +105,7 @@ public: int add_to_data_checkpoint(ObDataCheckpoint *data_checkpoint); bool is_in_prepare_list_of_data_checkpoint(); // transfer to active_list in ObDataCheckpoint - // when the checkpoint unit rec_log_ts_is_stable + // when the checkpoint unit rec_scn_is_stable // @param[in] is_ls_freeze, whether the process is triggered by logstream_freeze int check_can_move_to_active(bool is_ls_freeze = false); // after checkpoint ready_for_flush diff --git a/src/storage/high_availability/ob_ls_prepare_migration.cpp b/src/storage/high_availability/ob_ls_prepare_migration.cpp index 26fcfabdd..a6e839012 100644 --- a/src/storage/high_availability/ob_ls_prepare_migration.cpp +++ b/src/storage/high_availability/ob_ls_prepare_migration.cpp @@ -1026,6 +1026,8 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_() LOG_WARN("checkpoint executor should not be NULL", K(ret), KPC(ctx_), KP(checkpoint_executor)); } else { const int64_t wait_checkpoint_push_start_ts = ObTimeUtility::current_time(); + palf::SCN tmp; + tmp.convert_for_lsn_allocator(ctx_->log_sync_scn_); while (OB_SUCC(ret)) { if (ctx_->is_failed()) { ret = OB_CANCELED; @@ -1041,7 +1043,7 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_() const int64_t cost_ts = ObTimeUtility::current_time() - wait_checkpoint_push_start_ts; LOG_INFO("succeed wait clog checkpoint ts push", "cost", cost_ts, "ls_id", ctx_->arg_.ls_id_); break; - } else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(ctx_->log_sync_scn_))) { + } else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(tmp))) { if (OB_NO_NEED_UPDATE == ret) { ret = OB_SUCCESS; } else { diff --git a/src/storage/ls/ob_freezer.cpp b/src/storage/ls/ob_freezer.cpp index 49c790300..7cbd46166 100644 --- a/src/storage/ls/ob_freezer.cpp +++ b/src/storage/ls/ob_freezer.cpp @@ -343,7 +343,7 @@ int ObFreezer::inner_logstream_freeze() uint32_t freeze_clock = get_freeze_clock(); TRANS_LOG(INFO, "[Freezer] freeze_clock", K(ret), K_(ls_id), K(freeze_clock)); - if (OB_FAIL(data_checkpoint_->ls_freeze(INT64_MAX))) { + if (OB_FAIL(data_checkpoint_->ls_freeze(palf::SCN::max_scn()))) { // move memtables from active_list to frozen_list TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K_(ls_id)); stat_.add_diagnose_info("data_checkpoint freeze failed"); diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index e83eaeff2..14a576283 100644 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -1208,13 +1208,14 @@ int ObLS::flush_if_need(const bool need_flush) int ObLS::flush_if_need_(const bool need_flush) { int ret = OB_SUCCESS; - int64_t clog_checkpoint_ts = get_clog_checkpoint_ts(); + palf::SCN clog_checkpoint_scn = get_clog_checkpoint_scn(); + palf::SCN invalid_scn; if ((!need_flush && !checkpoint_executor_.need_flush()) || checkpoint_executor_.is_wait_advance_checkpoint()) { STORAGE_LOG(INFO, "the ls no need flush to advance_checkpoint", K(get_ls_id())); - } else if (OB_FAIL(checkpoint_executor_.advance_checkpoint_by_flush())) { + } else if (OB_FAIL(checkpoint_executor_.advance_checkpoint_by_flush(invalid_scn))) { STORAGE_LOG(WARN, "advance_checkpoint_by_flush failed", KR(ret), K(get_ls_id())); } else { - checkpoint_executor_.set_wait_advance_checkpoint(clog_checkpoint_ts); + checkpoint_executor_.set_wait_advance_checkpoint(clog_checkpoint_scn); } return ret; } diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 6c2c50384..1bef7ca5e 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -306,6 +306,7 @@ public: UPDATE_LSMETA_WITH_LOCK(ls_meta_, set_clog_checkpoint); UPDATE_LSMETA_WITHOUT_LOCK(ls_meta_, set_clog_checkpoint); CONST_DELEGATE_WITH_RET(ls_meta_, get_clog_checkpoint_ts, int64_t); + CONST_DELEGATE_WITH_RET(ls_meta_, get_clog_checkpoint_scn, palf::SCN); DELEGATE_WITH_RET(ls_meta_, get_clog_base_lsn, palf::LSN &); DELEGATE_WITH_RET(ls_meta_, get_saved_info, int); // int build_saved_info(); diff --git a/src/storage/ls/ob_ls_ddl_log_handler.cpp b/src/storage/ls/ob_ls_ddl_log_handler.cpp index 9c2b7ac56..886dcbca9 100644 --- a/src/storage/ls/ob_ls_ddl_log_handler.cpp +++ b/src/storage/ls/ob_ls_ddl_log_handler.cpp @@ -20,6 +20,7 @@ #include "storage/tablet/ob_tablet_iterator.h" #include "storage/ddl/ob_tablet_ddl_kv_mgr.h" #include "logservice/ob_log_base_header.h" +#include "logservice/palf/scn.h" namespace oceanbase { @@ -211,7 +212,7 @@ int ObLSDDLLogHandler::resume_leader() return ret; } -int ObLSDDLLogHandler::flush(int64_t rec_log_ts) +int ObLSDDLLogHandler::flush(palf::SCN &rec_scn) { int ret = OB_SUCCESS; ObLSTabletIterator tablet_iter(ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US); @@ -242,7 +243,7 @@ int ObLSDDLLogHandler::flush(int64_t rec_log_ts) ObDDLTableMergeDagParam param; param.ls_id_ = ls_->get_ls_id(); param.tablet_id_ = tablet_handle.get_obj()->get_tablet_meta().tablet_id_; - param.rec_log_ts_ = rec_log_ts; + param.rec_log_ts_ = rec_scn.convert_to_ts(); if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) { if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) { LOG_WARN("failed to schedule ddl kv merge dag", K(ret)); @@ -255,6 +256,13 @@ int ObLSDDLLogHandler::flush(int64_t rec_log_ts) return OB_SUCCESS; } +palf::SCN ObLSDDLLogHandler::get_rec_scn() +{ + palf::SCN tmp; + tmp.convert_for_lsn_allocator(get_rec_log_ts()); + return tmp; +} + int64_t ObLSDDLLogHandler::get_rec_log_ts() { int ret = OB_SUCCESS; diff --git a/src/storage/ls/ob_ls_ddl_log_handler.h b/src/storage/ls/ob_ls_ddl_log_handler.h index 8f7198956..2378c264e 100644 --- a/src/storage/ls/ob_ls_ddl_log_handler.h +++ b/src/storage/ls/ob_ls_ddl_log_handler.h @@ -53,8 +53,9 @@ public: int resume_leader() override final; // for checkpoint - int flush(int64_t rec_log_ts) override final; - int64_t get_rec_log_ts() override final; + int flush(palf::SCN &rec_scn) override final; + int64_t get_rec_log_ts(); + palf::SCN get_rec_scn() override final; private: int replay_ddl_redo_log_(const char *log_buf, const int64_t buf_size, int64_t pos, const int64_t log_ts); int replay_ddl_prepare_log_(const char *log_buf, const int64_t buf_size, int64_t pos, const int64_t log_ts); diff --git a/src/storage/ls/ob_ls_meta.cpp b/src/storage/ls/ob_ls_meta.cpp index e2823b77a..2ce776345 100644 --- a/src/storage/ls/ob_ls_meta.cpp +++ b/src/storage/ls/ob_ls_meta.cpp @@ -49,7 +49,7 @@ ObLSMeta::ObLSMeta() ls_id_(), replica_type_(REPLICA_TYPE_MAX), ls_create_status_(ObInnerLSStatus::CREATING), - clog_checkpoint_ts_(0), + clog_checkpoint_scn_(), clog_base_lsn_(PALF_INITIAL_LSN_VAL), rebuild_seq_(0), migration_status_(ObMigrationStatus::OB_MIGRATION_STATUS_MAX), @@ -69,7 +69,7 @@ ObLSMeta::ObLSMeta(const ObLSMeta &ls_meta) ls_id_(ls_meta.ls_id_), replica_type_(ls_meta.replica_type_), ls_create_status_(ls_meta.ls_create_status_), - clog_checkpoint_ts_(ls_meta.clog_checkpoint_ts_), + clog_checkpoint_scn_(ls_meta.clog_checkpoint_scn_), clog_base_lsn_(ls_meta.clog_base_lsn_), rebuild_seq_(ls_meta.rebuild_seq_), migration_status_(ls_meta.migration_status_), @@ -94,7 +94,7 @@ ObLSMeta &ObLSMeta::operator=(const ObLSMeta &other) rebuild_seq_ = other.rebuild_seq_; migration_status_ = other.migration_status_; clog_base_lsn_ = other.clog_base_lsn_; - clog_checkpoint_ts_ = other.clog_checkpoint_ts_; + clog_checkpoint_scn_ = other.clog_checkpoint_scn_; gc_state_ = other.gc_state_; offline_scn_ = other.offline_scn_; restore_status_ = other.restore_status_; @@ -113,7 +113,7 @@ void ObLSMeta::reset() ls_id_.reset(); replica_type_ = REPLICA_TYPE_MAX; clog_base_lsn_.reset(); - clog_checkpoint_ts_ = 0; + clog_checkpoint_scn_.reset(); rebuild_seq_ = 0; migration_status_ = ObMigrationStatus::OB_MIGRATION_STATUS_MAX; gc_state_ = LSGCState::INVALID_LS_GC_STATE; @@ -130,20 +130,26 @@ LSN &ObLSMeta::get_clog_base_lsn() return clog_base_lsn_; } +SCN ObLSMeta::get_clog_checkpoint_scn() const +{ + ObSpinLockTimeGuard guard(lock_); + return clog_checkpoint_scn_; +} + int64_t ObLSMeta::get_clog_checkpoint_ts() const { ObSpinLockTimeGuard guard(lock_); - return clog_checkpoint_ts_; + return clog_checkpoint_scn_.get_val_for_lsn_allocator(); } int ObLSMeta::set_clog_checkpoint(const LSN &clog_checkpoint_lsn, - const int64_t clog_checkpoint_ts, + const SCN clog_checkpoint_scn, const bool write_slog) { ObSpinLockTimeGuard guard(lock_); ObLSMeta tmp(*this); tmp.clog_base_lsn_ = clog_checkpoint_lsn; - tmp.clog_checkpoint_ts_ = clog_checkpoint_ts; + tmp.clog_checkpoint_scn_ = clog_checkpoint_scn; int ret = OB_SUCCESS; if (write_slog) { @@ -153,7 +159,7 @@ int ObLSMeta::set_clog_checkpoint(const LSN &clog_checkpoint_lsn, } clog_base_lsn_ = clog_checkpoint_lsn; - clog_checkpoint_ts_ = clog_checkpoint_ts; + clog_checkpoint_scn_ = clog_checkpoint_scn; return ret; } @@ -401,7 +407,7 @@ int ObLSMeta::update_ls_meta( ObSpinLockTimeGuard guard(lock_); ObLSMeta tmp(*this); tmp.clog_base_lsn_ = src_ls_meta.clog_base_lsn_; - tmp.clog_checkpoint_ts_ = src_ls_meta.clog_checkpoint_ts_; + tmp.clog_checkpoint_scn_ = src_ls_meta.clog_checkpoint_scn_; tmp.replayable_point_ = src_ls_meta.replayable_point_; tmp.tablet_change_checkpoint_scn_ = src_ls_meta.tablet_change_checkpoint_scn_; if (update_restore_status) { @@ -409,7 +415,7 @@ int ObLSMeta::update_ls_meta( } guard.click(); tmp.all_id_meta_.update_all_id_meta(src_ls_meta.all_id_meta_); - if (tmp.clog_checkpoint_ts_ < clog_checkpoint_ts_) { + if (tmp.clog_checkpoint_scn_ < clog_checkpoint_scn_) { // TODO: now do not allow clog checkpoint ts rollback, may support it in 4.1 ret = OB_ERR_UNEXPECTED; LOG_WARN("do not allow clog checkpoint ts rollback", K(ret), K(src_ls_meta), KPC(this)); @@ -418,7 +424,7 @@ int ObLSMeta::update_ls_meta( } else { guard.click(); clog_base_lsn_ = src_ls_meta.clog_base_lsn_; - clog_checkpoint_ts_ = src_ls_meta.clog_checkpoint_ts_; + clog_checkpoint_scn_ = src_ls_meta.clog_checkpoint_scn_; replayable_point_ = src_ls_meta.replayable_point_; tablet_change_checkpoint_scn_ = src_ls_meta.tablet_change_checkpoint_scn_; @@ -535,7 +541,7 @@ int ObLSMeta::build_saved_info() ret = OB_ERR_UNEXPECTED; LOG_WARN("saved info is not empty, can not build saved info", K(ret), K(*this)); } else { - saved_info.clog_checkpoint_ts_ = clog_checkpoint_ts_; + saved_info.clog_checkpoint_ts_ = clog_checkpoint_scn_.convert_to_ts(); saved_info.clog_base_lsn_ = clog_base_lsn_; saved_info.tablet_change_checkpoint_scn_ = tablet_change_checkpoint_scn_; ObLSMeta tmp(*this); @@ -592,7 +598,7 @@ int ObLSMeta::init( ls_id_ = ls_id; replica_type_ = replica_type; ls_create_status_ = ObInnerLSStatus::CREATING; - clog_checkpoint_ts_ = create_scn; + clog_checkpoint_scn_.convert_for_lsn_allocator(create_scn); clog_base_lsn_.val_ = PALF_INITIAL_LSN_VAL; rebuild_seq_ = 0; migration_status_ = migration_status; @@ -641,7 +647,7 @@ OB_SERIALIZE_MEMBER(ObLSMeta, ls_id_, replica_type_, ls_create_status_, - clog_checkpoint_ts_, + clog_checkpoint_scn_, clog_base_lsn_, rebuild_seq_, migration_status_, diff --git a/src/storage/ls/ob_ls_meta.h b/src/storage/ls/ob_ls_meta.h index 1680e24cb..ba15141b3 100644 --- a/src/storage/ls/ob_ls_meta.h +++ b/src/storage/ls/ob_ls_meta.h @@ -46,10 +46,11 @@ public: ObLSMeta(const ObLSMeta &ls_meta); ~ObLSMeta() {} ObLSMeta &operator=(const ObLSMeta &other); + palf::SCN get_clog_checkpoint_scn() const; int64_t get_clog_checkpoint_ts() const; palf::LSN &get_clog_base_lsn(); int set_clog_checkpoint(const palf::LSN &clog_checkpoint_lsn, - const int64_t clog_checkpoint_ts, + const palf::SCN clog_checkpoint_scn, const bool write_slog = true); void reset(); bool is_valid() const; @@ -108,7 +109,7 @@ public: ObSpinLockGuard lock_guard_; }; TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(replica_type), K_(ls_create_status), - K_(clog_checkpoint_ts), K_(clog_base_lsn), + K_(clog_checkpoint_scn), K_(clog_base_lsn), K_(rebuild_seq), K_(migration_status), K(gc_state_), K(offline_scn_), K_(restore_status), K_(replayable_point), K_(tablet_change_checkpoint_scn), K_(all_id_meta)); @@ -125,10 +126,10 @@ private: void set_write_slog_func_(WriteSlog write_slog); static WriteSlog write_slog_; - // clog_checkpoint_ts_, meaning: - // 1. dump points of all modules have exceeded clog_checkpoint_ts_ - // 2. all clog entries which log_ts are smaller than clog_checkpoint_ts_ can be recycled - int64_t clog_checkpoint_ts_; + // clog_checkpoint_scn_, meaning: + // 1. dump points of all modules have exceeded clog_checkpoint_scn_ + // 2. all clog entries which log_scn are smaller than clog_checkpoint_scn_ can be recycled + palf::SCN clog_checkpoint_scn_; // clog_base_lsn_, meaning: // 1. all clog entries which lsn are smaller than clog_base_lsn_ have been recycled // 2. log_ts of log entry that clog_base_lsn_ points to is smaller than/equal to clog_checkpoint_ts_ diff --git a/src/storage/ls/ob_ls_sync_tablet_seq_handler.cpp b/src/storage/ls/ob_ls_sync_tablet_seq_handler.cpp index 8b87e4d67..7c3dd1d8d 100644 --- a/src/storage/ls/ob_ls_sync_tablet_seq_handler.cpp +++ b/src/storage/ls/ob_ls_sync_tablet_seq_handler.cpp @@ -112,17 +112,16 @@ int ObLSSyncTabletSeqHandler::resume_leader() return ret; } -int ObLSSyncTabletSeqHandler::flush(int64_t rec_log_ts) +int ObLSSyncTabletSeqHandler::flush(palf::SCN &scn) { // TODO - UNUSED(rec_log_ts); + UNUSED(scn); return OB_SUCCESS; } -int64_t ObLSSyncTabletSeqHandler::get_rec_log_ts() +palf::SCN ObLSSyncTabletSeqHandler::get_rec_scn() { - // TODO - return INT64_MAX; + return palf::SCN::max_scn(); } } diff --git a/src/storage/ls/ob_ls_sync_tablet_seq_handler.h b/src/storage/ls/ob_ls_sync_tablet_seq_handler.h index 11592e7cc..b6d693b72 100644 --- a/src/storage/ls/ob_ls_sync_tablet_seq_handler.h +++ b/src/storage/ls/ob_ls_sync_tablet_seq_handler.h @@ -49,8 +49,8 @@ public: int resume_leader() override final; // for checkpoint - int flush(int64_t rec_log_ts) override final; - int64_t get_rec_log_ts() override final; + int flush(palf::SCN &scn) override final; + palf::SCN get_rec_scn() override final; private: bool is_inited_; diff --git a/src/storage/ls/ob_ls_tablet_service.cpp b/src/storage/ls/ob_ls_tablet_service.cpp index d35bd4e0c..7f75f06bf 100644 --- a/src/storage/ls/ob_ls_tablet_service.cpp +++ b/src/storage/ls/ob_ls_tablet_service.cpp @@ -21,6 +21,7 @@ #include "logservice/ob_log_base_header.h" #include "logservice/ob_log_base_type.h" #include "logservice/ob_log_service.h" +#include "logservice/palf/scn.h" #include "observer/report/ob_i_meta_report.h" #include "share/ob_disk_usage_table_operator.h" #include "share/ob_rpc_struct.h" @@ -197,15 +198,15 @@ int ObLSTabletService::resume_leader() return ret; } -int ObLSTabletService::flush(int64_t rec_log_ts) +int ObLSTabletService::flush(palf::SCN &recycle_scn) { - UNUSED(rec_log_ts); + UNUSED(recycle_scn); return OB_SUCCESS; } -int64_t ObLSTabletService::get_rec_log_ts() +palf::SCN ObLSTabletService::get_rec_scn() { - return INT64_MAX; + return palf::SCN::max_scn(); } int ObLSTabletService::prepare_for_safe_destroy() diff --git a/src/storage/ls/ob_ls_tablet_service.h b/src/storage/ls/ob_ls_tablet_service.h index 0d4562c9e..02242e9b0 100644 --- a/src/storage/ls/ob_ls_tablet_service.h +++ b/src/storage/ls/ob_ls_tablet_service.h @@ -122,8 +122,9 @@ private: virtual int resume_leader() override; // for checkpoint - virtual int flush(int64_t rec_log_ts) override; - virtual int64_t get_rec_log_ts() override; + virtual int flush(palf::SCN &recycle_scn) override; + virtual palf::SCN get_rec_scn() override; + public: int prepare_for_safe_destroy(); int safe_to_destroy(bool &is_safe); diff --git a/src/storage/ls/ob_ls_tx_service.cpp b/src/storage/ls/ob_ls_tx_service.cpp index 278644174..aaa9b1f59 100644 --- a/src/storage/ls/ob_ls_tx_service.cpp +++ b/src/storage/ls/ob_ls_tx_service.cpp @@ -399,8 +399,8 @@ int ObLSTxService::resume_leader() } inline -void get_min_rec_log_ts_common_checkpoint_type_by_index_(int index, - char *common_checkpoint_type) +void get_min_rec_scn_common_checkpoint_type_by_index_(int index, + char *common_checkpoint_type) { int ret = OB_SUCCESS; if (index == 0) { @@ -415,39 +415,39 @@ void get_min_rec_log_ts_common_checkpoint_type_by_index_(int index, } } -int64_t ObLSTxService::get_rec_log_ts() +palf::SCN ObLSTxService::get_rec_scn() { - int64_t min_rec_log_ts = INT64_MAX; - int min_rec_log_ts_common_checkpoint_type_index = 0; + palf::SCN min_rec_scn = palf::SCN::max_scn(); + int min_rec_scn_common_checkpoint_type_index = 0; char common_checkpoint_type[common::MAX_CHECKPOINT_TYPE_BUF_LENGTH]; for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) { if (OB_NOT_NULL(common_checkpoints_[i])) { - int64_t rec_log_ts = common_checkpoints_[i]->get_rec_log_ts(); - if (rec_log_ts > 0 && rec_log_ts < min_rec_log_ts) { - min_rec_log_ts = rec_log_ts; - min_rec_log_ts_common_checkpoint_type_index = i; + palf::SCN rec_scn = common_checkpoints_[i]->get_rec_scn(); + if (rec_scn.is_valid() && rec_scn < min_rec_scn) { + min_rec_scn = rec_scn; + min_rec_scn_common_checkpoint_type_index = i; } } } - get_min_rec_log_ts_common_checkpoint_type_by_index_(min_rec_log_ts_common_checkpoint_type_index, - common_checkpoint_type); + get_min_rec_scn_common_checkpoint_type_by_index_(min_rec_scn_common_checkpoint_type_index, + common_checkpoint_type); - TRANS_LOG(INFO, "[CHECKPOINT] ObLSTxService::get_rec_log_ts", + TRANS_LOG(INFO, "[CHECKPOINT] ObLSTxService::get_rec_scn", K(common_checkpoint_type), - KPC(common_checkpoints_[min_rec_log_ts_common_checkpoint_type_index]), - K(min_rec_log_ts), K(ls_id_)); + KPC(common_checkpoints_[min_rec_scn_common_checkpoint_type_index]), + K(min_rec_scn), K(ls_id_)); - return min_rec_log_ts; + return min_rec_scn; } -int ObLSTxService::flush(int64_t rec_log_ts) +int ObLSTxService::flush(palf::SCN &recycle_scn) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) { // only flush the common_checkpoint that whose clog need recycle - if (OB_NOT_NULL(common_checkpoints_[i]) && rec_log_ts >= common_checkpoints_[i]->get_rec_log_ts()) { - if (OB_SUCCESS != (tmp_ret = common_checkpoints_[i]->flush(rec_log_ts))) { + if (OB_NOT_NULL(common_checkpoints_[i]) && recycle_scn >= common_checkpoints_[i]->get_rec_scn()) { + if (OB_SUCCESS != (tmp_ret = common_checkpoints_[i]->flush(recycle_scn))) { TRANS_LOG(WARN, "obCommonCheckpoint flush failed", K(tmp_ret), K(common_checkpoints_[i])); } } @@ -467,7 +467,7 @@ int ObLSTxService::get_common_checkpoint_info( } else { ObCommonCheckpointVTInfo info; info.tablet_id = common_checkpoint->get_tablet_id(); - info.rec_log_ts = common_checkpoint->get_rec_log_ts(); + info.rec_scn = common_checkpoint->get_rec_scn(); info.checkpoint_type = i; info.is_flushing = common_checkpoint->is_flushing(); common_checkpoint_array.push_back(info); @@ -540,7 +540,7 @@ int ObLSTxService::traversal_flush() int tmp_ret = OB_SUCCESS; for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) { if (OB_NOT_NULL(common_checkpoints_[i]) && - OB_SUCCESS != (tmp_ret = common_checkpoints_[i]->flush(INT64_MAX, false))) { + OB_SUCCESS != (tmp_ret = common_checkpoints_[i]->flush(palf::SCN::max_scn(), false))) { TRANS_LOG(WARN, "obCommonCheckpoint flush failed", K(tmp_ret), KP(common_checkpoints_[i])); } } diff --git a/src/storage/ls/ob_ls_tx_service.h b/src/storage/ls/ob_ls_tx_service.h index b67f637d4..0d6b14a23 100644 --- a/src/storage/ls/ob_ls_tx_service.h +++ b/src/storage/ls/ob_ls_tx_service.h @@ -136,8 +136,8 @@ public: int switch_to_follower_gracefully(); int resume_leader(); - int64_t get_rec_log_ts(); - int flush(int64_t rec_log_ts); + palf::SCN get_rec_scn() override; + int flush(palf::SCN &recycle_scn) override; int get_common_checkpoint_info( ObIArray &common_checkpoint_array); diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 0535d683f..9fa52c644 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -1578,7 +1578,7 @@ int ObMemtable::set_max_end_log_ts(const int64_t log_ts) return ret; } -bool ObMemtable::rec_log_ts_is_stable() +bool ObMemtable::rec_scn_is_stable() { int ret = OB_SUCCESS; bool rec_log_ts_is_stable = false; @@ -1890,6 +1890,13 @@ int ObMemtable::flush(share::ObLSID ls_id) return ret; } +palf::SCN ObMemtable::get_rec_scn() +{ + palf::SCN tmp; + tmp.convert_for_lsn_allocator(get_rec_log_ts()); + return tmp; +} + bool ObMemtable::is_active_memtable() const { return !is_frozen_memtable(); diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index b738c7ed4..71d7389bf 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -370,11 +370,12 @@ public: /* freeze */ virtual int set_frozen() override { local_allocator_.set_frozen(); return OB_SUCCESS; } - virtual bool rec_log_ts_is_stable() override; + virtual bool rec_scn_is_stable() override; virtual bool ready_for_flush() override; void print_ready_for_flush(); virtual int flush(share::ObLSID ls_id) override; - virtual int64_t get_rec_log_ts() override { return ATOMIC_LOAD(&rec_log_ts_); } + virtual int64_t get_rec_log_ts() { return ATOMIC_LOAD(&rec_log_ts_); } + virtual palf::SCN get_rec_scn(); virtual bool is_frozen_checkpoint() const override { return is_frozen_memtable();} virtual bool is_active_checkpoint() const override { return is_active_memtable();} diff --git a/src/storage/tablelock/ob_lock_memtable.cpp b/src/storage/tablelock/ob_lock_memtable.cpp index dcfcbc1ce..39de6d3e7 100644 --- a/src/storage/tablelock/ob_lock_memtable.cpp +++ b/src/storage/tablelock/ob_lock_memtable.cpp @@ -735,17 +735,6 @@ int ObLockMemtable::get_frozen_schema_version(int64_t &schema_version) const return OB_NOT_SUPPORTED; } -int64_t ObLockMemtable::get_rec_log_ts() -{ - // no need lock because rec_scn_ aesc except INT64_MAX - // TODO: cxf remove this - palf::SCN tmp; - int64_t tmp_rec_log_ts = INT64_MAX; - tmp = get_rec_scn(); - tmp_rec_log_ts = tmp == palf::SCN::max_scn() ? INT64_MAX : tmp.get_val_for_lsn_allocator(); - return tmp_rec_log_ts; -} - palf::SCN ObLockMemtable::get_rec_scn() { // no need lock because rec_scn_ aesc except INT64_MAX @@ -805,17 +794,8 @@ bool ObLockMemtable::is_active_memtable() const return !ATOMIC_LOAD(&is_frozen_); } -int ObLockMemtable::flush(int64_t recycle_log_ts, bool need_freeze) -{ - int ret = OB_SUCCESS; - palf::SCN tmp; - tmp.convert_for_lsn_allocator(recycle_log_ts); - ret = flush(tmp, need_freeze); - return ret; -} - -int ObLockMemtable::flush(const palf::SCN &recycle_scn, - const bool need_freeze) +int ObLockMemtable::flush(palf::SCN recycle_scn, + bool need_freeze) { int ret = OB_SUCCESS; UNUSED(need_freeze); diff --git a/src/storage/tablelock/ob_lock_memtable.h b/src/storage/tablelock/ob_lock_memtable.h index 56da8b80a..c40c2795e 100644 --- a/src/storage/tablelock/ob_lock_memtable.h +++ b/src/storage/tablelock/ob_lock_memtable.h @@ -118,11 +118,8 @@ public: bool is_active_memtable() const override; // =========== INHERITED FROM ObCommonCheckPoint ========== - virtual int64_t get_rec_log_ts() override; virtual palf::SCN get_rec_scn(); - virtual int flush(int64_t recycle_log_ts, bool need_freeze = true) override; - virtual int flush(const palf::SCN &recycle_scn, - const bool need_freeze = true); + virtual int flush(palf::SCN recycle_scn, bool need_freeze = true); virtual ObTabletID get_tablet_id() const; diff --git a/src/storage/tx/ob_id_service.cpp b/src/storage/tx/ob_id_service.cpp index e769dc66b..ee6ec152d 100644 --- a/src/storage/tx/ob_id_service.cpp +++ b/src/storage/tx/ob_id_service.cpp @@ -252,12 +252,12 @@ int ObIDService::update_ls_id_meta(const bool write_slog) return ret; } -int ObIDService::flush(int64_t rec_log_ts) +int ObIDService::flush(palf::SCN &rec_scn) { int ret = OB_SUCCESS; WLockGuard guard(rwlock_); palf::SCN latest_rec_log_ts = rec_log_ts_.atomic_get(); - if (latest_rec_log_ts.get_val_for_lsn_allocator() <= rec_log_ts) { + if (latest_rec_log_ts <= rec_scn) { latest_rec_log_ts = rec_log_ts_.atomic_get(); if (OB_FAIL(update_ls_id_meta(true))) { TRANS_LOG(WARN, "update id meta of ls meta fail", K(ret), K(service_type_)); @@ -289,11 +289,11 @@ int ObIDService::check_leader(bool &leader) return ret; } -int64_t ObIDService::get_rec_log_ts() +palf::SCN ObIDService::get_rec_scn() { const palf::SCN rec_log_ts = rec_log_ts_.atomic_get(); - TRANS_LOG(INFO, "get rec log ts", K(service_type_), K(rec_log_ts)); - return rec_log_ts.get_val_for_lsn_allocator(); + TRANS_LOG(INFO, "get rec log scn", K(service_type_), K(rec_log_ts)); + return rec_log_ts; } int ObIDService::switch_to_follower_gracefully() diff --git a/src/storage/tx/ob_id_service.h b/src/storage/tx/ob_id_service.h index df710972f..07970335b 100644 --- a/src/storage/tx/ob_id_service.h +++ b/src/storage/tx/ob_id_service.h @@ -18,6 +18,7 @@ #include "logservice/ob_append_callback.h" #include "logservice/ob_log_base_type.h" #include "logservice/ob_log_handler.h" +#include "logservice/palf/scn.h" namespace oceanbase { @@ -112,7 +113,8 @@ public: int handle_replay_result(const int64_t last_id, const int64_t limited_id, const palf::SCN log_ts); // clog checkpoint - int flush(int64_t rec_log_ts); + int flush(palf::SCN &scn); + palf::SCN get_rec_scn(); int64_t get_rec_log_ts(); // for clog replay diff --git a/src/storage/tx/ob_keep_alive_ls_handler.h b/src/storage/tx/ob_keep_alive_ls_handler.h index 248afcc7e..0f727b3d5 100644 --- a/src/storage/tx/ob_keep_alive_ls_handler.h +++ b/src/storage/tx/ob_keep_alive_ls_handler.h @@ -114,8 +114,8 @@ public: int switch_to_leader() { ATOMIC_STORE(&is_master_,true); return OB_SUCCESS;} int switch_to_follower_gracefully() { ATOMIC_STORE(&is_master_,false); return OB_SUCCESS;} int resume_leader() { ATOMIC_STORE(&is_master_,true);return OB_SUCCESS; } - int64_t get_rec_log_ts() { return INT64_MAX; } - int flush(int64_t rec_log_ts) { return OB_SUCCESS;} + palf::SCN get_rec_scn() { return palf::SCN::max_scn(); } + int flush(palf::SCN &rec_scn) { return OB_SUCCESS;} private: bool check_gts_(); diff --git a/src/storage/tx/ob_tx_retain_ctx_mgr.cpp b/src/storage/tx/ob_tx_retain_ctx_mgr.cpp index 09b1f475c..8476358b9 100644 --- a/src/storage/tx/ob_tx_retain_ctx_mgr.cpp +++ b/src/storage/tx/ob_tx_retain_ctx_mgr.cpp @@ -42,7 +42,6 @@ int ObAdvanceLSCkptTask::try_advance_ls_ckpt_ts() int ret = OB_SUCCESS; storage::ObLSHandle ls_handle; - int64_t target_ckpt_ts = 0; if (OB_ISNULL(MTL(ObLSService *)) || OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, storage::ObLSGetMod::TRANS_MOD)) @@ -52,10 +51,8 @@ int ObAdvanceLSCkptTask::try_advance_ls_ckpt_ts() } TRANS_LOG(WARN, "get ls faild", K(ret), K(MTL(ObLSService *))); } else if (ls_handle.get_ls()->get_checkpoint_executor()->advance_checkpoint_by_flush( - target_ckpt_ts)) { - TRANS_LOG(WARN, "advance checkpoint ts failed", K(ret), K(ls_id_), K(target_ckpt_ts)); - } else if (OB_FAIL(target_ckpt_ts_.convert_for_lsn_allocator(target_ckpt_ts))) { - TRANS_LOG(WARN, "convert for lsn fail", K(target_ckpt_ts)); + target_ckpt_ts_)) { + TRANS_LOG(WARN, "advance checkpoint ts failed", K(ret), K(ls_id_), K(target_ckpt_ts_)); } if (OB_SUCC(ret)) { diff --git a/src/storage/tx_storage/ob_checkpoint_service.cpp b/src/storage/tx_storage/ob_checkpoint_service.cpp index c2c4b5d55..1dc3fc121 100644 --- a/src/storage/tx_storage/ob_checkpoint_service.cpp +++ b/src/storage/tx_storage/ob_checkpoint_service.cpp @@ -106,10 +106,10 @@ void ObCheckPointService::wait() int ObCheckPointService::add_ls_freeze_task( ObDataCheckpoint *data_checkpoint, - int64_t rec_log_ts) + palf::SCN rec_scn) { int ret = OB_SUCCESS; - if (OB_FAIL(freeze_thread_.add_task(data_checkpoint, rec_log_ts))) { + if (OB_FAIL(freeze_thread_.add_task(data_checkpoint, rec_scn))) { STORAGE_LOG(WARN, "logstream freeze task failed", K(ret)); } return ret; @@ -358,7 +358,7 @@ int ObCheckPointService::do_minor_freeze() ObCheckpointExecutor *checkpoint_executor = nullptr; if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) { STORAGE_LOG(WARN, "checkpoint_executor should not be null", K(ls->get_ls_id())); - } else if (OB_SUCCESS != (tmp_ret = (checkpoint_executor->advance_checkpoint_by_flush(INT64_MAX)))) { + } else if (OB_SUCCESS != (tmp_ret = (checkpoint_executor->advance_checkpoint_by_flush(palf::SCN::max_scn())))) { STORAGE_LOG(WARN, "advance_checkpoint_by_flush failed", K(tmp_ret), K(ls->get_ls_id())); } } diff --git a/src/storage/tx_storage/ob_checkpoint_service.h b/src/storage/tx_storage/ob_checkpoint_service.h index 406f1979d..3cb377bff 100644 --- a/src/storage/tx_storage/ob_checkpoint_service.h +++ b/src/storage/tx_storage/ob_checkpoint_service.h @@ -15,6 +15,7 @@ #include "storage/tx_storage/ob_ls_freeze_thread.h" #include "lib/lock/ob_spin_lock.h" #include "lib/task/ob_timer.h" +#include "logservice/palf/scn.h" namespace oceanbase { @@ -46,11 +47,11 @@ public: void destroy(); // add ls checkpoint freeze task // @param [in] data_checkpoint, the data_checkpoint of this task. - // @param [in] rec_log_ts, freeze all the memtable whose rec_log_ts is + // @param [in] rec_scn, freeze all the memtable whose rec_scn is // smaller than this one. int add_ls_freeze_task( ObDataCheckpoint *data_checkpoint, - int64_t rec_log_ts); + palf::SCN rec_scn); int do_minor_freeze(); diff --git a/src/storage/tx_storage/ob_ls_freeze_thread.cpp b/src/storage/tx_storage/ob_ls_freeze_thread.cpp index fe5f0df9b..b08d0348c 100644 --- a/src/storage/tx_storage/ob_ls_freeze_thread.cpp +++ b/src/storage/tx_storage/ob_ls_freeze_thread.cpp @@ -27,10 +27,10 @@ using namespace checkpoint; void ObLSFreezeTask::set_task(ObLSFreezeThread *host, ObDataCheckpoint *data_checkpoint, - int64_t rec_log_ts) + palf::SCN rec_scn) { host_ = host; - rec_log_ts_ = rec_log_ts; + rec_scn_ = rec_scn; data_checkpoint_ = data_checkpoint; } @@ -38,7 +38,7 @@ void ObLSFreezeTask::handle() { int ret = OB_SUCCESS; if (OB_NOT_NULL(data_checkpoint_)) { - data_checkpoint_->road_to_flush(rec_log_ts_); + data_checkpoint_->road_to_flush(rec_scn_); } if (OB_NOT_NULL(host_)) { if (OB_FAIL(host_->push_back_(this))) { @@ -109,7 +109,7 @@ int ObLSFreezeThread::init(int tg_id) } int ObLSFreezeThread::add_task(ObDataCheckpoint *data_checkpoint, - int64_t rec_log_ts) + palf::SCN rec_scn) { int ret = OB_SUCCESS; ObLSFreezeTask *task = NULL; @@ -124,7 +124,7 @@ int ObLSFreezeThread::add_task(ObDataCheckpoint *data_checkpoint, } } if (OB_SUCC(ret)) { - task->set_task(this, data_checkpoint, rec_log_ts); + task->set_task(this, data_checkpoint, rec_scn); if (OB_FAIL(TG_PUSH_TASK(tg_id_, task))) { STORAGE_LOG(WARN, "schedule timer task failed", K(ret)); } diff --git a/src/storage/tx_storage/ob_ls_freeze_thread.h b/src/storage/tx_storage/ob_ls_freeze_thread.h index 09cb38ad0..6297a867c 100644 --- a/src/storage/tx_storage/ob_ls_freeze_thread.h +++ b/src/storage/tx_storage/ob_ls_freeze_thread.h @@ -15,6 +15,7 @@ #include "lib/lock/ob_spin_lock.h" #include "lib/thread/thread_mgr_interface.h" +#include "logservice/palf/scn.h" namespace oceanbase { @@ -34,12 +35,12 @@ class ObLSFreezeTask public: void set_task(ObLSFreezeThread *host, checkpoint::ObDataCheckpoint *data_checkpoint, - int64_t rec_log_ts); + palf::SCN rec_scn); void handle(); private: - int64_t rec_log_ts_; + palf::SCN rec_scn_; ObLSFreezeThread *host_; checkpoint::ObDataCheckpoint *data_checkpoint_; }; @@ -59,7 +60,7 @@ public: int init(int tg_id); void destroy(); - int add_task(checkpoint::ObDataCheckpoint *data_checkpoint, int64_t rec_log_ts); + int add_task(checkpoint::ObDataCheckpoint *data_checkpoint, palf::SCN rec_scn); void handle(void *task); int get_tg_id() { return tg_id_; } diff --git a/src/storage/tx_storage/ob_ls_service.cpp b/src/storage/tx_storage/ob_ls_service.cpp index 4a9cdf5fd..a0d53a4f2 100644 --- a/src/storage/tx_storage/ob_ls_service.cpp +++ b/src/storage/tx_storage/ob_ls_service.cpp @@ -734,7 +734,7 @@ int ObLSService::restore_update_ls_(const ObLSMetaPackage &meta_package) } else if (OB_ISNULL(ls = ls_handle.get_ls())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ls is null", K(meta_package)); - } else if (OB_FAIL(ls->set_clog_checkpoint(ls_meta.get_clog_base_lsn(), ls_meta.get_clog_checkpoint_ts()))) { + } else if (OB_FAIL(ls->set_clog_checkpoint(ls_meta.get_clog_base_lsn(), ls_meta.get_clog_checkpoint_scn()))) { LOG_WARN("failed to set clog checkpoint", K(meta_package)); } else if (OB_FAIL(ls->advance_base_info(meta_package.palf_meta_, is_rebuild))) { LOG_WARN("failed to advance base lsn", K(meta_package)); diff --git a/src/storage/tx_table/ob_tx_ctx_memtable.cpp b/src/storage/tx_table/ob_tx_ctx_memtable.cpp index e15614162..845b5b550 100644 --- a/src/storage/tx_table/ob_tx_ctx_memtable.cpp +++ b/src/storage/tx_table/ob_tx_ctx_memtable.cpp @@ -219,19 +219,6 @@ transaction::ObLSTxCtxMgr *ObTxCtxMemtable::get_ls_tx_ctx_mgr() return ls_ctx_mgr_guard_.get_ls_tx_ctx_mgr(); } -// TODO: handle exception -int64_t ObTxCtxMemtable::get_rec_log_ts() -{ - int ret = OB_SUCCESS; - - // TODO: cxf remove this - palf::SCN tmp; - int64_t tmp_rec_log_ts = INT64_MAX; - tmp = get_rec_scn(); - tmp_rec_log_ts = tmp == palf::SCN::max_scn() ? INT64_MAX : tmp.get_val_for_lsn_allocator(); - return tmp_rec_log_ts; -} - palf::SCN ObTxCtxMemtable::get_rec_scn() { int ret = OB_SUCCESS; @@ -272,15 +259,15 @@ bool ObTxCtxMemtable::is_active_memtable() const return !ATOMIC_LOAD(&is_frozen_); } -int ObTxCtxMemtable::flush(int64_t recycle_log_ts, bool need_freeze) +int ObTxCtxMemtable::flush(palf::SCN recycle_scn, bool need_freeze) { int ret = OB_SUCCESS; ObSpinLockGuard guard(flush_lock_); if (need_freeze) { - int64_t rec_log_ts = get_rec_log_ts(); - if (rec_log_ts >= recycle_log_ts) { - TRANS_LOG(INFO, "no need to freeze", K(rec_log_ts), K(recycle_log_ts)); + palf::SCN rec_scn = get_rec_scn(); + if (rec_scn >= recycle_scn) { + TRANS_LOG(INFO, "no need to freeze", K(rec_scn), K(recycle_scn)); } else if (is_active_memtable()) { int64_t cur_ts = common::ObClockGenerator::getClock(); ObScnRange scn_range; diff --git a/src/storage/tx_table/ob_tx_ctx_memtable.h b/src/storage/tx_table/ob_tx_ctx_memtable.h index b1a31cf3d..a91478093 100644 --- a/src/storage/tx_table/ob_tx_ctx_memtable.h +++ b/src/storage/tx_table/ob_tx_ctx_memtable.h @@ -66,9 +66,8 @@ public: virtual int64_t get_occupied_size() const override { return 0; } // ================ INHERITED FROM ObCommonCheckpoint =============== - virtual int64_t get_rec_log_ts() override; virtual palf::SCN get_rec_scn(); - virtual int flush(int64_t recycle_log_ts, bool need_freeze = true) override; + virtual int flush(palf::SCN recycle_scn, bool need_freeze = true); virtual ObTabletID get_tablet_id() const override; diff --git a/src/storage/tx_table/ob_tx_data_memtable_mgr.cpp b/src/storage/tx_table/ob_tx_data_memtable_mgr.cpp index 45962a064..b3c592c54 100644 --- a/src/storage/tx_table/ob_tx_data_memtable_mgr.cpp +++ b/src/storage/tx_table/ob_tx_data_memtable_mgr.cpp @@ -369,11 +369,6 @@ int ObTxDataMemtableMgr::get_all_memtables_for_write(ObTxDataMemtableWriteGuard return ret; } -int64_t ObTxDataMemtableMgr::get_rec_log_ts() -{ - return get_rec_scn().get_val_for_lsn_allocator(); -} - palf::SCN ObTxDataMemtableMgr::get_rec_scn() { int ret = OB_SUCCESS; @@ -420,7 +415,6 @@ int ObTxDataMemtableMgr::flush_all_frozen_memtables_(ObTableHdlArray &memtable_h return ret; } - int ObTxDataMemtableMgr::flush(palf::SCN recycle_scn, bool need_freeze) { int ret = OB_SUCCESS; @@ -459,17 +453,6 @@ int ObTxDataMemtableMgr::flush(palf::SCN recycle_scn, bool need_freeze) return ret; } -int ObTxDataMemtableMgr::flush(int64_t recycle_log_ts, bool need_freeze) -{ - palf::SCN recycle_scn; - if (INT64_MAX == recycle_log_ts) { - recycle_scn.set_max(); - } else { - recycle_scn.convert_for_lsn_allocator(recycle_log_ts); - } - return flush(recycle_scn, need_freeze); -} - ObTabletID ObTxDataMemtableMgr::get_tablet_id() const { return LS_TX_DATA_TABLET; diff --git a/src/storage/tx_table/ob_tx_data_memtable_mgr.h b/src/storage/tx_table/ob_tx_data_memtable_mgr.h index dada17212..faf4f1ae9 100644 --- a/src/storage/tx_table/ob_tx_data_memtable_mgr.h +++ b/src/storage/tx_table/ob_tx_data_memtable_mgr.h @@ -94,10 +94,8 @@ public: // ObTxDataMemtableMgr int get_memtable_range(int64_t &memtable_head, int64_t &memtable_tail); // ================ INHERITED FROM ObCommonCheckpoint =============== - virtual int64_t get_rec_log_ts() override; virtual palf::SCN get_rec_scn() override; - virtual int flush(int64_t recycle_log_ts, bool need_freeze = true) override; virtual int flush(palf::SCN recycle_scn, bool need_freeze = true) override; virtual ObTabletID get_tablet_id() const override; diff --git a/src/storage/tx_table/ob_tx_data_table.cpp b/src/storage/tx_table/ob_tx_data_table.cpp index 2aed42b5e..e1704bb0c 100644 --- a/src/storage/tx_table/ob_tx_data_table.cpp +++ b/src/storage/tx_table/ob_tx_data_table.cpp @@ -783,7 +783,7 @@ int ObTxDataTable::self_freeze_task() STORAGE_LOG(INFO, "start tx data table self freeze task", K(get_ls_id())); - if (OB_FAIL(memtable_mgr_->flush(INT64_MAX, true))) { + if (OB_FAIL(memtable_mgr_->flush(palf::SCN::max_scn(), true))) { share::ObLSID ls_id = get_ls_id(); STORAGE_LOG(WARN, "self freeze of tx data memtable failed.", KR(ret), K(ls_id), KPC(memtable_mgr_)); }