[MDS] add remove mds node interface
This commit is contained in:
parent
788c2a3489
commit
569304f966
@ -79,7 +79,7 @@ struct ApplyOnLSOp {
|
||||
if (table_->judege_in_ranges(ls.get_ls_id(), table_->ls_ranges_)) {
|
||||
(void) table_->get_tablet_info_(ls, apply_on_tablet_op_);
|
||||
} else {
|
||||
MDS_LOG(TRACE, "not in ranges", K(ret), K(*table_));
|
||||
MDS_LOG(TRACE, "not in ranges", K(ls.get_ls_id()), K(ret), K(*table_));
|
||||
}
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
@ -97,7 +97,7 @@ struct ApplyOnTenantOp {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
MDS_LOG(TRACE, "not in ranges", K(ret), K(*table_));
|
||||
MDS_LOG(TRACE, "not in ranges", K(MTL_ID()), K(ret), K(*table_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -81,6 +81,7 @@ struct MdsEvent {
|
||||
friend class ObMdsEventBuffer;
|
||||
MdsEvent()
|
||||
: timestamp_(0),
|
||||
obj_ptr_(nullptr),
|
||||
event_(nullptr),
|
||||
info_str_(),
|
||||
unit_id_(UINT8_MAX),
|
||||
@ -110,9 +111,9 @@ struct MdsEvent {
|
||||
if (!rhs.is_valid_()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
MDS_LOG(WARN, "invalid argument", KR(ret), K(rhs));
|
||||
} else if (OB_FAIL(set_(alloc, rhs.timestamp_, rhs.event_, rhs.info_str_, rhs.unit_id_, rhs.writer_type_,
|
||||
rhs.writer_id_, rhs.seq_no_, rhs.redo_scn_, rhs.end_scn_, rhs.trans_version_,
|
||||
rhs.node_type_, rhs.state_, rhs.key_str_))) {
|
||||
} else if (OB_FAIL(set_(alloc, rhs.timestamp_, rhs.obj_ptr_, rhs.event_, rhs.info_str_, rhs.unit_id_,
|
||||
rhs.writer_type_, rhs.writer_id_, rhs.seq_no_, rhs.redo_scn_, rhs.end_scn_,
|
||||
rhs.trans_version_, rhs.node_type_, rhs.state_, rhs.key_str_))) {
|
||||
// don't report 4013, cause alloc use ring buffer, 4013 is expected
|
||||
} else {
|
||||
tid_ = rhs.tid_;
|
||||
@ -121,12 +122,14 @@ struct MdsEvent {
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
TO_STRING_KV(KP_(alloc), KTIME_(timestamp), K_(event), K_(info_str), K_(unit_id), K_(key_str), K_(writer_type), \
|
||||
K_(writer_id), K_(seq_no), K_(redo_scn), K_(end_scn), K_(trans_version), K_(node_type), K_(state));
|
||||
TO_STRING_KV(KP_(alloc), KTIME_(timestamp), K_(obj_ptr), K_(event), K_(info_str), K_(unit_id), K_(key_str), \
|
||||
K_(writer_type), K_(writer_id), K_(seq_no), K_(redo_scn), K_(end_scn), K_(trans_version), K_(node_type),\
|
||||
K_(state));
|
||||
private:
|
||||
bool is_valid_() const { return OB_NOT_NULL(event_); }
|
||||
int set_(ObIAllocator &alloc,
|
||||
int64_t timestamp,
|
||||
void *obj_ptr,
|
||||
const char *event_str,
|
||||
const ObString &info_str,
|
||||
uint8_t unit_id,
|
||||
@ -139,8 +142,8 @@ private:
|
||||
storage::mds::MdsNodeType node_type,
|
||||
storage::mds::TwoPhaseCommitState state,
|
||||
const ObString &key_str) {
|
||||
#define PRINT_WRAPPER K(ret), K(event_str), K(unit_id), K(writer_type), K(writer_id), K(seq_no), K(redo_scn),\
|
||||
K(trans_version), K(node_type), K(state), K(key_str)
|
||||
#define PRINT_WRAPPER K(ret), K(obj_ptr), K(event_str), K(unit_id), K(writer_type), K(writer_id), K(seq_no), \
|
||||
K(redo_scn), K(trans_version), K(node_type), K(state), K(key_str)
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_valid_()) {
|
||||
ret = OB_INIT_TWICE;
|
||||
@ -175,6 +178,7 @@ private:
|
||||
}
|
||||
} else {
|
||||
timestamp_ = timestamp;
|
||||
obj_ptr_ = obj_ptr;
|
||||
event_ = event_str;
|
||||
unit_id_ = unit_id;
|
||||
writer_type_ = writer_type;
|
||||
@ -209,6 +213,7 @@ private:
|
||||
char tname_[16] = {0};
|
||||
int64_t timestamp_;
|
||||
// need fill
|
||||
void *obj_ptr_;
|
||||
const char *event_;
|
||||
ObString info_str_;
|
||||
uint8_t unit_id_;
|
||||
@ -254,7 +259,12 @@ struct ObMdsEventBuffer {
|
||||
void destroy() {
|
||||
mds_event_cache_.~ObVtableEventRecycleBuffer();
|
||||
}
|
||||
void append(const MdsEventKey &key, const MdsEvent &event, const storage::mds::MdsTableBase *mds_table, const char *file, const uint32_t line, const char *func) {
|
||||
void append(const MdsEventKey &key,
|
||||
const MdsEvent &event,
|
||||
const storage::mds::MdsTableBase *mds_table,
|
||||
const char *file,
|
||||
const uint32_t line,
|
||||
const char *func) {
|
||||
if (OB_NOT_NULL(file) && OB_UNLIKELY(line != 0) && OB_NOT_NULL(func) && OB_NOT_NULL(event.event_)) {
|
||||
share::ObTaskController::get().allow_next_syslog();
|
||||
::oceanbase::common::OB_PRINT("[MDS.EVENT]", OB_LOG_LEVEL_INFO, file, line, func, OB_LOG_LOCATION_HASH_VAL, OB_SUCCESS,
|
||||
|
@ -46,12 +46,13 @@ ObTabletPointer::ObTabletPointer()
|
||||
ddl_info_(),
|
||||
initial_state_(true),
|
||||
ddl_kv_mgr_lock_(),
|
||||
mds_lock_(),
|
||||
mds_table_handler_(),
|
||||
old_version_chain_(nullptr),
|
||||
attr_()
|
||||
{
|
||||
#if defined(__x86_64__) && !defined(ENABLE_OBJ_LEAK_CHECK)
|
||||
static_assert(sizeof(ObTabletPointer) == 344, "The size of ObTabletPointer will affect the meta memory manager, and the necessity of adding new fields needs to be considered.");
|
||||
static_assert(sizeof(ObTabletPointer) == 352, "The size of ObTabletPointer will affect the meta memory manager, and the necessity of adding new fields needs to be considered.");
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -486,7 +487,17 @@ int ObTabletPointer::get_mds_table(const ObTabletID &tablet_id,
|
||||
bool not_exist_create)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!ls_handle_.is_valid()) {
|
||||
ObLSID ls_id = ls_handle_.get_ls()->get_ls_id();
|
||||
share::SCN mds_ckpt_scn = share::SCN::min_scn();
|
||||
if (not_exist_create) {
|
||||
ScanAllVersionTabletsOp::GetMaxMdsCkptScnOp op(mds_ckpt_scn);
|
||||
if (OB_UNLIKELY(phy_addr_.is_none())) {// first time create, without phy addr, use min scn to init mds table
|
||||
} else if (OB_FAIL(MTL(ObTenantMetaMemMgr *)->scan_all_version_tablets({ls_id, tablet_id}, op))) {
|
||||
LOG_WARN("failed to get mds_ckpt_scn", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (!ls_handle_.is_valid()) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_ERROR("invalid ls_handle_, maybe not init yet", K(ret));
|
||||
} else if (!tablet_id.is_valid()) {
|
||||
@ -495,6 +506,7 @@ int ObTabletPointer::get_mds_table(const ObTabletID &tablet_id,
|
||||
} else if (OB_FAIL(mds_table_handler_.get_mds_table_handle(handle,
|
||||
tablet_id,
|
||||
ls_handle_.get_ls()->get_ls_id(),
|
||||
mds_ckpt_scn,
|
||||
not_exist_create,
|
||||
this))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
@ -549,7 +561,24 @@ int ObTabletPointer::release_memtable_and_mds_table_for_ls_offline(const ObTable
|
||||
} else {
|
||||
LOG_WARN("failed to get mds table", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(mds_table.forcely_reset_mds_table("OFFLINE"))) {
|
||||
} else if (OB_FAIL(mds_table.forcely_remove_nodes("OFFLINE", share::SCN::max_scn()))) {
|
||||
LOG_WARN("fail to release mds nodes in mds table", K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletPointer::release_mds_nodes_redo_scn_below(const ObTabletID &tablet_id, const share::SCN &mds_ckpt_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
mds::MdsTableHandle mds_table;
|
||||
if (OB_FAIL(get_mds_table(tablet_id, mds_table, false/*not_exist_create*/))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("failed to get mds table", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(mds_table.forcely_remove_nodes("REMOVE", mds_ckpt_scn))) {
|
||||
LOG_WARN("fail to release mds nodes in mds table", K(ret));
|
||||
}
|
||||
|
||||
@ -609,15 +638,20 @@ int ObTabletPointer::remove_tablet_from_old_version_chain(ObTablet *tablet)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletPointer::get_min_mds_ckpt_scn(share::SCN &scn)
|
||||
int ObTabletPointer::scan_all_tablets_on_chain(const ObFunction<int(ObTablet &)> &op)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
scn.set_max();
|
||||
ObTablet *cur = old_version_chain_;
|
||||
while (OB_SUCC(ret) && OB_NOT_NULL(cur)) {
|
||||
scn = MIN(scn, cur->get_tablet_meta().mds_checkpoint_scn_);
|
||||
cur = cur->get_next_tablet();
|
||||
|
||||
if (!op.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "receive an invalid operator", K(ret), K(obj_));
|
||||
} else {
|
||||
while (OB_SUCC(ret) && OB_NOT_NULL(cur)) {
|
||||
if (OB_FAIL(op(*cur))) {
|
||||
STORAGE_LOG(WARN, "failed to apply op on old version tablet", K(ret), K(obj_));
|
||||
}
|
||||
cur = cur->get_next_tablet();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -708,7 +742,31 @@ int ObITabletFilterOp::operator()(const ObTabletResidentInfo &info, bool &is_ski
|
||||
return ret;
|
||||
}
|
||||
|
||||
ScanAllVersionTabletsOp::GetMinMdsCkptScnOp::GetMinMdsCkptScnOp(share::SCN &min_mds_ckpt_scn)
|
||||
:min_mds_ckpt_scn_(min_mds_ckpt_scn)
|
||||
{
|
||||
min_mds_ckpt_scn_.set_max();
|
||||
}
|
||||
|
||||
int ScanAllVersionTabletsOp::GetMinMdsCkptScnOp::operator()(ObTablet &tablet)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
min_mds_ckpt_scn_ = MIN(min_mds_ckpt_scn_, tablet.get_mds_checkpoint_scn());
|
||||
return ret;
|
||||
}
|
||||
|
||||
ScanAllVersionTabletsOp::GetMaxMdsCkptScnOp::GetMaxMdsCkptScnOp(share::SCN &max_mds_ckpt_scn)
|
||||
:max_mds_ckpt_scn_(max_mds_ckpt_scn)
|
||||
{
|
||||
max_mds_ckpt_scn_.set_min();
|
||||
}
|
||||
|
||||
int ScanAllVersionTabletsOp::GetMaxMdsCkptScnOp::operator()(ObTablet &tablet)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
max_mds_ckpt_scn_ = MAX(max_mds_ckpt_scn_, tablet.get_mds_checkpoint_scn());
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
#include "lib/lock/ob_mutex.h"
|
||||
#include "storage/meta_mem/ob_meta_obj_struct.h"
|
||||
#include "storage/multi_data_source/runtime_utility/mds_lock.h"
|
||||
#include "storage/tablet/ob_tablet_ddl_info.h"
|
||||
#include "storage/tx_storage/ob_ls_handle.h"
|
||||
#include "storage/multi_data_source/mds_table_handler.h"
|
||||
@ -85,12 +86,45 @@ public:
|
||||
int64_t backup_bytes_;
|
||||
};
|
||||
|
||||
enum class LockMode { SHARE, EXCLUSIVE };
|
||||
template <LockMode mode>
|
||||
struct TabletMdsLockGuard
|
||||
{
|
||||
TabletMdsLockGuard() : lock_(nullptr) {}
|
||||
TabletMdsLockGuard(mds::MdsLock &lock) : lock_(&lock) {
|
||||
if (mode == LockMode::SHARE) {
|
||||
lock_->rdlock();
|
||||
} else if (mode == LockMode::EXCLUSIVE) {
|
||||
lock_->wrlock();
|
||||
}
|
||||
}
|
||||
~TabletMdsLockGuard() {
|
||||
if (lock_) {
|
||||
if (mode == LockMode::SHARE) {
|
||||
lock_->rdunlock();
|
||||
} else if (mode == LockMode::EXCLUSIVE) {
|
||||
lock_->wrunlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
mds::MdsLock *lock_;
|
||||
};
|
||||
|
||||
class ObTabletPointer final
|
||||
{
|
||||
friend class ObTablet;
|
||||
friend class ObLSTabletService;
|
||||
friend class ObTenantMetaMemMgr;
|
||||
friend class ObTabletResidentInfo;
|
||||
public:
|
||||
template <LockMode MODE>
|
||||
void get_mds_truncate_lock_guard(TabletMdsLockGuard<MODE> &lock_guard) const {
|
||||
if (OB_ISNULL(lock_guard.lock_)) {
|
||||
new (&lock_guard) TabletMdsLockGuard<MODE>(mds_lock_);
|
||||
} else {
|
||||
ob_abort();// just for defence
|
||||
}
|
||||
}
|
||||
public:
|
||||
ObTabletPointer();
|
||||
ObTabletPointer(const ObLSHandle &ls_handle,
|
||||
@ -149,13 +183,19 @@ public:
|
||||
int try_release_mds_nodes_below(const share::SCN &scn);
|
||||
int try_gc_mds_table();
|
||||
int release_memtable_and_mds_table_for_ls_offline(const ObTabletID &tablet_id);
|
||||
int get_min_mds_ckpt_scn(share::SCN &scn);
|
||||
// NOTICE1: input arg mds_ckpt_scn must be very carefully picked,
|
||||
// this scn should be calculated by mds table when flush,
|
||||
// and dumped to mds sstable, recorded on tablet
|
||||
// ohterwise mds data will lost
|
||||
// NOTICE2: this call must be protected by TabletMdsLockGuard<LockMode::EXCLUSIVE>
|
||||
int release_mds_nodes_redo_scn_below(const ObTabletID &tablet_id, const share::SCN &mds_ckpt_scn);
|
||||
ObLS *get_ls() const;
|
||||
// the RW operations of tablet_attr are protected by lock guard of tablet_map_
|
||||
int set_tablet_attr(const ObTabletAttr &attr);
|
||||
bool is_old_version_chain_empty() const { return OB_ISNULL(old_version_chain_); }
|
||||
bool is_attr_valid() const { return attr_.is_valid(); }
|
||||
private:
|
||||
int scan_all_tablets_on_chain(const ObFunction<int(ObTablet &)> &op);// must be called under t3m bucket lock's protection
|
||||
int wash_obj();
|
||||
int add_tablet_to_old_version_chain(ObTablet *tablet);
|
||||
int remove_tablet_from_old_version_chain(ObTablet *tablet);
|
||||
@ -168,10 +208,11 @@ private:
|
||||
ObTabletDDLInfo ddl_info_; // 32B
|
||||
bool initial_state_; // 1B
|
||||
ObByteLock ddl_kv_mgr_lock_; // 1B
|
||||
mutable mds::MdsLock mds_lock_;// 12B
|
||||
mds::ObMdsTableHandler mds_table_handler_;// 48B
|
||||
ObTablet *old_version_chain_; // 8B
|
||||
ObTabletAttr attr_; // 32B // protected by rw lock of tablet_map_
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTabletPointer); // 320B
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTabletPointer); // 352B
|
||||
};
|
||||
|
||||
struct ObTabletResidentInfo final
|
||||
@ -224,6 +265,22 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObITabletFilterOp);
|
||||
};
|
||||
|
||||
struct ScanAllVersionTabletsOp
|
||||
{
|
||||
struct GetMinMdsCkptScnOp
|
||||
{
|
||||
explicit GetMinMdsCkptScnOp(share::SCN &min_mds_ckpt_scn);
|
||||
int operator()(ObTablet &);
|
||||
share::SCN &min_mds_ckpt_scn_;
|
||||
};
|
||||
struct GetMaxMdsCkptScnOp
|
||||
{
|
||||
explicit GetMaxMdsCkptScnOp(share::SCN &max_mds_ckpt_scn);
|
||||
int operator()(ObTablet &);
|
||||
share::SCN &max_mds_ckpt_scn_;
|
||||
};
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
||||
|
||||
|
@ -998,18 +998,20 @@ int ObTenantMetaMemMgr::get_min_end_scn_from_single_tablet(ObTablet *tablet,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantMetaMemMgr::get_min_mds_ckpt_scn(const ObTabletMapKey &key, share::SCN &scn)
|
||||
int ObTenantMetaMemMgr::scan_all_version_tablets(const ObTabletMapKey &key, const ObFunction<int(ObTablet &)> &op)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SCN min_scn_from_old_tablets = SCN::max_scn();
|
||||
SCN min_scn_from_cur_tablet = SCN::max_scn();
|
||||
ObArenaAllocator allocator(common::ObMemAttr(MTL_ID(), "GetMinMdsScn"));
|
||||
ObTablet *cur_tablet = nullptr;
|
||||
ObArenaAllocator allocator(common::ObMemAttr(MTL_ID(), "ScanTablets"));
|
||||
ObTabletPointerHandle ptr_handle(tablet_map_);
|
||||
ObTabletHandle handle;
|
||||
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTenantMetaMemMgr hasn't been initialized", K(ret));
|
||||
} else if (!op.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("operator is invalid", K(ret));
|
||||
} else if (OB_FAIL(get_tablet_with_allocator(WashTabletPriority::WTP_LOW, key, allocator, handle))) {
|
||||
LOG_WARN("fail to get latest tablet", K(ret), K(key));
|
||||
} else if (OB_FAIL(tablet_map_.get(key, ptr_handle))) {
|
||||
@ -1019,18 +1021,16 @@ int ObTenantMetaMemMgr::get_min_mds_ckpt_scn(const ObTabletMapKey &key, share::S
|
||||
LOG_WARN("unexpected invalid handle", K(ret), K(handle), K(ptr_handle));
|
||||
} else {
|
||||
// since cur_tablet may be added into old_chain, we must get it at first
|
||||
min_scn_from_cur_tablet = handle.get_obj()->get_tablet_meta().mds_checkpoint_scn_;
|
||||
cur_tablet = handle.get_obj();
|
||||
ObTabletPointer *tablet_ptr = static_cast<ObTabletPointer*>(ptr_handle.get_resource_ptr());
|
||||
ObBucketHashRLockGuard lock_guard(bucket_lock_, key.hash()); // lock old_version_chain
|
||||
if (OB_ISNULL(tablet_ptr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tablet ptr is NULL", K(ret), K(ptr_handle));
|
||||
} else if (OB_FAIL(tablet_ptr->get_min_mds_ckpt_scn(min_scn_from_old_tablets))) {
|
||||
LOG_WARN("fail to get min mds ckpt scn from old tablets", K(ret), K(key));
|
||||
} else {
|
||||
scn = MIN(min_scn_from_cur_tablet, min_scn_from_old_tablets);
|
||||
LOG_TRACE("get min mds ckpt scn", K(ret), K(key),
|
||||
K(scn), K(min_scn_from_cur_tablet), K(min_scn_from_old_tablets));
|
||||
} else if (OB_FAIL(tablet_ptr->scan_all_tablets_on_chain(op))) {
|
||||
LOG_WARN("fail to apply op on old chain tablets", K(ret), K(key));
|
||||
} else if (OB_FAIL(op(*cur_tablet))) {
|
||||
LOG_WARN("fail to apply op on current tablet", K(ret), K(key));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -198,7 +198,7 @@ public:
|
||||
const share::SCN &ls_checkpoint,
|
||||
share::SCN &min_end_scn_from_latest,
|
||||
share::SCN &min_end_scn_from_old);
|
||||
int get_min_mds_ckpt_scn(const ObTabletMapKey &key, share::SCN &scn);
|
||||
int scan_all_version_tablets(const ObTabletMapKey &key, const ObFunction<int(ObTablet &)> &op);
|
||||
|
||||
// garbage collector for sstable and memtable.
|
||||
int push_table_into_gc_queue(ObITable *table, const ObITable::TableType table_type);
|
||||
|
@ -72,11 +72,6 @@ int MdsCtx::set_writer(const MdsWriter &writer)
|
||||
} else {
|
||||
writer_.writer_type_ = writer.writer_type_;
|
||||
writer_.writer_id_ = writer.writer_id_;
|
||||
if (WriterType::TRANSACTION != writer_.writer_type_) {
|
||||
if (OB_FAIL(inc_seq_no())) {
|
||||
MDS_LOG(WARN, "fail to inc seq no", K(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -231,6 +231,12 @@ int MdsRow<K, V>::construct_insert_record_user_mds_node_(MdsRowBase<K, V> *mds_r
|
||||
MDS_TG(5_ms);
|
||||
bool need_rollback_node = false;
|
||||
UserMdsNode<K, V> *new_node = nullptr;
|
||||
share::SCN last_inner_recycled_scn = share::SCN::min_scn();
|
||||
if (OB_NOT_NULL(mds_row) &&
|
||||
OB_NOT_NULL(mds_row->p_mds_unit_) &&
|
||||
OB_NOT_NULL(mds_row->p_mds_unit_->p_mds_table_)) {
|
||||
last_inner_recycled_scn = mds_row->p_mds_unit_->p_mds_table_->last_inner_recycled_scn_;
|
||||
}
|
||||
if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_0_0) {
|
||||
if (!scn.is_max() && !ctx.get_seq_no().is_valid()) {
|
||||
ctx.set_seq_no(transaction::ObTxSEQ::MIN_VAL());
|
||||
@ -249,6 +255,8 @@ int MdsRow<K, V>::construct_insert_record_user_mds_node_(MdsRowBase<K, V> *mds_r
|
||||
#endif
|
||||
// if this is an insert action, check is same scn mds node exists
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (!scn.is_max() && last_inner_recycled_scn >= scn) {
|
||||
MDS_LOG_SET(INFO, "scn is not max, and below last_inner_recycled_scn, this replay action should be filtered");
|
||||
} else if (scn != share::SCN::max_scn() && MDS_FAIL(sorted_list_.for_each_node(CheckNodeExistOp(scn)))) {
|
||||
MDS_LOG_SET(WARN, "scn is not max, this is an insert action, but node with same scn exists");
|
||||
} else if (MDS_FAIL_FLAG(MdsFactory::create(new_node,
|
||||
@ -269,14 +277,17 @@ int MdsRow<K, V>::construct_insert_record_user_mds_node_(MdsRowBase<K, V> *mds_r
|
||||
if (!scn.is_max()) {// replay
|
||||
new_node->on_redo_(scn);
|
||||
}
|
||||
sorted_list_.insert(new_node);
|
||||
new_node->mds_ctx_ = &ctx;
|
||||
CLICK();
|
||||
ctx.record_written_node(new_node);
|
||||
if (scn == share::SCN::max_scn()) {
|
||||
report_event_("WRITE_NODE", *new_node);
|
||||
if (MDS_FAIL(sorted_list_.insert(new_node))) {
|
||||
MDS_LOG_SET(WARN, "failed to insert new node to sorted_list");
|
||||
} else {
|
||||
report_event_("REPLAY_NODE", *new_node);
|
||||
new_node->mds_ctx_ = &ctx;
|
||||
CLICK();
|
||||
ctx.record_written_node(new_node);
|
||||
if (scn == share::SCN::max_scn()) {
|
||||
report_event_("WRITE_NODE", *new_node);
|
||||
} else {
|
||||
report_event_("REPLAY_NODE", *new_node);
|
||||
}
|
||||
}
|
||||
}
|
||||
// rollback logic
|
||||
@ -619,6 +630,14 @@ struct DumpNodeOP {
|
||||
MdsAllocator::get_instance()))) {
|
||||
MDS_LOG_SCAN(WARN, "failt to convert user mds node to dump node", K(node));
|
||||
/**********************this is different logic from normal logic***************************************************/
|
||||
} else if (specified_node.user_data_.data_type_ == ObTabletMdsUserDataType::START_TRANSFER_OUT_PREPARE) {
|
||||
if (specified_node.seq_no_.is_min()) {
|
||||
dump_kv_.v_.seq_no_ = transaction::ObTxSEQ::mk_v0(47);
|
||||
dump_kv_.v_.crc_check_number_ = dump_kv_.v_.generate_hash();
|
||||
MDS_LOG(INFO, "convert start transfer out prepare tablet status mds node's seq no from min to 47 for compat issue", K(node), K(dump_kv_));
|
||||
} else {
|
||||
MDS_LOG(TRACE, "no need convert start transfer out prepare tablet status mds node's seq no cause it is valid", K(node), K(dump_kv_));
|
||||
}
|
||||
} else if (specified_node.user_data_.data_type_ == ObTabletMdsUserDataType::START_TRANSFER_OUT) {
|
||||
if (specified_node.seq_no_.is_min()) {
|
||||
dump_kv_.v_.seq_no_ = transaction::ObTxSEQ::mk_v0(100);
|
||||
|
@ -50,6 +50,7 @@ int MdsTableBase::advance_state_to(State new_state) const
|
||||
|
||||
int MdsTableBase::init(const ObTabletID tablet_id,
|
||||
const share::ObLSID ls_id,
|
||||
const share::SCN mds_ckpt_scn_from_tablet,// this is used to filter replayed nodes after removed action
|
||||
ObTabletPointer *pointer,
|
||||
ObMdsTableMgr *p_mgr)
|
||||
{
|
||||
@ -63,6 +64,7 @@ int MdsTableBase::init(const ObTabletID tablet_id,
|
||||
} else {
|
||||
tablet_id_ = tablet_id;
|
||||
ls_id_ = ls_id;
|
||||
last_inner_recycled_scn_ = mds_ckpt_scn_from_tablet;
|
||||
if (OB_NOT_NULL(p_mgr)) {
|
||||
mgr_handle_.set_mds_table_mgr(p_mgr);
|
||||
debug_info_.do_init_tablet_pointer_ = pointer;
|
||||
|
@ -113,6 +113,7 @@ public:
|
||||
}
|
||||
int init(const ObTabletID tablet_id,
|
||||
const share::ObLSID ls_id,
|
||||
const share::SCN mds_ckpt_scn_from_tablet,// this is used to filter replayed nodes after removed action
|
||||
ObTabletPointer *pointer,
|
||||
ObMdsTableMgr *p_mgr);
|
||||
virtual int set(int64_t unit_id,
|
||||
@ -172,7 +173,7 @@ public:
|
||||
virtual ObTabletID get_tablet_id() const;
|
||||
virtual bool is_flushing() const;
|
||||
virtual int fill_virtual_info(ObIArray<MdsNodeInfoForVirtualTable> &mds_node_info_array) const = 0;
|
||||
virtual int forcely_reset_mds_table(const char *reason) = 0;
|
||||
virtual int forcely_remove_nodes(const char *reason, share::SCN redo_scn_limit) = 0;
|
||||
void mark_removed_from_t3m(ObTabletPointer *pointer);// need called in del tablet phase
|
||||
void mark_switched_to_empty_shell();
|
||||
bool is_switched_to_empty_shell() const;
|
||||
@ -321,18 +322,18 @@ protected:
|
||||
: do_init_tablet_pointer_(nullptr),
|
||||
do_remove_tablet_pointer_(nullptr),
|
||||
init_ts_(0),
|
||||
last_reset_ts_(0),
|
||||
last_remove_ts_(0),
|
||||
remove_ts_(0),
|
||||
switch_to_empty_shell_ts_(0),
|
||||
last_flush_ts_(0),
|
||||
init_trace_id_(),
|
||||
remove_trace_id_() {}
|
||||
TO_STRING_KV(KP_(do_init_tablet_pointer), KP_(do_remove_tablet_pointer), KTIME_(init_ts), KTIME_(last_reset_ts),
|
||||
TO_STRING_KV(KP_(do_init_tablet_pointer), KP_(do_remove_tablet_pointer), KTIME_(init_ts), KTIME_(last_remove_ts),
|
||||
KTIME_(remove_ts), KTIME_(last_flush_ts), KTIME_(switch_to_empty_shell_ts), K_(init_trace_id), K_(remove_trace_id));
|
||||
ObTabletPointer *do_init_tablet_pointer_;// can not be accessed, just record it to debug
|
||||
ObTabletPointer *do_remove_tablet_pointer_;// can not be accessed, just record it to debug
|
||||
int64_t init_ts_;
|
||||
int64_t last_reset_ts_;
|
||||
int64_t last_remove_ts_;
|
||||
int64_t remove_ts_;
|
||||
int64_t switch_to_empty_shell_ts_;
|
||||
int64_t last_flush_ts_;
|
||||
@ -343,7 +344,7 @@ protected:
|
||||
share::ObLSID ls_id_;
|
||||
ObTabletID tablet_id_;
|
||||
share::SCN flushing_scn_;// To tell if this mds table is flushing
|
||||
share::SCN last_inner_recycled_scn_;// To filter repeated release operation
|
||||
share::SCN last_inner_recycled_scn_;// To filter repeated release operation, and filter replay operation
|
||||
share::SCN rec_scn_;// To CLOG to recycle
|
||||
int64_t total_node_cnt_;// To tell if this mds table is safety to destroy
|
||||
int64_t construct_sequence_;// To filter invalid dump DAG
|
||||
|
@ -41,6 +41,7 @@ public:
|
||||
int init(ObIAllocator &allocator,
|
||||
const ObTabletID tablet_id,
|
||||
const share::ObLSID ls_id,
|
||||
const share::SCN mds_ckpt_scn_from_tablet,// this is used to filter replayed nodes after removed action
|
||||
ObTabletPointer *pointer,
|
||||
ObMdsTableMgr *mgr_handle = nullptr);
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
@ -49,7 +50,7 @@ public:
|
||||
int mark_removed_from_t3m(ObTabletPointer *pointer) const;
|
||||
int mark_switched_to_empty_shell() const;
|
||||
template <int N>
|
||||
int forcely_reset_mds_table(const char (&reason)[N]);
|
||||
int forcely_remove_nodes(const char (&reason)[N], share::SCN redo_scn_limit);
|
||||
/******************************Single Key Unit Access Interface**********************************/
|
||||
template <typename T>
|
||||
int set(T &&data, MdsCtx &ctx, const int64_t lock_timeout_us = 0);
|
||||
|
@ -124,6 +124,7 @@ template <typename MdsTableType>
|
||||
int MdsTableHandle::init(ObIAllocator &allocator,
|
||||
const ObTabletID tablet_id,
|
||||
const share::ObLSID ls_id,
|
||||
const share::SCN mds_ckpt_scn_from_tablet,// this is used to filter replayed nodes after removed action
|
||||
ObTabletPointer *pointer,
|
||||
ObMdsTableMgr *p_mgr)
|
||||
{
|
||||
@ -141,7 +142,7 @@ int MdsTableHandle::init(ObIAllocator &allocator,
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(p_mds_table.construct(allocator))) {
|
||||
MDS_LOG(WARN, "construct mds table impl failed", KP(this), K(lbt()));
|
||||
} else if (OB_FAIL(p_mds_table->init(tablet_id, ls_id, pointer, p_mgr))) {
|
||||
} else if (OB_FAIL(p_mds_table->init(tablet_id, ls_id, mds_ckpt_scn_from_tablet, pointer, p_mgr))) {
|
||||
MDS_LOG(WARN, "init mds table failed", KR(ret), K(mds_table_id_),
|
||||
K(typeid(MdsTableType).name()));
|
||||
} else {
|
||||
@ -875,7 +876,7 @@ inline int MdsTableHandle::mark_switched_to_empty_shell() const
|
||||
}
|
||||
|
||||
template <int N>
|
||||
inline int MdsTableHandle::forcely_reset_mds_table(const char (&reason)[N])
|
||||
inline int MdsTableHandle::forcely_remove_nodes(const char (&reason)[N], share::SCN redo_scn_limit)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
CHECK_MDS_TABLE_INIT();
|
||||
@ -883,7 +884,7 @@ inline int MdsTableHandle::forcely_reset_mds_table(const char (&reason)[N])
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(WARN, "p_mds_table_base_ is invalid", K(*this));
|
||||
} else {
|
||||
p_mds_table_base_->forcely_reset_mds_table(reason);
|
||||
p_mds_table_base_->forcely_remove_nodes(reason, redo_scn_limit);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ ObMdsTableHandler::~ObMdsTableHandler() { ATOMIC_STORE(&is_written_, false); }
|
||||
int ObMdsTableHandler::get_mds_table_handle(mds::MdsTableHandle &handle,
|
||||
const ObTabletID &tablet_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const share::SCN mds_ckpt_scn_from_tablet,// this is used to filter replayed nodes after removed action
|
||||
const bool not_exist_create,
|
||||
ObTabletPointer *pointer)
|
||||
{
|
||||
@ -89,6 +90,7 @@ int ObMdsTableHandler::get_mds_table_handle(mds::MdsTableHandle &handle,
|
||||
if (MDS_FAIL(mds_table_handle_.init<mds::NormalMdsTable>(mds::MdsAllocator::get_instance(),
|
||||
tablet_id,
|
||||
ls_id,
|
||||
mds_ckpt_scn_from_tablet,
|
||||
pointer,
|
||||
mds_table_mgr_handle_.get_mds_table_mgr()))) {
|
||||
MDS_LOG_INIT(WARN, "fail to init mds table");
|
||||
|
@ -36,6 +36,7 @@ public:
|
||||
int get_mds_table_handle(mds::MdsTableHandle &handle,
|
||||
const ObTabletID &tablet_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const share::SCN mds_ckpt_scn_from_tablet,// this is used to filter replayed nodes after removed action
|
||||
const bool not_exist_create,
|
||||
ObTabletPointer *pointer);
|
||||
int try_gc_mds_table();
|
||||
|
@ -141,8 +141,9 @@ public:
|
||||
const ScanRowOrder scan_row_order,
|
||||
const ScanNodeOrder scan_node_order) const override;
|
||||
virtual int operate(const ObFunction<int(MdsTableBase &)> &operation) override;
|
||||
int calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN &flush_scn, int64_t &need_dumped_nodes_cnt);
|
||||
virtual int flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn) override;
|
||||
int calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN &flush_scn, int64_t &need_dumped_nodes_cnt);
|
||||
void calculate_rec_scn_and_advance_to_it_(share::SCN on_flush_scn);
|
||||
void on_flush_(const share::SCN &flush_scn, const int flush_ret);
|
||||
virtual void on_flush(const share::SCN &flush_scn, const int flush_ret) override;
|
||||
virtual int try_recycle(const share::SCN recycle_scn) override;
|
||||
@ -150,7 +151,7 @@ public:
|
||||
ForEachUnitFillVirtualInfoHelper helper(mds_node_info_array);
|
||||
return unit_tuple_.for_each(helper);
|
||||
}
|
||||
virtual int forcely_reset_mds_table(const char *reason) override;
|
||||
virtual int forcely_remove_nodes(const char *reason, share::SCN redo_scn_limit) override;
|
||||
/*****************************Single Key Unit Access Interface***********************************/
|
||||
template <typename T>
|
||||
int set(T &&data,
|
||||
|
@ -932,10 +932,16 @@ int MdsTableImpl<MdsTableType>::calculate_flush_scn_and_need_dumped_nodes_cnt_(s
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(do_flush_scn), K(need_dumped_nodes_cnt)
|
||||
MDS_TG(100_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
share::SCN last_calculated_do_flush_scn = do_flush_scn;
|
||||
RecalculateFlushScnCauseOnlySuppportDumpCommittedNodeOP op1(do_flush_scn);// recalculate flush scn
|
||||
do {
|
||||
last_calculated_do_flush_scn = do_flush_scn;
|
||||
if (MDS_FAIL(for_each_scan_row(FowEachRowAction::CALCUALTE_FLUSH_SCN, op1))) {
|
||||
MDS_LOG_FLUSH(WARN, "for each to calculate flush scn failed");
|
||||
}
|
||||
} while (OB_SUCC(ret) && last_calculated_do_flush_scn != do_flush_scn);// flush scn must not cross any node's [redo_scn, end_scn)
|
||||
CountUnDumpdedNodesBelowDoFlushScn op2(need_dumped_nodes_cnt, do_flush_scn);// count nodes need dump
|
||||
if (MDS_FAIL(for_each_scan_row(FowEachRowAction::CALCUALTE_FLUSH_SCN, op1))) {
|
||||
MDS_LOG_FLUSH(WARN, "for each to calculate flush scn failed");
|
||||
if (MDS_FAIL(ret)) {
|
||||
} else if (MDS_FAIL(for_each_scan_row(FowEachRowAction::COUNT_NODES_BEFLOW_FLUSH_SCN, op2))) {
|
||||
MDS_LOG_FLUSH(WARN, "for each to count undumped nodes failed");
|
||||
}
|
||||
@ -1063,6 +1069,27 @@ void MdsTableImpl<MdsTableType>::on_flush(const share::SCN &flush_scn, const int
|
||||
on_flush_(flush_scn, flush_ret);
|
||||
}
|
||||
template <typename MdsTableType>
|
||||
void MdsTableImpl<MdsTableType>::calculate_rec_scn_and_advance_to_it_(share::SCN on_flush_scn)
|
||||
{
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(on_flush_scn)
|
||||
int ret = OB_SUCCESS;
|
||||
MDS_TG(10_ms);
|
||||
bool need_retry = false;
|
||||
do {
|
||||
need_retry = false;
|
||||
CalculateRecScnOp op(on_flush_scn);
|
||||
if (MDS_FAIL(for_each_scan_row(FowEachRowAction::CALCULATE_REC_SCN, op))) {// lock all rows failed, retry until lock all rows success
|
||||
need_retry = true;
|
||||
MDS_LOG_FLUSH(WARN, "fail to do on flush");// record row lock guard may failed, cause lock guard array may meet extended failed cause memory not enough, but retry will make it success
|
||||
} else {
|
||||
share::SCN new_rec_scn = op.rec_scn_;
|
||||
try_advance_rec_scn(new_rec_scn);
|
||||
report_on_flush_event_("ON_FLUSH", on_flush_scn);
|
||||
}
|
||||
} while (need_retry && !FALSE_IT(PAUSE()));// here will release all rows lock
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
template <typename MdsTableType>
|
||||
void MdsTableImpl<MdsTableType>::on_flush_(const share::SCN &flush_scn, const int flush_ret)
|
||||
{
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(flush_scn)
|
||||
@ -1080,19 +1107,7 @@ void MdsTableImpl<MdsTableType>::on_flush_(const share::SCN &flush_scn, const in
|
||||
}
|
||||
} else {
|
||||
flushing_scn_.reset();
|
||||
bool need_retry = false;
|
||||
do {
|
||||
need_retry = false;
|
||||
CalculateRecScnOp op(flush_scn);
|
||||
if (MDS_FAIL(for_each_scan_row(FowEachRowAction::CALCULATE_REC_SCN, op))) {// lock all rows failed, retry until lock all rows success
|
||||
need_retry = true;
|
||||
MDS_LOG_FLUSH(WARN, "fail to do on flush");// record row lock guard may failed, cause lock guard array may meet extended failed cause memory not enough, but retry will make it success
|
||||
} else {
|
||||
share::SCN new_rec_scn = op.rec_scn_;
|
||||
try_advance_rec_scn(new_rec_scn);
|
||||
report_on_flush_event_("ON_FLUSH", flush_scn);
|
||||
}
|
||||
} while (need_retry && !FALSE_IT(PAUSE()));// here will release all rows lock
|
||||
(void) calculate_rec_scn_and_advance_to_it_(flush_scn);
|
||||
}
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
@ -1433,9 +1448,11 @@ int MdsTableImpl<MdsTableType>::try_recycle(const share::SCN recycle_scn)
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
struct ForcelyReleaseAllNodeOp
|
||||
struct ForcelyReleaseNodesRedoScnBelowOp
|
||||
{
|
||||
ForcelyReleaseAllNodeOp(const char *reason) : reason_(reason) {}
|
||||
ForcelyReleaseNodesRedoScnBelowOp(const char *reason, share::SCN redo_below_scn)
|
||||
: reason_(reason),
|
||||
redo_below_scn_(redo_below_scn) {}
|
||||
template <typename K, typename V>
|
||||
int operator()(const MdsRow<K, V> &row) {
|
||||
#define PRINT_WRAPPER K(cast_node), K_(reason)
|
||||
@ -1447,11 +1464,18 @@ struct ForcelyReleaseAllNodeOp
|
||||
row.sorted_list_.for_each_node_from_tail_to_head_until_true(
|
||||
[this, &row](const UserMdsNode<K, V> &mds_node) {
|
||||
UserMdsNode<K, V> &cast_node = const_cast<UserMdsNode<K, V> &>(mds_node);
|
||||
if (!cast_node.is_committed_()) {
|
||||
MDS_LOG_GC(INFO, "release uncommitted node");
|
||||
if (mds_node.redo_scn_ <= redo_below_scn_) {
|
||||
// below_scn_ comes from mds sstable mds_ckpt_scn, it must not cross any node's [redo, end) scn range
|
||||
// before support dump uncommitted nodes, all nodes be scanned must satisfy this rule
|
||||
MDS_ASSERT(mds_node.end_scn_ <= redo_below_scn_);
|
||||
if (!cast_node.is_committed_()) {
|
||||
MDS_LOG_GC(INFO, "release uncommitted node");
|
||||
}
|
||||
const_cast<MdsRow<K, V> &>(row).sorted_list_.del((ListNodeBase*)(ListNode<UserMdsNode<K, V>>*)(&cast_node));
|
||||
MdsFactory::destroy(&cast_node);
|
||||
} else if (redo_below_scn_.is_max()) {// just for defence
|
||||
ob_abort();
|
||||
}
|
||||
const_cast<MdsRow<K, V> &>(row).sorted_list_.del((ListNodeBase*)(ListNode<UserMdsNode<K, V>>*)(&cast_node));
|
||||
MdsFactory::destroy(&cast_node);
|
||||
return false;
|
||||
}
|
||||
);
|
||||
@ -1459,24 +1483,28 @@ struct ForcelyReleaseAllNodeOp
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
const char *reason_;
|
||||
share::SCN redo_below_scn_;
|
||||
};
|
||||
template <typename MdsTableType>
|
||||
int MdsTableImpl<MdsTableType>::forcely_reset_mds_table(const char *reason)
|
||||
int MdsTableImpl<MdsTableType>::forcely_remove_nodes(const char *reason, share::SCN redo_scn_limit)
|
||||
{
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(reason)
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(reason), K(redo_scn_limit)
|
||||
int ret = OB_SUCCESS;
|
||||
MDS_TG(100_ms);
|
||||
MdsWLockGuard lg(lock_);
|
||||
ForcelyReleaseAllNodeOp op(reason);
|
||||
if (OB_FAIL(for_each_scan_row(FowEachRowAction::RESET, op))) {
|
||||
MDS_LOG_GC(ERROR, "fail to do reset");
|
||||
ForcelyReleaseNodesRedoScnBelowOp op(reason, redo_scn_limit);
|
||||
if (OB_FAIL(for_each_scan_row(FowEachRowAction::REMOVE, op))) {
|
||||
MDS_LOG_GC(ERROR, "fail to do remove");
|
||||
} else {
|
||||
debug_info_.last_reset_ts_ = ObClockGenerator::getClock();
|
||||
debug_info_.last_remove_ts_ = ObClockGenerator::getClock();
|
||||
flushing_scn_.reset();
|
||||
last_inner_recycled_scn_ = share::SCN::min_scn();
|
||||
rec_scn_ = share::SCN::max_scn();
|
||||
ATOMIC_STORE(&total_node_cnt_, 0);
|
||||
MDS_LOG_GC(INFO, "forcely release all mds nodes");
|
||||
calculate_rec_scn_and_advance_to_it_(redo_scn_limit);
|
||||
if (redo_scn_limit.is_max()) {// reset operation
|
||||
last_inner_recycled_scn_ = share::SCN::min_scn();
|
||||
} else {// remove operation
|
||||
last_inner_recycled_scn_ = std::max(last_inner_recycled_scn_, redo_scn_limit);
|
||||
}
|
||||
MDS_LOG_GC(INFO, "forcely release mds nodes below");
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
|
@ -202,7 +202,7 @@ int MdsUnit<K, V>::for_each_row(FowEachRowAction action_type, OP &&op)// node ma
|
||||
// (to resolve replay out of order problem, if repaly concurrent happened with calculate rec_scn, without lock's protection, will finally get a wrong rec_scn)
|
||||
// but destroy mds_row will add row's lock inner destruction, which will resulting deadlock in same thread.
|
||||
// so only operations logic behaves like gc should recycle empty row.
|
||||
if (FowEachRowAction::RECYCLE == action_type || FowEachRowAction::RESET == action_type) {
|
||||
if (FowEachRowAction::RECYCLE == action_type || FowEachRowAction::REMOVE == action_type) {
|
||||
MDS_LOG_SCAN(DEBUG, "scan row to recycle or reset", K(action_type), KPC(p_k));
|
||||
erase_kv_from_list_if_empty_(&const_cast<KvPair<K, Row<K, V>> &>(kv_row));
|
||||
}
|
||||
@ -251,10 +251,17 @@ int MdsUnit<K, V>::insert_empty_kv_to_list_(const K &key, KvPair<K, Row<K, V>> *
|
||||
MDS_LOG_SET(WARN, "MdsUnit create kv pair failed");
|
||||
} else if (FALSE_IT(p_kv->v_.p_mds_unit_ = this)) {
|
||||
} else if (MDS_FAIL(common::meta::copy_or_assign(key, p_kv->k_))) {
|
||||
MDS_LOG_SET(ERROR, "copy user key failed");
|
||||
MDS_LOG_SET(WARN, "copy user key failed");
|
||||
} else if (MDS_FAIL(multi_row_list_.insert(p_kv))) {
|
||||
MDS_LOG_SET(WARN, "insert new kv row to multi_row_list_ failed");
|
||||
} else {
|
||||
p_kv->v_.key_ = &(p_kv->k_);
|
||||
multi_row_list_.insert(p_kv);
|
||||
}
|
||||
if (MDS_FAIL(ret)) {
|
||||
if (OB_NOT_NULL(p_kv)) {
|
||||
MdsFactory::destroy(p_kv);
|
||||
p_kv = nullptr;
|
||||
}
|
||||
}
|
||||
reset_mds_mem_check_thread_local_info();
|
||||
return ret;
|
||||
|
@ -65,7 +65,7 @@ enum class FowEachRowAction {
|
||||
COUNT_NODES_BEFLOW_FLUSH_SCN,
|
||||
CALCULATE_REC_SCN,
|
||||
RECYCLE,
|
||||
RESET,
|
||||
REMOVE,
|
||||
};
|
||||
|
||||
enum class ScanRowOrder {
|
||||
|
@ -347,25 +347,29 @@ int ObTenantMdsTimer::process_with_tablet_(ObTablet &tablet)
|
||||
|
||||
int ObTenantMdsTimer::get_tablet_oldest_scn_(ObTablet &tablet, share::SCN &oldest_scn)
|
||||
{
|
||||
#define PRINT_WRAPPER KR(ret), K(ls_id), K(tablet_id), K(oldest_scn), KPC(this)
|
||||
#define PRINT_WRAPPER KR(ret), K(ls_id), K(tablet_id), K(oldest_scn), K(op.min_mds_ckpt_scn_), KPC(this)
|
||||
int ret = OB_SUCCESS;
|
||||
const share::ObLSID &ls_id = tablet.get_ls_id();
|
||||
const common::ObTabletID &tablet_id = tablet.get_tablet_id();
|
||||
MDS_TG(5_ms);
|
||||
oldest_scn = SCN::min_scn();// means can not recycle any node
|
||||
ScanAllVersionTabletsOp::GetMinMdsCkptScnOp op(oldest_scn);
|
||||
if (OB_ISNULL(MTL(ObTenantMetaMemMgr*))) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG_GC(ERROR, "MTL ObTenantMetaMemMgr is NULL");
|
||||
} else if (MDS_FAIL(MTL(ObTenantMetaMemMgr*)->get_min_mds_ckpt_scn(ObTabletMapKey(ls_id, tablet_id),
|
||||
oldest_scn))) {
|
||||
} else if (MDS_FAIL(MTL(ObTenantMetaMemMgr*)->scan_all_version_tablets(ObTabletMapKey(ls_id, tablet_id), op))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
MDS_LOG_GC(WARN, "get_min_mds_ckpt_scn meet OB_ENTRY_NOT_EXIST");
|
||||
} else if (OB_ITEM_NOT_SETTED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
MDS_LOG_GC(WARN, "get_min_mds_ckpt_scn meet OB_ITEM_NOT_SETTED");
|
||||
} else {
|
||||
MDS_LOG_GC(WARN, "fail to get oldest tablet min_mds_ckpt_scn");
|
||||
}
|
||||
oldest_scn = SCN::min_scn();// means can not recycle any node
|
||||
} else if (oldest_scn.is_max() || !oldest_scn.is_valid()) {
|
||||
oldest_scn = SCN::min_scn();// means can not recycle any node
|
||||
MDS_LOG_GC(WARN, "get min_mds_ckpt_scn, but is invalid");
|
||||
oldest_scn.set_min();
|
||||
}
|
||||
MDS_LOG_GC(DEBUG, "get tablet oldest scn");
|
||||
return ret;
|
||||
|
@ -83,19 +83,28 @@ protected:// implemented by ObTablet
|
||||
virtual int get_mds_table_handle_(mds::MdsTableHandle &handle,
|
||||
const bool create_if_not_exist) const = 0;
|
||||
virtual ObTabletPointer *get_tablet_pointer_() const = 0;
|
||||
template <typename K, typename T>
|
||||
template <typename K, typename V>
|
||||
int read_data_from_tablet_cache(const K &key,
|
||||
const common::ObFunction<int(const V&)> &read_op,
|
||||
bool &applied_success) const;
|
||||
template <typename K, typename V>
|
||||
int read_data_from_mds_sstable(common::ObIAllocator &allocator,
|
||||
const K &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const V&)> &read_op) const;
|
||||
template <typename K, typename V>
|
||||
int read_data_from_cache_or_mds_sstable(common::ObIAllocator &allocator,
|
||||
const K &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const V&)> &read_op) const;
|
||||
template <typename K, typename V>
|
||||
int get_mds_data_from_tablet(
|
||||
const K &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const T&)> &read_op) const;
|
||||
template <typename K, typename T>
|
||||
int read_data_from_mds_sstable(
|
||||
common::ObIAllocator &allocator,
|
||||
const K &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const T&)> &read_op) const;
|
||||
const K &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const V&)> &read_op) const;
|
||||
int read_raw_data(
|
||||
common::ObIAllocator &allocator,
|
||||
const uint8_t mds_unit_id,
|
||||
|
@ -26,9 +26,6 @@ inline int ObITabletMdsInterface::check_tablet_status_written(bool &written)
|
||||
if (OB_UNLIKELY(!check_is_inited_())) {
|
||||
ret = OB_NOT_INIT;
|
||||
MDS_LOG(WARN, "not inited", K(ret), KPC(this));
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG(ERROR, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else {
|
||||
written = get_tablet_pointer_()->is_tablet_status_written();
|
||||
}
|
||||
@ -37,13 +34,67 @@ inline int ObITabletMdsInterface::check_tablet_status_written(bool &written)
|
||||
|
||||
/**********************************IMPLEMENTATION WITH TEMPLATE************************************/
|
||||
|
||||
template <typename K, typename V>
|
||||
int ObITabletMdsInterface::read_data_from_tablet_cache(const K &key,
|
||||
const common::ObFunction<int(const V&)> &read_op,
|
||||
bool &applied_success) const
|
||||
{
|
||||
static_assert(!std::is_same<typename std::decay<V>::type, ObTabletCreateDeleteMdsUserData>::value,
|
||||
"tablet status should read from real cache");
|
||||
int ret = OB_SUCCESS;
|
||||
applied_success = false;
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <>
|
||||
inline int ObITabletMdsInterface::read_data_from_tablet_cache
|
||||
<mds::DummyKey, ObTabletCreateDeleteMdsUserData>(const mds::DummyKey &key,
|
||||
const common::ObFunction<int(const ObTabletCreateDeleteMdsUserData&)> &read_op,
|
||||
bool &applied_success) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObTabletCreateDeleteMdsUserData &last_persisted_committed_tablet_status = get_tablet_meta_().last_persisted_committed_tablet_status_;
|
||||
if (last_persisted_committed_tablet_status.is_valid() && ObTabletStatus::NONE != last_persisted_committed_tablet_status.get_tablet_status()) {
|
||||
applied_success = true;
|
||||
if (OB_FAIL(read_op(last_persisted_committed_tablet_status))) {
|
||||
MDS_LOG(WARN, "failed to do read op", KR(ret), K(last_persisted_committed_tablet_status));
|
||||
}
|
||||
} else {
|
||||
applied_success = false;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename K, typename V>
|
||||
int ObITabletMdsInterface::read_data_from_cache_or_mds_sstable(common::ObIAllocator &allocator,
|
||||
const K &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const V&)> &read_op) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool applied_success = false;
|
||||
if (OB_SUCCESS != (ret = read_data_from_tablet_cache<K, V>(key, read_op, applied_success))) {
|
||||
MDS_LOG(WARN, "failed to read mds data from cache", KR(ret), K(key), K(snapshot), K(timeout_us));
|
||||
} else if (!applied_success) {
|
||||
if (OB_FAIL(read_data_from_mds_sstable(allocator, key, snapshot, timeout_us, read_op))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SNAPSHOT_DISCARDED;
|
||||
MDS_LOG(DEBUG, "read nothing from mds sstable", K(ret));
|
||||
} else {
|
||||
MDS_LOG(WARN, "fail to read data from mds sstable", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename K, typename T>
|
||||
int ObITabletMdsInterface::read_data_from_mds_sstable(
|
||||
common::ObIAllocator &allocator,
|
||||
const K &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const T&)> &read_op) const
|
||||
int ObITabletMdsInterface::read_data_from_mds_sstable(common::ObIAllocator &allocator,
|
||||
const K &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const T&)> &read_op) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -89,71 +140,12 @@ int ObITabletMdsInterface::read_data_from_mds_sstable(
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <>
|
||||
inline int ObITabletMdsInterface::get_mds_data_from_tablet<mds::DummyKey, ObTabletCreateDeleteMdsUserData>(
|
||||
const mds::DummyKey &key,
|
||||
template <typename K, typename V>
|
||||
inline int ObITabletMdsInterface::get_mds_data_from_tablet(
|
||||
const K &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const ObTabletCreateDeleteMdsUserData&)> &read_op) const
|
||||
{
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
ObArenaAllocator allocator(ObMemAttr(MTL_ID(), "mds_reader", ObCtxIds::DEFAULT_CTX_ID));
|
||||
const ObTabletCreateDeleteMdsUserData &last_persisted_committed_tablet_status = get_tablet_meta_().last_persisted_committed_tablet_status_;
|
||||
MDS_LOG(DEBUG, "print last persisted tablet status", K(last_persisted_committed_tablet_status));
|
||||
if (last_persisted_committed_tablet_status.is_valid() &&
|
||||
ObTabletStatus::NONE != last_persisted_committed_tablet_status.get_tablet_status()) {
|
||||
if (CLICK_FAIL(read_op(last_persisted_committed_tablet_status))) {
|
||||
MDS_LOG(WARN, "failed to do read op", K(ret), K(last_persisted_committed_tablet_status));
|
||||
}
|
||||
} else {
|
||||
if (CLICK_FAIL((read_data_from_mds_sstable<mds::DummyKey, ObTabletCreateDeleteMdsUserData>(
|
||||
allocator, key, snapshot, timeout_us, read_op)))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SNAPSHOT_DISCARDED;
|
||||
MDS_LOG(DEBUG, "read nothing from mds sstable", K(ret));
|
||||
} else {
|
||||
MDS_LOG(WARN, "fail to read data from mds sstable", K(ret));
|
||||
}
|
||||
} else {
|
||||
MDS_LOG(DEBUG, "succeed to read tablet status", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <>
|
||||
inline int ObITabletMdsInterface::get_mds_data_from_tablet<mds::DummyKey, ObTabletBindingMdsUserData>(
|
||||
const mds::DummyKey &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const ObTabletBindingMdsUserData&)> &read_op) const
|
||||
{
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
ObArenaAllocator allocator(ObMemAttr(MTL_ID(), "mds_reader", ObCtxIds::DEFAULT_CTX_ID));
|
||||
|
||||
if (CLICK_FAIL((read_data_from_mds_sstable<mds::DummyKey, ObTabletBindingMdsUserData>(
|
||||
allocator, key, snapshot, timeout_us, read_op)))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SNAPSHOT_DISCARDED;
|
||||
MDS_LOG(DEBUG, "read nothing from mds sstable", K(ret));
|
||||
} else {
|
||||
MDS_LOG(WARN, "fail to read data from mds sstable", K(ret), K(lbt()));
|
||||
}
|
||||
} else {
|
||||
MDS_LOG(DEBUG, "succeed to read aux tablet info", K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <>
|
||||
inline int ObITabletMdsInterface::get_mds_data_from_tablet<mds::DummyKey, share::ObTabletAutoincSeq>(
|
||||
const mds::DummyKey &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const share::ObTabletAutoincSeq&)> &read_op) const
|
||||
const common::ObFunction<int(const V&)> &read_op) const
|
||||
{
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
@ -162,7 +154,7 @@ inline int ObITabletMdsInterface::get_mds_data_from_tablet<mds::DummyKey, share:
|
||||
if (OB_UNLIKELY(!snapshot.is_max())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
MDS_LOG(WARN, "invalid args, snapshot is not max scn", K(ret), K(snapshot));
|
||||
} else if (CLICK_FAIL((read_data_from_mds_sstable<mds::DummyKey, share::ObTabletAutoincSeq>(
|
||||
} else if (CLICK_FAIL((read_data_from_cache_or_mds_sstable<K, V>(
|
||||
allocator, key, snapshot, timeout_us, read_op)))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SNAPSHOT_DISCARDED;
|
||||
@ -176,35 +168,6 @@ inline int ObITabletMdsInterface::get_mds_data_from_tablet<mds::DummyKey, share:
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <>
|
||||
inline int ObITabletMdsInterface::get_mds_data_from_tablet<compaction::ObMediumCompactionInfoKey, compaction::ObMediumCompactionInfo>(
|
||||
const compaction::ObMediumCompactionInfoKey &key,
|
||||
const share::SCN &snapshot,
|
||||
const int64_t timeout_us,
|
||||
const common::ObFunction<int(const compaction::ObMediumCompactionInfo&)> &read_op) const
|
||||
{
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
ObArenaAllocator allocator(ObMemAttr(MTL_ID(), "mds_reader", ObCtxIds::DEFAULT_CTX_ID));
|
||||
|
||||
if (OB_UNLIKELY(!snapshot.is_max())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
MDS_LOG(WARN, "invalid args, snapshot is not max scn", K(ret), K(snapshot));
|
||||
} else if (CLICK_FAIL((read_data_from_mds_sstable<compaction::ObMediumCompactionInfoKey, compaction::ObMediumCompactionInfo>(
|
||||
allocator, key, snapshot, timeout_us, read_op)))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SNAPSHOT_DISCARDED;
|
||||
MDS_LOG(DEBUG, "read nothing from mds sstable", K(ret));
|
||||
} else {
|
||||
MDS_LOG(WARN, "fail to read data from mds sstable", K(ret));
|
||||
}
|
||||
} else {
|
||||
MDS_LOG(DEBUG, "succeed to read medium info", K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename T>// general set for dummy key unit
|
||||
int ObITabletMdsInterface::set(T &&data, mds::MdsCtx &ctx, const int64_t lock_timeout_us)
|
||||
{
|
||||
@ -212,18 +175,22 @@ int ObITabletMdsInterface::set(T &&data, mds::MdsCtx &ctx, const int64_t lock_ti
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
mds::MdsTableHandle handle;
|
||||
if (MDS_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else if (MDS_FAIL(handle.set(std::forward<T>(data), ctx, lock_timeout_us))) {
|
||||
MDS_LOG_SET(WARN, "failed to set dummy key unit data");
|
||||
} else if (std::is_same<ObTabletCreateDeleteMdsUserData, typename std::decay<T>::type>::value) {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
TabletMdsLockGuard<LockMode::SHARE> guard;
|
||||
if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(ERROR, "pointer on tablet should not be null");
|
||||
} else {
|
||||
get_tablet_pointer_()->get_mds_truncate_lock_guard(guard);
|
||||
if (MDS_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (MDS_FAIL(handle.set(std::forward<T>(data), ctx, lock_timeout_us))) {
|
||||
MDS_LOG_SET(WARN, "failed to set dummy key unit data");
|
||||
} else if (std::is_same<ObTabletCreateDeleteMdsUserData, typename std::decay<T>::type>::value) {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
@ -235,22 +202,26 @@ int ObITabletMdsInterface::replay(T &&data, mds::MdsCtx &ctx, const share::SCN &
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(data), K(ctx), K(scn)
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
if (scn < get_tablet_meta_().mds_checkpoint_scn_) {
|
||||
MDS_LOG_SET(TRACE, "no need do replay");
|
||||
TabletMdsLockGuard<LockMode::SHARE> guard;
|
||||
if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(ERROR, "pointer on tablet should not be null");
|
||||
} else {
|
||||
mds::MdsTableHandle handle;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else if (CLICK_FAIL(handle.replay(std::forward<T>(data), ctx, scn))) {
|
||||
MDS_LOG_SET(WARN, "failed to replay dummy key unit data");
|
||||
} else if (std::is_same<ObTabletCreateDeleteMdsUserData, typename std::decay<T>::type>::value) {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
get_tablet_pointer_()->get_mds_truncate_lock_guard(guard);
|
||||
if (scn < get_tablet_meta_().mds_checkpoint_scn_) {
|
||||
MDS_LOG_SET(TRACE, "no need do replay");
|
||||
} else {
|
||||
mds::MdsTableHandle handle;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (CLICK_FAIL(handle.replay(std::forward<T>(data), ctx, scn))) {
|
||||
MDS_LOG_SET(WARN, "failed to replay dummy key unit data");
|
||||
} else if (std::is_same<ObTabletCreateDeleteMdsUserData, typename std::decay<T>::type>::value) {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -264,19 +235,23 @@ int ObITabletMdsInterface::set(const Key &key, Value &&data, mds::MdsCtx &ctx, c
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
mds::MdsTableHandle handle;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else if (CLICK_FAIL(handle.set(key, std::forward<Value>(data), ctx, lock_timeout_us))) {
|
||||
MDS_LOG_SET(WARN, "failed to set multi key unit data");
|
||||
TabletMdsLockGuard<LockMode::SHARE> guard;
|
||||
if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(ERROR, "pointer on tablet should not be null");
|
||||
} else {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
MDS_LOG_SET(TRACE, "success to set multi key unit data");
|
||||
get_tablet_pointer_()->get_mds_truncate_lock_guard(guard);
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (CLICK_FAIL(handle.set(key, std::forward<Value>(data), ctx, lock_timeout_us))) {
|
||||
MDS_LOG_SET(WARN, "failed to set multi key unit data");
|
||||
} else {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
MDS_LOG_SET(TRACE, "success to set multi key unit data");
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
@ -291,23 +266,27 @@ int ObITabletMdsInterface::replay(const Key &key,
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(key), K(mds), K(ctx), K(scn)
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
if (scn < get_tablet_meta_().mds_checkpoint_scn_) {
|
||||
MDS_LOG_SET(TRACE, "no need do replay");
|
||||
TabletMdsLockGuard<LockMode::SHARE> guard;
|
||||
if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(ERROR, "pointer on tablet should not be null");
|
||||
} else {
|
||||
mds::MdsTableHandle handle;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else if (CLICK_FAIL(handle.replay(key, std::forward<Value>(mds), ctx, scn))) {
|
||||
MDS_LOG_SET(WARN, "failed to replay multi key unit data");
|
||||
get_tablet_pointer_()->get_mds_truncate_lock_guard(guard);
|
||||
if (scn < get_tablet_meta_().mds_checkpoint_scn_) {
|
||||
MDS_LOG_SET(TRACE, "no need do replay");
|
||||
} else {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
MDS_LOG_SET(TRACE, "success to replay multi key unit data");
|
||||
mds::MdsTableHandle handle;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (CLICK_FAIL(handle.replay(key, std::forward<Value>(mds), ctx, scn))) {
|
||||
MDS_LOG_SET(WARN, "failed to replay multi key unit data");
|
||||
} else {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
MDS_LOG_SET(TRACE, "success to replay multi key unit data");
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -321,22 +300,23 @@ int ObITabletMdsInterface::remove(const Key &key, mds::MdsCtx &ctx, const int64_
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
mds::MdsTableHandle handle;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else if (CLICK_FAIL(handle.remove(key, ctx, lock_timeout_us))) {
|
||||
MDS_LOG_SET(WARN, "failed to remove multi key unit data");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
TabletMdsLockGuard<LockMode::SHARE> guard;
|
||||
if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(ERROR, "pointer on tablet should not be null");
|
||||
} else {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
MDS_LOG_SET(TRACE, "success to remove multi key unit data");
|
||||
get_tablet_pointer_()->get_mds_truncate_lock_guard(guard);
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (CLICK_FAIL(handle.remove(key, ctx, lock_timeout_us))) {
|
||||
MDS_LOG_SET(WARN, "failed to remove multi key unit data");
|
||||
} else {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
MDS_LOG_SET(TRACE, "success to remove multi key unit data");
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
@ -349,19 +329,23 @@ int ObITabletMdsInterface::replay_remove(const Key &key, mds::MdsCtx &ctx, const
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
mds::MdsTableHandle handle;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else if (CLICK() && OB_SUCCESS != (ret = handle.replay_remove<Key, Value>(key, ctx, scn))) {
|
||||
MDS_LOG_SET(WARN, "failed to replay remove multi key unit data");
|
||||
TabletMdsLockGuard<LockMode::SHARE> guard;
|
||||
if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(ERROR, "pointer on tablet should not be null");
|
||||
} else {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
MDS_LOG_SET(TRACE, "success to remove multi key unit data");
|
||||
get_tablet_pointer_()->get_mds_truncate_lock_guard(guard);
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, true))) {
|
||||
MDS_LOG_SET(WARN, "failed to get_mds_table");
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_SET(WARN, "mds cannot be NULL");
|
||||
} else if (CLICK() && OB_SUCCESS != (ret = handle.replay_remove<Key, Value>(key, ctx, scn))) {
|
||||
MDS_LOG_SET(WARN, "failed to replay remove multi key unit data");
|
||||
} else {
|
||||
get_tablet_pointer_()->set_tablet_status_written();
|
||||
MDS_LOG_SET(TRACE, "success to remove multi key unit data");
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
@ -374,45 +358,49 @@ int ObITabletMdsInterface::is_locked_by_others(bool &is_locked, const mds::MdsWr
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_online = false;
|
||||
do {
|
||||
mds::MdsTableHandle handle;
|
||||
ObLSSwitchChecker ls_switch_checker;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, false))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get_mds_table");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get_mds_table");
|
||||
ret = OB_SUCCESS;
|
||||
is_locked = false;
|
||||
}
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "mds cannot be NULL");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else if (MDS_FAIL(ls_switch_checker.check_ls_switch_state(get_tablet_pointer_()->get_ls(), is_online))) {
|
||||
MDS_LOG_GET(WARN, "check ls online state failed", K(ret), KPC(this));
|
||||
} else if (CLICK_FAIL(handle.is_locked_by_others<T>(is_locked, self))) {
|
||||
if (OB_SNAPSHOT_DISCARDED != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to check lock unit data");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to check lock unit data");
|
||||
ret = OB_SUCCESS;
|
||||
is_locked = false;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_online && MDS_FAIL(ls_switch_checker.double_check_epoch(is_online))) {
|
||||
if (!is_online) {
|
||||
ret = OB_LS_OFFLINE;
|
||||
TabletMdsLockGuard<LockMode::SHARE> guard;
|
||||
if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(ERROR, "pointer on tablet should not be null");
|
||||
} else {
|
||||
get_tablet_pointer_()->get_mds_truncate_lock_guard(guard);
|
||||
do {
|
||||
mds::MdsTableHandle handle;
|
||||
ObLSSwitchChecker ls_switch_checker;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, false))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get_mds_table");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get_mds_table");
|
||||
ret = OB_SUCCESS;
|
||||
is_locked = false;
|
||||
}
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "mds cannot be NULL");
|
||||
} else if (MDS_FAIL(ls_switch_checker.check_ls_switch_state(get_tablet_pointer_()->get_ls(), is_online))) {
|
||||
MDS_LOG_GET(WARN, "check ls online state failed", K(ret), KPC(this));
|
||||
} else if (CLICK_FAIL(handle.is_locked_by_others<T>(is_locked, self))) {
|
||||
if (OB_SNAPSHOT_DISCARDED != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to check lock unit data");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to check lock unit data");
|
||||
ret = OB_SUCCESS;
|
||||
is_locked = false;
|
||||
}
|
||||
MDS_LOG_GET(WARN, "failed to double check ls online");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "success to get is locked by others state");
|
||||
}
|
||||
}
|
||||
} while (ret == OB_VERSION_NOT_MATCH && is_online);
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_online && MDS_FAIL(ls_switch_checker.double_check_epoch(is_online))) {
|
||||
if (!is_online) {
|
||||
ret = OB_LS_OFFLINE;
|
||||
}
|
||||
MDS_LOG_GET(WARN, "failed to double check ls online");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "success to get is locked by others state");
|
||||
}
|
||||
}
|
||||
} while (ret == OB_VERSION_NOT_MATCH && is_online);
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
@ -424,61 +412,65 @@ int ObITabletMdsInterface::get_latest(OP &&read_op, bool &is_committed) const
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_online = false;
|
||||
do {
|
||||
mds::MdsTableHandle handle;
|
||||
ObLSSwitchChecker ls_switch_checker;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, false))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get_mds_table");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get_mds_table");
|
||||
TabletMdsLockGuard<LockMode::SHARE> guard;
|
||||
if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(ERROR, "pointer on tablet should not be null");
|
||||
} else {
|
||||
get_tablet_pointer_()->get_mds_truncate_lock_guard(guard);
|
||||
do {
|
||||
mds::MdsTableHandle handle;
|
||||
ObLSSwitchChecker ls_switch_checker;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, false))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get_mds_table");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get_mds_table");
|
||||
}
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "mds cannot be NULL");
|
||||
} else if (MDS_FAIL(ls_switch_checker.check_ls_switch_state(get_tablet_pointer_()->get_ls(), is_online))) {
|
||||
MDS_LOG_GET(WARN, "check ls online state failed", K(ret), KPC(this));
|
||||
} else if (CLICK_FAIL(handle.get_latest<T>(read_op, is_committed))) {
|
||||
if (OB_SNAPSHOT_DISCARDED != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get mds data");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get mds data");
|
||||
}
|
||||
}
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "mds cannot be NULL");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else if (MDS_FAIL(ls_switch_checker.check_ls_switch_state(get_tablet_pointer_()->get_ls(), is_online))) {
|
||||
MDS_LOG_GET(WARN, "check ls online state failed", K(ret), KPC(this));
|
||||
} else if (CLICK_FAIL(handle.get_latest<T>(read_op, is_committed))) {
|
||||
if (OB_SNAPSHOT_DISCARDED != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get mds data");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get mds data");
|
||||
}
|
||||
}
|
||||
if (CLICK_FAIL(ret)) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret || OB_SNAPSHOT_DISCARDED == ret) {
|
||||
auto func = [&read_op, &is_committed](const T& data) -> int {
|
||||
is_committed = true;// FIXME: here need more judge after support dump uncommitted node
|
||||
return read_op(data);
|
||||
};
|
||||
if (CLICK_FAIL((get_mds_data_from_tablet<mds::DummyKey, T>(
|
||||
mds::DummyKey(),
|
||||
share::SCN::max_scn()/*snapshot*/,
|
||||
ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US/*timeout_us*/,
|
||||
func)))) {
|
||||
if (OB_SNAPSHOT_DISCARDED == ret) {
|
||||
ret = OB_EMPTY_RESULT;
|
||||
MDS_LOG_GET(DEBUG, "read nothing from mds sstable");
|
||||
} else {
|
||||
MDS_LOG_GET(WARN, "failed to get latest data from tablet");
|
||||
if (CLICK_FAIL(ret)) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret || OB_SNAPSHOT_DISCARDED == ret) {
|
||||
auto func = [&read_op, &is_committed](const T& data) -> int {
|
||||
is_committed = true;// FIXME: here need more judge after support dump uncommitted node
|
||||
return read_op(data);
|
||||
};
|
||||
if (CLICK_FAIL((get_mds_data_from_tablet<mds::DummyKey, T>(
|
||||
mds::DummyKey(),
|
||||
share::SCN::max_scn()/*snapshot*/,
|
||||
ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US/*timeout_us*/,
|
||||
func)))) {
|
||||
if (OB_SNAPSHOT_DISCARDED == ret) {
|
||||
ret = OB_EMPTY_RESULT;
|
||||
MDS_LOG_GET(DEBUG, "read nothing from mds sstable");
|
||||
} else {
|
||||
MDS_LOG_GET(WARN, "failed to get latest data from tablet");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_online && MDS_FAIL(ls_switch_checker.double_check_epoch(is_online))) {
|
||||
if (!is_online) {
|
||||
ret = OB_LS_OFFLINE;
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_online && MDS_FAIL(ls_switch_checker.double_check_epoch(is_online))) {
|
||||
if (!is_online) {
|
||||
ret = OB_LS_OFFLINE;
|
||||
}
|
||||
MDS_LOG_GET(WARN, "failed to double check ls online");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "success to get_latest");
|
||||
}
|
||||
MDS_LOG_GET(WARN, "failed to double check ls online");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "success to get_latest");
|
||||
}
|
||||
}
|
||||
} while (ret == OB_VERSION_NOT_MATCH && is_online);
|
||||
} while (ret == OB_VERSION_NOT_MATCH && is_online);
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
@ -501,57 +493,61 @@ int ObITabletMdsInterface::get_snapshot(const Key &key,
|
||||
MDS_TG(10_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_online = false;
|
||||
do {
|
||||
mds::MdsTableHandle handle;
|
||||
ObLSSwitchChecker ls_switch_checker;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, false))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get_mds_table");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get_mds_table");
|
||||
TabletMdsLockGuard<LockMode::SHARE> guard;
|
||||
if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(ERROR, "pointer on tablet should not be null");
|
||||
} else {
|
||||
get_tablet_pointer_()->get_mds_truncate_lock_guard(guard);
|
||||
do {
|
||||
mds::MdsTableHandle handle;
|
||||
ObLSSwitchChecker ls_switch_checker;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, false))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get_mds_table");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get_mds_table");
|
||||
}
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "mds cannot be NULL");
|
||||
} else if (MDS_FAIL(ls_switch_checker.check_ls_switch_state(get_tablet_pointer_()->get_ls(), is_online))) {
|
||||
MDS_LOG_GET(WARN, "check ls online state failed", K(ret), KPC(this));
|
||||
} else if (CLICK() && OB_SUCCESS != (ret = handle.get_snapshot<Key, Value>(key, read_op, snapshot, timeout_us))) {
|
||||
if (OB_SNAPSHOT_DISCARDED != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get mds data");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get mds data");
|
||||
}
|
||||
}
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "mds cannot be NULL");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else if (MDS_FAIL(ls_switch_checker.check_ls_switch_state(get_tablet_pointer_()->get_ls(), is_online))) {
|
||||
MDS_LOG_GET(WARN, "check ls online state failed", K(ret), KPC(this));
|
||||
} else if (CLICK() && OB_SUCCESS != (ret = handle.get_snapshot<Key, Value>(key, read_op, snapshot, timeout_us))) {
|
||||
if (OB_SNAPSHOT_DISCARDED != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get mds data");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get mds data");
|
||||
}
|
||||
}
|
||||
if (CLICK_FAIL(ret)) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret || OB_SNAPSHOT_DISCARDED == ret) {
|
||||
auto func = [&read_op](const Value& data) -> int {
|
||||
return read_op(data);
|
||||
};
|
||||
if (CLICK_FAIL((get_mds_data_from_tablet<Key, Value>(key, snapshot, timeout_us, func)))) {
|
||||
if (OB_SNAPSHOT_DISCARDED == ret) {
|
||||
// read nothing from tablet, maybe this is not an error
|
||||
ret = OB_EMPTY_RESULT;
|
||||
MDS_LOG_GET(DEBUG, "read nothing from mds sstable");
|
||||
} else {
|
||||
MDS_LOG_GET(WARN, "failed to get snapshot data from tablet");
|
||||
if (CLICK_FAIL(ret)) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret || OB_SNAPSHOT_DISCARDED == ret) {
|
||||
auto func = [&read_op](const Value& data) -> int {
|
||||
return read_op(data);
|
||||
};
|
||||
if (CLICK_FAIL((get_mds_data_from_tablet<Key, Value>(key, snapshot, timeout_us, func)))) {
|
||||
if (OB_SNAPSHOT_DISCARDED == ret) {
|
||||
// read nothing from tablet, maybe this is not an error
|
||||
ret = OB_EMPTY_RESULT;
|
||||
MDS_LOG_GET(DEBUG, "read nothing from mds sstable");
|
||||
} else {
|
||||
MDS_LOG_GET(WARN, "failed to get snapshot data from tablet");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_online && MDS_FAIL(ls_switch_checker.double_check_epoch(is_online))) {
|
||||
if (!is_online) {
|
||||
ret = OB_LS_OFFLINE;
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_online && MDS_FAIL(ls_switch_checker.double_check_epoch(is_online))) {
|
||||
if (!is_online) {
|
||||
ret = OB_LS_OFFLINE;
|
||||
}
|
||||
MDS_LOG_GET(WARN, "failed to double check ls online");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "success to get snapshot");
|
||||
}
|
||||
MDS_LOG_GET(WARN, "failed to double check ls online");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "success to get snapshot");
|
||||
}
|
||||
}
|
||||
} while (ret == OB_VERSION_NOT_MATCH && is_online);
|
||||
} while (ret == OB_VERSION_NOT_MATCH && is_online);
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
@ -582,7 +578,7 @@ int ObITabletMdsInterface::fill_virtual_info_from_mds_sstable(ObIArray<mds::MdsN
|
||||
common::ObString dummy_key;
|
||||
mds::MdsDumpKV kv;
|
||||
mds::MdsDumpKV *dump_kv = &kv;
|
||||
constexpr uint8_t mds_unit_id = mds::TupleTypeIdx<mds::NormalMdsTable, mds::MdsUnit<mds::DummyKey, ObTabletBindingMdsUserData>>::value;
|
||||
constexpr uint8_t mds_unit_id = mds::TupleTypeIdx<mds::NormalMdsTable, mds::MdsUnit<mds::DummyKey, T>>::value;
|
||||
|
||||
|
||||
if (CLICK_FAIL(mds_node_info_array.push_back(mds::MdsNodeInfoForVirtualTable()))) {
|
||||
@ -593,7 +589,7 @@ int ObITabletMdsInterface::fill_virtual_info_from_mds_sstable(ObIArray<mds::MdsN
|
||||
ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US/*timeout_us*/, kv))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
MDS_LOG(DEBUG, "read nothing from mds sstable", K(ret));
|
||||
MDS_LOG(TRACE, "read nothing from mds sstable", K(ret));
|
||||
} else {
|
||||
MDS_LOG(WARN, "fail to read raw data", K(ret));
|
||||
}
|
||||
@ -608,6 +604,7 @@ int ObITabletMdsInterface::fill_virtual_info_from_mds_sstable(ObIArray<mds::MdsN
|
||||
cur_virtual_info->tablet_id_ = get_tablet_meta_().tablet_id_;
|
||||
cur_virtual_info->position_ = mds::NodePosition::DISK;
|
||||
cur_virtual_info->unit_id_ = dump_kv->v_.mds_unit_id_;
|
||||
MDS_LOG_GET(TRACE, "success to fill virtual info");
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
mds_node_info_array.pop_back();
|
||||
@ -647,9 +644,6 @@ int ObITabletMdsInterface::fill_virtual_info_by_obj_(const T &obj,
|
||||
}
|
||||
if (CLICK_FAIL(ret)) {
|
||||
mds_node_info_array.pop_back();
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -664,24 +658,18 @@ inline int ObITabletMdsInterface::fill_virtual_info(ObIArray<mds::MdsNodeInfoFor
|
||||
|
||||
ObArenaAllocator allocator(ObMemAttr(MTL_ID(), "mds_reader", ObCtxIds::DEFAULT_CTX_ID));
|
||||
share::ObTabletAutoincSeq seq_on_tablet;
|
||||
if (CLICK_FAIL((get_mds_data_from_tablet<mds::DummyKey, share::ObTabletAutoincSeq>(
|
||||
mds::DummyKey(),
|
||||
share::SCN::max_scn()/*snapshot*/,
|
||||
ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US/*timeout_us*/,
|
||||
[&allocator, &seq_on_tablet](const share::ObTabletAutoincSeq &seq) {
|
||||
return seq_on_tablet.assign(allocator, seq);
|
||||
})))) {
|
||||
if (OB_SNAPSHOT_DISCARDED == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else if (CLICK_FAIL(fill_virtual_info_by_obj_(seq_on_tablet, mds::NodePosition::DISK, mds_node_info_array))) {
|
||||
MDS_LOG_GET(WARN, "fail to fill seq from disk");
|
||||
} else if (CLICK_FAIL(fill_virtual_info_by_obj_(get_tablet_meta_().last_persisted_committed_tablet_status_, mds::NodePosition::TABLET, mds_node_info_array))) {
|
||||
if (CLICK_FAIL(fill_virtual_info_by_obj_(get_tablet_meta_().last_persisted_committed_tablet_status_,
|
||||
mds::NodePosition::TABLET,
|
||||
mds_node_info_array))) {
|
||||
MDS_LOG_GET(WARN, "fail to fill tablet_status_ from cache");
|
||||
} else if (CLICK_FAIL(fill_virtual_info_from_mds_sstable<ObTabletCreateDeleteMdsUserData>(mds_node_info_array))) {
|
||||
MDS_LOG_GET(WARN, "fail to fill tablet_status_");
|
||||
} else if (CLICK_FAIL(fill_virtual_info_from_mds_sstable<share::ObTabletAutoincSeq>(mds_node_info_array))) {
|
||||
MDS_LOG_GET(WARN, "fail to fill seq from disk");
|
||||
} else if (CLICK_FAIL(fill_virtual_info_from_mds_sstable<ObTabletBindingMdsUserData>(mds_node_info_array))) {
|
||||
MDS_LOG_GET(WARN, "fail to fill aux_tablet_info_");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "sucess to fill virtual info");
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -745,69 +733,73 @@ inline int ObITabletMdsInterface::check_transfer_in_redo_written(bool &written)
|
||||
bool is_online = false;
|
||||
ObTabletCreateDeleteMdsUserData tablet_status;
|
||||
share::SCN redo_scn;
|
||||
do {
|
||||
mds::MdsTableHandle handle;
|
||||
ObLSSwitchChecker ls_switch_checker;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, false))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get_mds_table");
|
||||
TabletMdsLockGuard<LockMode::SHARE> guard;
|
||||
if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
MDS_LOG(ERROR, "pointer on tablet should not be null");
|
||||
} else {
|
||||
get_tablet_pointer_()->get_mds_truncate_lock_guard(guard);
|
||||
do {
|
||||
mds::MdsTableHandle handle;
|
||||
ObLSSwitchChecker ls_switch_checker;
|
||||
if (CLICK_FAIL(get_mds_table_handle_(handle, false))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get_mds_table");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get_mds_table");
|
||||
}
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "mds cannot be NULL");
|
||||
} else if (MDS_FAIL(ls_switch_checker.check_ls_switch_state(get_tablet_pointer_()->get_ls(), is_online))) {
|
||||
MDS_LOG_GET(WARN, "check ls online state failed", K(ret), KPC(this));
|
||||
} else if (CLICK_FAIL(handle.get_tablet_status_node(GetTabletStatusNodeFromMdsTableOp(tablet_status, redo_scn)))) {
|
||||
if (OB_SNAPSHOT_DISCARDED != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get mds data");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get mds data");
|
||||
}
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get_mds_table");
|
||||
}
|
||||
} else if (!handle.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "mds cannot be NULL");
|
||||
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_GET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||
} else if (MDS_FAIL(ls_switch_checker.check_ls_switch_state(get_tablet_pointer_()->get_ls(), is_online))) {
|
||||
MDS_LOG_GET(WARN, "check ls online state failed", K(ret), KPC(this));
|
||||
} else if (CLICK_FAIL(handle.get_tablet_status_node(GetTabletStatusNodeFromMdsTableOp(tablet_status, redo_scn)))) {
|
||||
if (OB_SNAPSHOT_DISCARDED != ret) {
|
||||
MDS_LOG_GET(WARN, "failed to get mds data");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "failed to get mds data");
|
||||
}
|
||||
} else {
|
||||
if (tablet_status.get_tablet_status() != ObTabletStatus::TRANSFER_IN) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
} else if (!redo_scn.is_valid() || redo_scn.is_max()) {
|
||||
written = false;
|
||||
MDS_LOG_GET(TRACE, "get transfer in status on mds_table, but redo scn is not valid");
|
||||
} else {
|
||||
written = true;
|
||||
MDS_LOG_GET(TRACE, "get transfer in status on mds_table, and redo scn is valid");
|
||||
}
|
||||
}
|
||||
if (CLICK_FAIL(ret)) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret || OB_SNAPSHOT_DISCARDED == ret) {
|
||||
if (CLICK_FAIL((get_mds_data_from_tablet<mds::DummyKey, ObTabletCreateDeleteMdsUserData>(
|
||||
mds::DummyKey(),
|
||||
share::SCN::max_scn()/*snapshot*/,
|
||||
ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US/*timeout_us*/,
|
||||
ReadTabletStatusOp(tablet_status))))) {
|
||||
MDS_LOG_GET(WARN, "failed to get latest data from tablet");
|
||||
} else if (tablet_status.get_tablet_status() != ObTabletStatus::TRANSFER_IN) {
|
||||
if (tablet_status.get_tablet_status() != ObTabletStatus::TRANSFER_IN) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
} else if (!redo_scn.is_valid() || redo_scn.is_max()) {
|
||||
written = false;
|
||||
MDS_LOG_GET(TRACE, "get transfer in status on mds_table, but redo scn is not valid");
|
||||
} else {
|
||||
written = true;
|
||||
MDS_LOG_GET(TRACE, "get transfer in status on mds_table, and redo scn is valid");
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_online && MDS_FAIL(ls_switch_checker.double_check_epoch(is_online))) {
|
||||
if (!is_online) {
|
||||
ret = OB_LS_OFFLINE;
|
||||
if (CLICK_FAIL(ret)) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret || OB_SNAPSHOT_DISCARDED == ret) {
|
||||
if (CLICK_FAIL((get_mds_data_from_tablet<mds::DummyKey, ObTabletCreateDeleteMdsUserData>(
|
||||
mds::DummyKey(),
|
||||
share::SCN::max_scn()/*snapshot*/,
|
||||
ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US/*timeout_us*/,
|
||||
ReadTabletStatusOp(tablet_status))))) {
|
||||
MDS_LOG_GET(WARN, "failed to get latest data from tablet");
|
||||
} else if (tablet_status.get_tablet_status() != ObTabletStatus::TRANSFER_IN) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
} else {
|
||||
written = true;
|
||||
}
|
||||
}
|
||||
MDS_LOG_GET(WARN, "failed to double check ls online");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "success to check_transfer_in_redo_written");
|
||||
}
|
||||
}
|
||||
} while (ret == OB_VERSION_NOT_MATCH && is_online);
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_online && MDS_FAIL(ls_switch_checker.double_check_epoch(is_online))) {
|
||||
if (!is_online) {
|
||||
ret = OB_LS_OFFLINE;
|
||||
}
|
||||
MDS_LOG_GET(WARN, "failed to double check ls online");
|
||||
} else {
|
||||
MDS_LOG_GET(TRACE, "success to check_transfer_in_redo_written");
|
||||
}
|
||||
}
|
||||
} while (ret == OB_VERSION_NOT_MATCH && is_online);
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
@ -75,6 +75,7 @@ storage_unittest(test_mds_new_ctx_deserialized multi_data_source/test_mds_new_ct
|
||||
storage_unittest(test_mds_table multi_data_source/test_mds_table.cpp)
|
||||
storage_unittest(test_mds_table_flush multi_data_source/test_mds_table_flush.cpp)
|
||||
storage_unittest(test_mds_table_flusher multi_data_source/test_mds_table_flusher.cpp)
|
||||
storage_unittest(test_mds_table_forcelly_remove multi_data_source/test_mds_table_forcelly_remove.cpp)
|
||||
storage_unittest(test_mds_dump_kv multi_data_source/test_mds_dump_kv.cpp)
|
||||
#storage_unittest(test_multiple_merge)
|
||||
#storage_unittest(test_memtable_multi_version_row_iterator memtable/test_memtable_multi_version_row_iterator.cpp)
|
||||
|
@ -167,7 +167,7 @@ void TestMdsTable::compare_binary_key() {
|
||||
}
|
||||
|
||||
void TestMdsTable::set() {
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), (ObTabletPointer*)0x111));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), share::SCN::min_scn(), (ObTabletPointer*)0x111));
|
||||
MDS_LOG(INFO, "test sizeof", K(sizeof(MdsTableImpl<UnitTestMdsTable>)), K(sizeof(B)), K(mds_table_.p_mds_table_base_.ctrl_ptr_->ref_));
|
||||
ExampleUserData1 data1(1);
|
||||
ExampleUserData2 data2;
|
||||
@ -534,6 +534,7 @@ TEST_F(TestMdsTable, basic_trans_example) {
|
||||
ASSERT_EQ(OB_SUCCESS, mth.init<UnitTestMdsTable>(mds::DefaultAllocator::get_instance(),
|
||||
ObTabletID(1),
|
||||
share::ObLSID(1),
|
||||
share::SCN::min_scn(),
|
||||
nullptr));
|
||||
MdsTableHandle mth2 = mth;// 两个引用计数
|
||||
MdsCtx ctx(mds::MdsWriter(ObTransID(123)));// 创建一个写入句柄,接入多源事务,ctx由事务层创建
|
||||
@ -557,6 +558,7 @@ TEST_F(TestMdsTable, basic_non_trans_example) {
|
||||
ASSERT_EQ(OB_SUCCESS, mth.init<UnitTestMdsTable>(mds::DefaultAllocator::get_instance(),
|
||||
ObTabletID(1),
|
||||
share::ObLSID(1),
|
||||
share::SCN::min_scn(),
|
||||
nullptr));
|
||||
MdsTableHandle mth2 = mth;// 两个引用计数
|
||||
MdsCtx ctx(MdsWriter(WriterType::AUTO_INC_SEQ, 1));// 创建一个写入句柄,不接入事务,自己写日志
|
||||
@ -595,7 +597,7 @@ TEST_F(TestMdsTable, test_recycle) {
|
||||
|
||||
TEST_F(TestMdsTable, test_recalculate_flush_scn_op) {
|
||||
MdsTableHandle mds_table;
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), (ObTabletPointer*)0x111));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), share::SCN::min_scn(), (ObTabletPointer*)0x111));
|
||||
MdsCtx ctx1(mds::MdsWriter(ObTransID(1)));
|
||||
MdsCtx ctx2(mds::MdsWriter(ObTransID(2)));
|
||||
MdsCtx ctx3(mds::MdsWriter(ObTransID(3)));
|
||||
|
@ -78,12 +78,6 @@ share::SCN MOCK_MAX_CONSEQUENT_CALLBACKED_SCN;
|
||||
namespace mds
|
||||
{
|
||||
|
||||
int MdsTableBase::get_ls_max_consequent_callbacked_scn_(share::SCN &max_consequent_callbacked_scn) const
|
||||
{
|
||||
max_consequent_callbacked_scn = MOCK_MAX_CONSEQUENT_CALLBACKED_SCN;
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
|
||||
int MdsTableBase::merge(const int64_t construct_sequence, const share::SCN &flushing_scn)
|
||||
{
|
||||
return OB_SUCCESS;
|
||||
@ -139,7 +133,7 @@ int construct_tested_mds_table(MdsTableHandle &handle) {
|
||||
for (int i = 0; i < 7; ++i) {
|
||||
v_ctx.push_back(new MdsCtx(MdsWriter(transaction::ObTransID(i))));
|
||||
}
|
||||
if (OB_FAIL(handle.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), (ObTabletPointer*)0x111))) {
|
||||
if (OB_FAIL(handle.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), share::SCN::min_scn(), (ObTabletPointer*)0x111))) {
|
||||
} else if (OB_FAIL(handle.set<ExampleUserData1>(1, *v_ctx[0]))) {
|
||||
} else if (FALSE_IT(v_ctx[0]->on_redo(mock_scn(50)))) {
|
||||
} else if (FALSE_IT(v_ctx[0]->on_commit(mock_scn(100), mock_scn(100)))) {
|
||||
@ -244,6 +238,48 @@ TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
ASSERT_EQ(mock_scn(500), rec_scn);// 没变化
|
||||
}
|
||||
|
||||
TEST_F(TestMdsTableFlush, flush_scn_decline1) {
|
||||
MdsTableHandle handle;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), share::SCN::min_scn(), (ObTabletPointer*)0x111));
|
||||
MdsCtx ctx1(MdsWriter(transaction::ObTransID(1)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx1);
|
||||
ctx1.on_redo(mock_scn(10));
|
||||
ctx1.on_commit(mock_scn(20), mock_scn(20));
|
||||
MdsCtx ctx2(MdsWriter(transaction::ObTransID(2)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx2);
|
||||
ctx2.on_redo(mock_scn(19));
|
||||
ctx2.on_commit(mock_scn(30), mock_scn(30));
|
||||
MdsCtx ctx3(MdsWriter(transaction::ObTransID(3)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx3);
|
||||
ctx3.on_redo(mock_scn(29));
|
||||
ctx3.on_commit(mock_scn(40), mock_scn(40));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(35)));// finally will be 9
|
||||
share::SCN rec_scn;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
ASSERT_EQ(mock_scn(10), rec_scn);
|
||||
}
|
||||
|
||||
TEST_F(TestMdsTableFlush, flush_scn_decline2) {
|
||||
MdsTableHandle handle;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), share::SCN::min_scn(), (ObTabletPointer*)0x111));
|
||||
MdsCtx ctx1(MdsWriter(transaction::ObTransID(1)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx1);
|
||||
ctx1.on_redo(mock_scn(10));
|
||||
ctx1.on_commit(mock_scn(20), mock_scn(20));
|
||||
MdsCtx ctx2(MdsWriter(transaction::ObTransID(2)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(2), ExampleUserData1(2), ctx2);// another row
|
||||
ctx2.on_redo(mock_scn(19));
|
||||
ctx2.on_commit(mock_scn(30), mock_scn(30));
|
||||
MdsCtx ctx3(MdsWriter(transaction::ObTransID(3)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx3);
|
||||
ctx3.on_redo(mock_scn(29));
|
||||
ctx3.on_commit(mock_scn(40), mock_scn(40));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(35)));// finally will be 9
|
||||
share::SCN rec_scn;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
ASSERT_EQ(mock_scn(10), rec_scn);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -169,6 +169,7 @@ TEST_F(TestMdsTableFlush, flusher_for_all_order_with_enough_memory) {
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(),
|
||||
v_key[i].tablet_id_,
|
||||
share::ObLSID(1),
|
||||
share::SCN::min_scn(),
|
||||
(ObTabletPointer*)0x111,
|
||||
&mgr));
|
||||
MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_;
|
||||
@ -203,6 +204,7 @@ TEST_F(TestMdsTableFlush, flusher_for_all_order_with_limitted_memory_reserve_fai
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(),
|
||||
v_key[i].tablet_id_,
|
||||
share::ObLSID(1),
|
||||
share::SCN::min_scn(),
|
||||
(ObTabletPointer*)0x111,
|
||||
&mgr));
|
||||
MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_;
|
||||
@ -248,6 +250,7 @@ TEST_F(TestMdsTableFlush, flusher_for_one) {
|
||||
// ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(),
|
||||
// v_key[i].tablet_id_,
|
||||
// share::ObLSID(1),
|
||||
// share::SCN::min_scn(),
|
||||
// (ObTabletPointer*)0x111,
|
||||
// &mgr));
|
||||
// MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_;
|
||||
@ -275,6 +278,7 @@ TEST_F(TestMdsTableFlush, flusher_for_one) {
|
||||
// ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(),
|
||||
// v_key[i].tablet_id_,
|
||||
// share::ObLSID(1),
|
||||
// share::SCN::min_scn(),
|
||||
// (ObTabletPointer*)0x111,
|
||||
// &mgr));
|
||||
// MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_;
|
||||
|
@ -0,0 +1,192 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#define UNITTEST_DEBUG
|
||||
static bool MDS_FLUSHER_ALLOW_ALLOC = true;
|
||||
#include <gtest/gtest.h>
|
||||
#define private public
|
||||
#define protected public
|
||||
#include "lib/utility/utility.h"
|
||||
#include "storage/multi_data_source/compile_utility/mds_dummy_key.h"
|
||||
#include "share/ob_ls_id.h"
|
||||
#include "storage/multi_data_source/mds_writer.h"
|
||||
#include <thread>
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <chrono>
|
||||
#include <exception>
|
||||
#include "lib/ob_errno.h"
|
||||
#include "share/ob_errno.h"
|
||||
#include "storage/multi_data_source/adapter_define/mds_dump_node.h"
|
||||
#include "lib/allocator/ob_malloc.h"
|
||||
#include "storage/multi_data_source/mds_node.h"
|
||||
#include "common/ob_clock_generator.h"
|
||||
#include "storage/multi_data_source/mds_row.h"
|
||||
#include "storage/multi_data_source/mds_unit.h"
|
||||
#include "storage/multi_data_source/mds_table_handle.h"
|
||||
#include "storage/multi_data_source/mds_table_handler.h"
|
||||
#include "storage/tx/ob_trans_define.h"
|
||||
#include <algorithm>
|
||||
#include <numeric>
|
||||
#include "storage/multi_data_source/runtime_utility/mds_lock.h"
|
||||
#include "storage/tablet/ob_tablet_meta.h"
|
||||
#include "storage/multi_data_source/mds_table_mgr.h"
|
||||
#include "storage/ls/ob_ls.h"
|
||||
#include "storage/multi_data_source/mds_table_handle.h"
|
||||
#include "storage/multi_data_source/mds_table_order_flusher.h"
|
||||
#include "storage/tablet/ob_mds_schema_helper.h"
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
namespace mds {
|
||||
void *DefaultAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void DefaultAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size) {
|
||||
void *ptr = std::malloc(size);// ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
std::free(ptr);// ob_free(ptr);
|
||||
}
|
||||
}}}
|
||||
namespace oceanbase {
|
||||
namespace storage
|
||||
{
|
||||
|
||||
share::SCN MOCK_MAX_CONSEQUENT_CALLBACKED_SCN;
|
||||
|
||||
namespace mds
|
||||
{
|
||||
|
||||
int MdsTableBase::merge(const int64_t construct_sequence, const share::SCN &flushing_scn)
|
||||
{
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
namespace unittest {
|
||||
|
||||
using namespace common;
|
||||
using namespace std;
|
||||
using namespace storage;
|
||||
using namespace mds;
|
||||
using namespace transaction;
|
||||
|
||||
class TestMdsTableForcellyRemove: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsTableForcellyRemove() { ObMdsSchemaHelper::get_instance().init(); }
|
||||
virtual ~TestMdsTableForcellyRemove() {}
|
||||
virtual void SetUp() {
|
||||
}
|
||||
virtual void TearDown() {
|
||||
}
|
||||
private:
|
||||
// disallow copy
|
||||
DISALLOW_COPY_AND_ASSIGN(TestMdsTableForcellyRemove);
|
||||
};
|
||||
|
||||
TEST_F(TestMdsTableForcellyRemove, reset) {
|
||||
MdsTableHandle handle;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), share::SCN::min_scn(), (ObTabletPointer*)0x111));
|
||||
MdsCtx ctx1(MdsWriter(transaction::ObTransID(1)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx1);
|
||||
ctx1.on_redo(mock_scn(10));
|
||||
ctx1.on_commit(mock_scn(20), mock_scn(20));
|
||||
MdsCtx ctx2(MdsWriter(transaction::ObTransID(2)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx2);
|
||||
ctx2.on_redo(mock_scn(21));
|
||||
ctx2.on_commit(mock_scn(25), mock_scn(25));
|
||||
MdsCtx ctx3(MdsWriter(transaction::ObTransID(3)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx3);
|
||||
ctx3.on_redo(mock_scn(29));
|
||||
ctx3.on_commit(mock_scn(40), mock_scn(40));
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, handle.forcely_remove_nodes("test", share::SCN::max_scn()));
|
||||
int64_t node_cnt = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_node_cnt(node_cnt));
|
||||
ASSERT_EQ(0, node_cnt);
|
||||
share::SCN rec_scn_after_remove;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn_after_remove));
|
||||
ASSERT_EQ(share::SCN::max_scn(), rec_scn_after_remove);
|
||||
|
||||
MdsCtx ctx4(MdsWriter(transaction::ObTransID(1)));
|
||||
int ret = handle.replay<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx4, mock_scn(10));
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_node_cnt(node_cnt));
|
||||
ASSERT_EQ(1, node_cnt);// success
|
||||
}
|
||||
|
||||
TEST_F(TestMdsTableForcellyRemove, remove) {
|
||||
MdsTableHandle handle;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.init<UnitTestMdsTable>(MdsAllocator::get_instance(), ObTabletID(1), share::ObLSID(1), share::SCN::min_scn(), (ObTabletPointer*)0x111));
|
||||
MdsCtx ctx1(MdsWriter(transaction::ObTransID(1)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx1);
|
||||
ctx1.on_redo(mock_scn(10));
|
||||
ctx1.on_commit(mock_scn(20), mock_scn(20));
|
||||
MdsCtx ctx2(MdsWriter(transaction::ObTransID(2)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx2);
|
||||
ctx2.on_redo(mock_scn(21));
|
||||
ctx2.on_commit(mock_scn(25), mock_scn(25));
|
||||
MdsCtx ctx3(MdsWriter(transaction::ObTransID(3)));
|
||||
handle.set<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx3);
|
||||
ctx3.on_redo(mock_scn(29));
|
||||
ctx3.on_commit(mock_scn(40), mock_scn(40));
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, handle.forcely_remove_nodes("test", mock_scn(27)));
|
||||
int64_t node_cnt = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_node_cnt(node_cnt));
|
||||
ASSERT_EQ(1, node_cnt);
|
||||
share::SCN rec_scn_after_remove;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn_after_remove));
|
||||
ASSERT_EQ(mock_scn(29), rec_scn_after_remove);
|
||||
|
||||
MdsCtx ctx4(MdsWriter(transaction::ObTransID(1)));
|
||||
int ret = handle.replay<ExampleUserKey, ExampleUserData1>(ExampleUserKey(1), ExampleUserData1(1), ctx4, mock_scn(10));
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_node_cnt(node_cnt));
|
||||
ASSERT_EQ(1, node_cnt);// failed
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
system("rm -rf test_mds_forcelly_remove.log");
|
||||
oceanbase::common::ObLogger &logger = oceanbase::common::ObLogger::get_logger();
|
||||
logger.set_file_name("test_mds_forcelly_remove.log", false);
|
||||
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
int ret = RUN_ALL_TESTS();
|
||||
int64_t alloc_times = oceanbase::storage::mds::MdsAllocator::get_alloc_times();
|
||||
int64_t free_times = oceanbase::storage::mds::MdsAllocator::get_free_times();
|
||||
if (alloc_times != free_times) {
|
||||
MDS_LOG(ERROR, "memory may leak", K(free_times), K(alloc_times));
|
||||
ret = -1;
|
||||
} else {
|
||||
MDS_LOG(INFO, "all memory released", K(free_times), K(alloc_times));
|
||||
}
|
||||
return ret;
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user