diff --git a/deps/oblib/src/common/meta_programming/ob_meta_copy.h b/deps/oblib/src/common/meta_programming/ob_meta_copy.h index 7c50f1359..3a0c8af2e 100644 --- a/deps/oblib/src/common/meta_programming/ob_meta_copy.h +++ b/deps/oblib/src/common/meta_programming/ob_meta_copy.h @@ -18,6 +18,7 @@ inline int copy_or_assign(const T &src, T &dst, ObIAllocator &alloc = DummyAllocator::get_instance()) { + OCCAM_LOG(DEBUG, "call data assign method with allocator"); return dst.assign(alloc, src); } @@ -29,6 +30,7 @@ inline int copy_or_assign(const T &src, ObIAllocator &alloc = DummyAllocator::get_instance()) { UNUSED(alloc); + OCCAM_LOG(DEBUG, "call data assign method"); return dst.assign(src); } @@ -42,6 +44,7 @@ inline int copy_or_assign(const T &src, ObIAllocator &alloc = DummyAllocator::get_instance()) { UNUSED(alloc); + OCCAM_LOG(DEBUG, "call data assign operator"); dst = src; return common::OB_SUCCESS; } @@ -57,6 +60,7 @@ inline int copy_or_assign(const T &src, ObIAllocator &alloc = DummyAllocator::get_instance()) { UNUSED(alloc); + OCCAM_LOG(DEBUG, "call data copy construction"); new (&dst) T (src); return common::OB_SUCCESS; } @@ -86,20 +90,21 @@ inline int copy_or_assign(const T &src, // user will benefit from move sematic if dst is an rvalue and support move sematic // 1.1 try standard move assignment template ::value && + typename std::enable_if::value && std::is_move_assignable::value, bool>::type = true> inline int move_or_copy_or_assign(T &&src, T &dst, ObIAllocator &alloc = DummyAllocator::get_instance()) { UNUSED(alloc); + OCCAM_LOG(DEBUG, "call data move assign operator"); dst = std::move(src); return common::OB_SUCCESS; } // 1.2 try move construction template ::value && + typename std::enable_if::value && !std::is_move_assignable::value && std::is_move_constructible::value, bool>::type = true> inline int move_or_copy_or_assign(T &&src, @@ -107,6 +112,7 @@ inline int move_or_copy_or_assign(T &&src, ObIAllocator &alloc = DummyAllocator::get_instance()) { UNUSED(alloc); + OCCAM_LOG(DEBUG, "call data copy move construction"); new (&dst) T (std::move(src)); return common::OB_SUCCESS; } diff --git a/src/storage/multi_data_source/mds_row.h b/src/storage/multi_data_source/mds_row.h index bb4f48963..f64f50dde 100644 --- a/src/storage/multi_data_source/mds_row.h +++ b/src/storage/multi_data_source/mds_row.h @@ -27,6 +27,7 @@ #include "storage/multi_data_source/adapter_define/mds_dump_node.h" #include #include "deps/oblib/src/common/meta_programming/ob_meta_copy.h" +#include "runtime_utility/mds_retry_control.h" namespace oceanbase { @@ -76,7 +77,7 @@ public: template int set(DATA &&data, MdsCtx &ctx, - const int64_t lock_timeout_us, + const RetryParam &retry_param, const bool is_for_remove = false); template int replay(DATA &&data, @@ -87,7 +88,7 @@ public: int get_snapshot(READ_OP &&read_operation, const share::SCN snapshot, const int64_t read_seq, - const int64_t timeout_us) const; + const RetryParam &retry_param) const; template int get_latest(READ_OP &&read_operation, const int64_t read_seq) const; template @@ -95,7 +96,7 @@ public: const MdsWriter &writer, const share::SCN snapshot, const int64_t read_seq, - const int64_t timeout_us) const; + const RetryParam &retry_param) const; template int scan_dump_node_from_tail_to_head(DUMP_OP &&op, const uint8_t mds_table_id, @@ -116,10 +117,6 @@ public: int for_each_node_(OPERATION &&op) const; template int get_with_read_wrapper_(READ_OP &&read_operation, SPECIFIED_GET_LOGIC &&specified_logic) const; - template - int write_operation_wrapper_(WRITE_OPERATION &&write_op, - MdsCtx &ctx, - const int64_t lock_timeout_us); template int construct_insert_record_user_mds_node_(MdsRowBase *p_mds_row, DATA &&data, @@ -128,7 +125,7 @@ public: MdsCtx &ctx); int check_node_snapshot_(const UserMdsNode &node, const share::SCN snapshot, - const int64_t timeout_ts, + const RetryParam &retry_param, bool &can_read) const; template void report_event_(const char (&event_str)[N], diff --git a/src/storage/multi_data_source/mds_row.ipp b/src/storage/multi_data_source/mds_row.ipp index c0f74bfbb..908af44ef 100644 --- a/src/storage/multi_data_source/mds_row.ipp +++ b/src/storage/multi_data_source/mds_row.ipp @@ -234,90 +234,50 @@ int MdsRow::construct_insert_record_user_mds_node_(MdsRowBase *mds_r } template -template -int MdsRow::write_operation_wrapper_(WRITE_OPERATION &&write_op, - MdsCtx &ctx, - const int64_t lock_timeout_us) +template +int MdsRow::set(DATA &&data, + MdsCtx &ctx, + const RetryParam &retry_param, + const bool is_for_remove) { - #define PRINT_WRAPPER KR(ret), K(typeid(V).name()), K(ctx), KTIME(timeout_ts),\ - K(lock_timeout_us), K(try_lock_times), K(*this) + #define PRINT_WRAPPER KR(ret), K(typeid(V).name()), K(ctx), K(retry_param), K(*this) int ret = OB_SUCCESS; bool write_conflict = true; - int try_lock_times = 0; - int64_t timeout_ts = ObClockGenerator::getRealClock() + lock_timeout_us; - MDS_TG(std::max(lock_timeout_us, (int64_t)5_ms)); + MDS_TG(5_ms); if (!ctx.can_write()) { ret = OB_INVALID_ARGUMENT; MDS_LOG_SET(WARN, "invalid mds ctx"); } else { - do { - MDS_TG(5_ms); - MdsWLockGuard lg(MdsRowBase::lock_);// lock row - if (sorted_list_.empty()) {// for now, all writting data are in memory - write_conflict = false; - } else { - if (OB_LIKELY(sorted_list_.get_head().is_decided_())) { - write_conflict = false; - } else if (sorted_list_.get_head().get_writer_() == ctx.get_writer()) { - write_conflict = false; - } - } - UserMdsNode *new_node = nullptr; - if (OB_UNLIKELY(write_conflict)) { - ret = OB_TRY_LOCK_ROW_CONFLICT; - } else if (MDS_FAIL(write_op())) { - MDS_LOG_SET(WARN, "execute write op failed"); - } else { - MDS_LOG_SET(TRACE, "MdsRow set node success"); - } - } while (++try_lock_times && - OB_TRY_LOCK_ROW_CONFLICT == ret &&// keep trying but release latch - ObClockGenerator::getRealClock() < timeout_ts &&// check timeout - ({ob_usleep(50_ms); true;}));// if not, wait and try again - } - if (OB_FAIL(ret)) { - if (lock_timeout_us != 0 && OB_TRY_LOCK_ROW_CONFLICT == ret) { - ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT; - MDS_LOG_SET(WARN, "write data conflict cause reach lock timeout"); + MdsWLockGuard lg(MdsRowBase::lock_);// lock row + if (sorted_list_.empty()) {// for now, all writting data are in memory + write_conflict = false; } else { - MDS_LOG_SET(WARN, "write failed"); + if (OB_LIKELY(sorted_list_.get_head().is_decided_())) { + write_conflict = false; + } else if (sorted_list_.get_head().get_writer_() == ctx.get_writer()) { + write_conflict = false; + } + } + UserMdsNode *new_node = nullptr; + if (OB_UNLIKELY(write_conflict)) { + ret = OB_EAGAIN; + if (retry_param.check_reach_print_interval_and_update()) { + MDS_LOG_SET(INFO, "mds row write conflict"); + } + } else if (MDS_FAIL(construct_insert_record_user_mds_node_(this, + std::forward(data), + is_for_remove ? MdsNodeType::DELETE : MdsNodeType::SET, + share::SCN::max_scn(), + ctx))) { + MDS_LOG_SET(WARN, "execute write op failed"); + } else { + MDS_LOG_SET(DEBUG, "MdsRow set node success"); } - } else { - MDS_LOG_SET(TRACE, "MdsRow write node success"); } return ret; #undef PRINT_WRAPPER } -template -template -int MdsRow::set(DATA &&data, - MdsCtx &ctx, - const int64_t lock_timeout_us, - const bool is_for_remove) -{ - ForwardWrapper forward_wrapper(std::forward(data)); - return write_operation_wrapper_( - [&forward_wrapper, &ctx, is_for_remove, this]() -> int { - int ret = OB_SUCCESS; - if (forward_wrapper.is_constructed_from_lvalue_) { - ret = construct_insert_record_user_mds_node_(this, - forward_wrapper.data_, - is_for_remove ? MdsNodeType::DELETE : MdsNodeType::SET, - share::SCN::max_scn(), - ctx); - } else { - ret = construct_insert_record_user_mds_node_(this, - std::move(forward_wrapper.data_), - is_for_remove ? MdsNodeType::DELETE : MdsNodeType::SET, - share::SCN::max_scn(), - ctx); - } - return ret; - }, - ctx, lock_timeout_us); -} - template template int MdsRow::replay(DATA &&data, @@ -352,12 +312,10 @@ int MdsRow::replay(DATA &&data, } // this function must be called from for_each_lock_ method with lock protection. -// but this function may hung until node decided, so when it sleep, must release lock, -// when it awaken, must add lock again. template int MdsRow::check_node_snapshot_(const UserMdsNode &node, const share::SCN snapshot, - const int64_t timeout_ts, + const RetryParam &retry_param, bool &can_read) const { #define PRINT_WRAPPER KR(ret), KPC(node), K(typeid(READ_OP).name()), K(retry_times), K(can_read),\ @@ -369,11 +327,9 @@ int MdsRow::check_node_snapshot_(const UserMdsNode &node, snapshot_converted = share::SCN::scn_dec(snapshot_converted); } if (!node.is_decided_() && node.get_prepare_version_() <= snapshot_converted) { - if (ObClockGenerator::getRealClock() > timeout_ts) { - ret = OB_ERR_SHARED_LOCK_CONFLICT; - MDS_LOG(WARN, "timeout when wait node decided", K(node), K(node.get_prepare_version_()), K(snapshot_converted), K(snapshot)); - } else { - ret = OB_EAGAIN; + ret = OB_EAGAIN; + if (retry_param.check_reach_print_interval_and_update()) { + MDS_LOG(WARN, "mds row lock_for_read conflict"); } } else if (node.is_committed_() && node.get_commit_version_() <= snapshot_converted) { can_read = true; @@ -428,28 +384,24 @@ template int MdsRow::get_snapshot(READ_OP &&read_operation, const share::SCN snapshot, const int64_t read_seq, - const int64_t timeout_us) const + const RetryParam &retry_param) const { - #define PRINT_WRAPPER KR(ret), K(snapshot), K(read_seq), K(timeout_us), K(timeout_ts), K(*this) + #define PRINT_WRAPPER KR(ret), K(snapshot), K(read_seq), K(retry_param), K(*this) int ret = OB_SUCCESS; UNUSED(read_seq); - int64_t timeout_ts = ObClockGenerator::getRealClock() + timeout_us; MDS_TG(5_ms); - do { - MdsRLockGuard lg(MdsRowBase::lock_); - if (MDS_FAIL(get_with_read_wrapper_(read_operation, - [&](const UserMdsNode &node, bool &can_read) -> int { - return check_node_snapshot_(node, snapshot, timeout_ts, can_read); - } - ))) { - if (OB_EAGAIN != ret && OB_SNAPSHOT_DISCARDED != ret) { - MDS_LOG_GET(WARN, "MdsRow get_snapshot failed"); - } - } else { - MDS_LOG_GET(TRACE, "MdsRow get_snapshot success"); + MdsRLockGuard lg(MdsRowBase::lock_); + if (MDS_FAIL(get_with_read_wrapper_(read_operation, + [this, snapshot, &retry_param](const UserMdsNode &node, bool &can_read) -> int { + return check_node_snapshot_(node, snapshot, retry_param, can_read); } - } while (OB_EAGAIN == ret && - ({ob_usleep(10_ms); true;})); + ))) { + if (OB_EAGAIN != ret && OB_SNAPSHOT_DISCARDED != ret) { + MDS_LOG_GET(WARN, "MdsRow get_snapshot failed"); + } + } else { + MDS_LOG_GET(DEBUG, "MdsRow get_snapshot success"); + } return ret; #undef PRINT_WRAPPER } @@ -485,35 +437,30 @@ int MdsRow::get_by_writer(READ_OP &&read_operation, const MdsWriter &writer, const share::SCN snapshot, const int64_t read_seq, - const int64_t timeout_us) const + const RetryParam &retry_param) const { - #define PRINT_WRAPPER KR(ret), K(writer), K(snapshot), K(read_seq), K(timeout_us),\ - KTIME(timeout_ts), K(*this) + #define PRINT_WRAPPER KR(ret), K(writer), K(snapshot), K(read_seq), K(retry_param), K(*this) int ret = OB_SUCCESS; UNUSED(read_seq); MDS_TG(5_ms); - int64_t timeout_ts = ObClockGenerator::getRealClock() + timeout_us; - do { - MdsRLockGuard lg(MdsRowBase::lock_); - if (MDS_FAIL(get_with_read_wrapper_(read_operation, - [&](const UserMdsNode &node, bool &can_read) -> int { - int ret = OB_SUCCESS; - if (node.get_writer_() == writer) { - can_read = true; - } else { - ret = check_node_snapshot_(node, snapshot, timeout_ts, can_read); - } - return ret; + MdsRLockGuard lg(MdsRowBase::lock_); + if (MDS_FAIL(get_with_read_wrapper_(read_operation, + [this, snapshot, &writer, &retry_param](const UserMdsNode &node, bool &can_read) -> int { + int ret = OB_SUCCESS; + if (node.get_writer_() == writer) { + can_read = true; + } else { + ret = check_node_snapshot_(node, snapshot, retry_param, can_read); } - ))) { - if (OB_UNLIKELY(OB_EAGAIN != ret && OB_SNAPSHOT_DISCARDED != ret)) { - MDS_LOG_GET(WARN, "MdsRow get_by_writer failed"); - } - } else { - MDS_LOG_GET(TRACE, "MdsRow get_by_writer success"); + return ret; } - } while (OB_EAGAIN == ret && - ({ob_usleep(10_ms); true;})); + ))) { + if (OB_UNLIKELY(OB_EAGAIN != ret && OB_SNAPSHOT_DISCARDED != ret)) { + MDS_LOG_GET(WARN, "MdsRow get_by_writer failed"); + } + } else { + MDS_LOG_GET(DEBUG, "MdsRow get_by_writer success"); + } return ret; #undef PRINT_WRAPPER } diff --git a/src/storage/multi_data_source/mds_table_handle.ipp b/src/storage/multi_data_source/mds_table_handle.ipp index 924e761c0..b6b3472fd 100644 --- a/src/storage/multi_data_source/mds_table_handle.ipp +++ b/src/storage/multi_data_source/mds_table_handle.ipp @@ -164,7 +164,7 @@ int MdsTableHandle::set(T &&data, MdsCtx &ctx, const int64_t lock_timeout_us) if (OB_FAIL(p_mds_table_base_->set(unit_id, (void*)&dummy_key, (void*)&data, - std::is_rvalue_reference::value, + std::is_rvalue_reference::value, ctx, lock_timeout_us))) { MDS_LOG(WARN, "fail to call set", KR(ret), K(unit_id), K(data), K(ctx), K(lock_timeout_us)); diff --git a/src/storage/multi_data_source/mds_unit.h b/src/storage/multi_data_source/mds_unit.h index cad504d18..11851dd74 100644 --- a/src/storage/multi_data_source/mds_unit.h +++ b/src/storage/multi_data_source/mds_unit.h @@ -48,6 +48,89 @@ struct KvPair : public ListNode> template class MdsUnit final : public MdsUnitBase { + struct SetOP { + SetOP() = delete; + SetOP(const SetOP &) = delete; + SetOP &operator=(const SetOP &) = delete; + SetOP(MdsUnit *p_this, + bool is_lvalue, + MdsTableBase *p_mds_table, + const K &key, + V &value, + MdsCtx &ctx, + bool is_for_remove, + RetryParam &retry_param) : + this_(p_this), + is_lvalue_(is_lvalue), + p_mds_table_(p_mds_table), + key_(key), + value_(value), + ctx_(ctx), + is_for_remove_(is_for_remove), + retry_param_(retry_param) {} + int operator()(); + MdsUnit *this_; + bool is_lvalue_; + MdsTableBase *p_mds_table_; + const K &key_; + V &value_; + MdsCtx &ctx_; + bool is_for_remove_; + RetryParam &retry_param_; + }; + template + struct GetSnapShotOp { + GetSnapShotOp() = delete; + GetSnapShotOp(const GetSnapShotOp &) = delete; + GetSnapShotOp &operator=(const GetSnapShotOp &) = delete; + GetSnapShotOp(const MdsUnit *p_this, + const K &key, + OP &read_op, + share::SCN snapshot, + int64_t read_seq, + RetryParam &retry_param) + : this_(p_this), + key_(key), + read_op_(read_op), + snapshot_(snapshot), + read_seq_(read_seq), + retry_param_(retry_param) {} + int operator()(); + const MdsUnit *this_; + const K &key_; + OP &read_op_; + share::SCN snapshot_; + int64_t read_seq_; + RetryParam &retry_param_; + }; + template + struct GetByWriterOp { + GetByWriterOp() = delete; + GetByWriterOp(const GetByWriterOp &) = delete; + GetByWriterOp &operator=(const GetByWriterOp &) = delete; + GetByWriterOp(const MdsUnit *p_this, + const K &key, + OP &read_op, + const MdsWriter &writer, + share::SCN snapshot, + int64_t read_seq, + RetryParam &retry_param) + : this_(p_this), + key_(key), + read_op_(read_op), + writer_(writer), + snapshot_(snapshot), + read_seq_(read_seq), + retry_param_(retry_param) {} + int operator()(); + const MdsUnit *this_; + const K &key_; + OP &read_op_; + const MdsWriter &writer_; + share::SCN snapshot_; + int64_t read_seq_; + RetryParam &retry_param_; + }; public: typedef K key_type; typedef V value_type; @@ -124,8 +207,9 @@ public: const char *file = __builtin_FILE(), const uint32_t line = __builtin_LINE(), const char *function_name = __builtin_FUNCTION()) const; - const Row *get_row_from_list_(const K &key) const; - int insert_empty_kv_to_list_(const K &key, Row *&row, MdsTableBase *p_mds_table); + KvPair> *get_row_from_list_(const K &key) const; + int insert_empty_kv_to_list_(const K &key, KvPair> *&p_kv, MdsTableBase *p_mds_table); + void erase_kv_from_list_if_empty_(KvPair> *p_kv); SortedList>, SORT_TYPE::ASC> multi_row_list_; mutable MdsLock lock_; }; @@ -134,6 +218,74 @@ public: template class MdsUnit final : public MdsUnitBase { + struct SetOP { + SetOP() = delete; + SetOP(const SetOP &) = delete; + SetOP &operator=(const SetOP &) = delete; + SetOP(MdsUnit *p_this, + bool is_lvalue, + V &value, + MdsCtx &ctx, + RetryParam &retry_param) : + this_(p_this), + is_lvalue_(is_lvalue), + value_(value), + ctx_(ctx), + retry_param_(retry_param) {} + int operator()(); + MdsUnit *this_; + bool is_lvalue_; + V &value_; + MdsCtx &ctx_; + RetryParam &retry_param_; + }; + template + struct GetSnapShotOp { + GetSnapShotOp() = delete; + GetSnapShotOp(const GetSnapShotOp &) = delete; + GetSnapShotOp &operator=(const GetSnapShotOp &) = delete; + GetSnapShotOp(const MdsUnit *p_this, + OP &read_op, + share::SCN snapshot, + int64_t read_seq, + RetryParam &retry_param) + : this_(p_this), + read_op_(read_op), + snapshot_(snapshot), + read_seq_(read_seq), + retry_param_(retry_param) {} + int operator()(); + const MdsUnit *this_; + OP &read_op_; + share::SCN snapshot_; + int64_t read_seq_; + RetryParam &retry_param_; + }; + template + struct GetByWriterOp { + GetByWriterOp() = delete; + GetByWriterOp(const GetByWriterOp &) = delete; + GetByWriterOp &operator=(const GetByWriterOp &) = delete; + GetByWriterOp(const MdsUnit *p_this, + OP &read_op, + const MdsWriter &writer, + share::SCN snapshot, + int64_t read_seq, + RetryParam &retry_param) + : this_(p_this), + read_op_(read_op), + writer_(writer), + snapshot_(snapshot), + read_seq_(read_seq), + retry_param_(retry_param) {} + int operator()(); + const MdsUnit *this_; + OP &read_op_; + const MdsWriter &writer_; + share::SCN snapshot_; + int64_t read_seq_; + RetryParam &retry_param_; + }; public: typedef DummyKey key_type; typedef V value_type; diff --git a/src/storage/multi_data_source/mds_unit.ipp b/src/storage/multi_data_source/mds_unit.ipp index 97ce37feb..fb21e290b 100644 --- a/src/storage/multi_data_source/mds_unit.ipp +++ b/src/storage/multi_data_source/mds_unit.ipp @@ -13,8 +13,12 @@ #define STORAGE_MULTI_DATA_SOURCE_MDS_UNIT_IPP #include "lib/list/ob_dlist.h" +#include "lib/ob_errno.h" +#include "ob_clock_generator.h" #include "observer/virtual_table/ob_mds_event_buffer.h" #include "share/ob_errno.h" +#include "runtime_utility/mds_retry_control.h" +#include #ifndef STORAGE_MULTI_DATA_SOURCE_MDS_UNIT_H_IPP #define STORAGE_MULTI_DATA_SOURCE_MDS_UNIT_H_IPP #include "mds_unit.h" @@ -188,7 +192,7 @@ int MdsUnit::for_each_row(FowEachRowAction action_type, OP &&op)// node ma MDS_TG(1_ms); const K *p_k = &kv_row.k_; const Row &row = kv_row.v_; - if (MDS_FAIL(op(row))) { + if (MDS_FAIL(op(row))) {// node maybe recycled inside op MDS_LOG_SCAN(WARN, "fail to scan row", KPC(p_k)); } // CAUTIONS: not every path scan need recycle empty row, or maybe result some problem unexpected, for example: @@ -197,11 +201,7 @@ int MdsUnit::for_each_row(FowEachRowAction action_type, OP &&op)// node ma // but destroy mds_row will add row's lock inner destruction, which will resulting deadlock in same thread. // so only operations logic behaves like gc should recycle empty row. if (FowEachRowAction::RECYCLE == action_type || FowEachRowAction::RESET == action_type) { - if (row.sorted_list_.empty()) {// if this row is recycled, just delete it - KvPair> *p_kv = &const_cast> &>(kv_row); - multi_row_list_.del(p_kv); - MdsFactory::destroy(p_kv); - } + erase_kv_from_list_if_empty_(&const_cast> &>(kv_row)); } return OB_SUCCESS != ret;// keep scanning until meet failure }); @@ -210,27 +210,26 @@ int MdsUnit::for_each_row(FowEachRowAction action_type, OP &&op)// node ma } template -const Row *MdsUnit::get_row_from_list_(const K &key) const +KvPair> *MdsUnit::get_row_from_list_(const K &key) const { - const Row *row = nullptr; + KvPair> *p_kv = nullptr; multi_row_list_.for_each_node_from_head_to_tail_until_true( - [&row, &key](const KvPair> &kv_pair) { + [&p_kv, &key](const KvPair> &kv_pair) { if (kv_pair.k_ == key) { - row = &const_cast &>(kv_pair.v_); + p_kv = &const_cast> &>(kv_pair); } - return nullptr != row; + return nullptr != p_kv; } ); - return row; + return p_kv; } template -int MdsUnit::insert_empty_kv_to_list_(const K &key, Row *&row, MdsTableBase *p_mds_table) +int MdsUnit::insert_empty_kv_to_list_(const K &key, KvPair> *&p_kv, MdsTableBase *p_mds_table) { #define PRINT_WRAPPER KR(ret), K(key), K(typeid(K).name()), K(typeid(V).name()) int ret = OB_SUCCESS; MDS_TG(1_ms); - KvPair> *p_kv; set_mds_mem_check_thread_local_info(p_mds_table->ls_id_, p_mds_table->tablet_id_, typeid(p_kv).name()); @@ -242,14 +241,57 @@ int MdsUnit::insert_empty_kv_to_list_(const K &key, Row *&row, MdsTa } else { p_kv->v_.key_ = &(p_kv->k_); multi_row_list_.insert(p_kv); - row = &(p_kv->v_); - report_event_("CREATE_KEY", key); } reset_mds_mem_check_thread_local_info(); return ret; #undef PRINT_WRAPPER } +template +void MdsUnit::erase_kv_from_list_if_empty_(KvPair> *p_kv) +{ + int ret = OB_SUCCESS; + Row &mds_row = p_kv->v_; + if (mds_row.sorted_list_.empty()) { + if (!multi_row_list_.check_node_exist(p_kv)) { + MDS_LOG(ERROR, "mds row is not record in list", KPC(p_kv)); + } else { + multi_row_list_.del(p_kv); + MdsFactory::destroy(p_kv); + } + } + return; +} + +template +int MdsUnit::SetOP::operator()() { + #define PRINT_WRAPPER KR(ret), K_(key), K_(value), K_(ctx), K_(retry_param), K_(is_for_remove) + int ret = OB_SUCCESS; + MDS_TG(10_ms); + KvPair> *p_kv = this_->get_row_from_list_(key_); + if (OB_ISNULL(p_kv)) { + if (MDS_FAIL(this_->insert_empty_kv_to_list_(key_, p_kv, p_mds_table_))) { + MDS_LOG_SET(WARN, "insert new key to unit failed"); + } else { + MDS_LOG_SET(INFO, "insert new key to unit"); + } + } + if (OB_SUCC(ret)) { + if (is_lvalue_) { + ret = p_kv->v_.set(value_, ctx_, retry_param_, is_for_remove_); + } else { + ret = p_kv->v_.set(std::move(value_), ctx_, retry_param_, is_for_remove_); + } + if (MDS_FAIL(ret)) { + this_->erase_kv_from_list_if_empty_(p_kv);// rollback + MDS_LOG_SET(WARN, "MdsUnit set value failed"); + } else { + MDS_LOG_SET(TRACE, "MdsUnit set value success"); + } + } + return ret; + #undef PRINT_WRAPPER +} template template int MdsUnit::set(MdsTableBase *p_mds_table, @@ -259,31 +301,17 @@ int MdsUnit::set(MdsTableBase *p_mds_table, const int64_t lock_timeout_us, const bool is_for_remove) { - #define PRINT_WRAPPER KR(ret), K(key), K(value), K(ctx), K(lock_timeout_us), K(is_for_remove) int ret = OB_SUCCESS; MDS_TG(10_ms); - MdsWLockGuard lg(lock_); - CLICK(); - Row *mds_row = const_cast *>(get_row_from_list_(key)); - if (OB_ISNULL(mds_row)) { - if (MDS_FAIL(insert_empty_kv_to_list_(key, mds_row, p_mds_table))) { - MDS_LOG_SET(WARN, "insert new key to unit failed"); - } else { - MDS_LOG_SET(INFO, "insert new key to unit"); - } - } - if (OB_SUCC(ret)) { - if (MDS_FAIL(mds_row->set(std::forward(value), - ctx, - lock_timeout_us, - is_for_remove))) { - MDS_LOG_SET(WARN, "MdsUnit set value failed"); - } else { - MDS_LOG_SET(TRACE, "MdsUnit set value success"); + bool is_lvalue = std::is_lvalue_reference::value; + RetryParam retry_param(lock_timeout_us); + SetOP op(this, is_lvalue, p_mds_table, key, value, ctx, is_for_remove, retry_param); + if (MDS_FAIL(retry_release_lock_with_op_until_timeout(lock_, retry_param, op))) { + if (OB_TIMEOUT == ret) { + ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT; } } return ret; - #undef PRINT_WRAPPER } template @@ -300,19 +328,20 @@ int MdsUnit::replay(MdsTableBase *p_mds_table, MDS_TG(10_ms); MdsWLockGuard lg(lock_); CLICK(); - Row *mds_row = const_cast *>(get_row_from_list_(key)); - if (OB_ISNULL(mds_row)) { - if (MDS_FAIL(insert_empty_kv_to_list_(key, mds_row, p_mds_table))) { + KvPair> *p_kv = get_row_from_list_(key); + if (OB_ISNULL(p_kv)) { + if (MDS_FAIL(insert_empty_kv_to_list_(key, p_kv, p_mds_table))) { MDS_LOG_SET(WARN, "insert new key to unit failed"); } else { MDS_LOG_SET(INFO, "insert new key to unit"); } } if (OB_SUCC(ret)) { - if (MDS_FAIL(mds_row->replay(std::forward(value), + if (MDS_FAIL(p_kv->v_.replay(std::forward(value), ctx, scn, is_for_remove))) { + erase_kv_from_list_if_empty_(p_kv);// rollback MDS_LOG_SET(WARN, "MdsUnit set value failed"); } else { MDS_LOG_SET(TRACE, "MdsUnit set value success"); @@ -322,6 +351,27 @@ int MdsUnit::replay(MdsTableBase *p_mds_table, #undef PRINT_WRAPPER } +template +template +int MdsUnit::GetSnapShotOp::operator()() { + #define PRINT_WRAPPER KR(ret), K_(key), K(typeid(OP).name()), K_(snapshot), K_(read_seq), K_(retry_param) + int ret = OB_SUCCESS; + MDS_TG(10_ms); + KvPair> *p_kv = this_->get_row_from_list_(key_); + if (OB_ISNULL(p_kv)) { + ret = OB_ENTRY_NOT_EXIST; + MDS_LOG_GET(WARN, "row key not exist"); + } else if (MDS_FAIL(p_kv->v_.get_snapshot(read_op_, + snapshot_, + read_seq_, + retry_param_))) { + if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret && OB_EAGAIN != ret)) { + MDS_LOG_GET(WARN, "MdsUnit get_snapshot failed"); + } + } + return ret; + #undef PRINT_WRAPPER +} template template int MdsUnit::get_snapshot(const K &key, @@ -330,30 +380,42 @@ int MdsUnit::get_snapshot(const K &key, const int64_t read_seq, const int64_t lock_timeout_us) const { - #define PRINT_WRAPPER KR(ret), K(key), K(typeid(OP).name()), K(snapshot), K(read_seq),\ - K(lock_timeout_us) int ret = OB_SUCCESS; MDS_TG(10_ms); - MdsRLockGuard lg(lock_); - CLICK(); - const Row *mds_row = get_row_from_list_(key); - if (OB_ISNULL(mds_row)) { + RetryParam retry_param(lock_timeout_us); + GetSnapShotOp::type> op(this, key, read_op, snapshot, read_seq, retry_param); + if (MDS_FAIL(retry_release_lock_with_op_until_timeout(lock_, retry_param, op))) { + if (OB_TIMEOUT == ret) { + ret = OB_ERR_SHARED_LOCK_CONFLICT; + } + } + return ret; +} + +template +template +int MdsUnit::GetByWriterOp::operator()() { + #define PRINT_WRAPPER KR(ret), K_(key), K(typeid(OP).name()), K_(writer), K_(snapshot), K_(read_seq), K_(retry_param) + int ret = OB_SUCCESS; + MDS_TG(10_ms); + KvPair> *p_kv = this_->get_row_from_list_(key_); + if (OB_ISNULL(p_kv)) { ret = OB_ENTRY_NOT_EXIST; MDS_LOG_GET(WARN, "row key not exist"); - } else if (MDS_FAIL(mds_row->get_snapshot(std::forward(read_op), - snapshot, - read_seq, - lock_timeout_us))) { + } else if (MDS_FAIL(p_kv->v_.get_by_writer(read_op_, + writer_, + snapshot_, + read_seq_, + retry_param_))) { if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) { - MDS_LOG_GET(WARN, "MdsUnit get_snapshot failed"); + MDS_LOG_GET(WARN, "MdsUnit get_by_writer failed"); } } else { - MDS_LOG_GET(TRACE, "MdsUnit get_snapshot success"); + MDS_LOG_GET(TRACE, "MdsUnit get_by_writer success"); } return ret; #undef PRINT_WRAPPER } - template template int MdsUnit::get_by_writer(const K &key, @@ -363,29 +425,16 @@ int MdsUnit::get_by_writer(const K &key, const int64_t read_seq, const int64_t lock_timeout_us) const { - #define PRINT_WRAPPER KR(ret), K(key), K(typeid(OP).name()), K(writer), K(snapshot), K(read_seq),\ - K(lock_timeout_us) int ret = OB_SUCCESS; MDS_TG(10_ms); - MdsRLockGuard lg(lock_); - CLICK(); - const Row *mds_row = get_row_from_list_(key); - if (OB_ISNULL(mds_row)) { - ret = OB_ENTRY_NOT_EXIST; - MDS_LOG_GET(WARN, "row key not exist"); - } else if (MDS_FAIL(mds_row->get_by_writer(std::forward(read_op), - writer, - snapshot, - read_seq, - lock_timeout_us))) { - if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) { - MDS_LOG_GET(WARN, "MdsUnit get_by_writer failed"); + RetryParam retry_param(lock_timeout_us); + GetByWriterOp::type> op(this, key, read_op, writer, snapshot, read_seq, retry_param); + if (MDS_FAIL(retry_release_lock_with_op_until_timeout(lock_, retry_param, op))) { + if (OB_TIMEOUT == ret) { + ret = OB_ERR_SHARED_LOCK_CONFLICT; } - } else { - MDS_LOG_GET(TRACE, "MdsUnit get_by_writer success"); } return ret; - #undef PRINT_WRAPPER } template @@ -397,11 +446,11 @@ int MdsUnit::get_latest(const K &key, OP &&read_op, const int64_t read_seq MDS_TG(10_ms); MdsRLockGuard lg(lock_); CLICK(); - const Row *mds_row = get_row_from_list_(key); - if (OB_ISNULL(mds_row)) { + KvPair> *p_kv = get_row_from_list_(key); + if (OB_ISNULL(p_kv)) { ret = OB_ENTRY_NOT_EXIST; MDS_LOG_GET(WARN, "row key not exist"); - } else if (MDS_FAIL(mds_row->get_latest(std::forward(read_op), read_seq))) { + } else if (MDS_FAIL(p_kv->v_.get_latest(std::forward(read_op), read_seq))) { if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) { MDS_LOG_GET(WARN, "MdsUnit get_latest failed"); } @@ -603,18 +652,16 @@ typename MdsUnit::const_reverse_iterator MdsUnit::cren { return const_reverse_iterator(nullptr); } template -template -int MdsUnit::set(MdsTableBase *p_mds_table, - Value &&value, - MdsCtx &ctx, - const int64_t lock_timeout_us) -{ - #define PRINT_WRAPPER KR(ret), K(value), K(ctx), K(lock_timeout_us) +int MdsUnit::SetOP::operator()() { + #define PRINT_WRAPPER KR(ret), K_(value), K_(ctx), K_(retry_param), K_(is_lvalue) int ret = OB_SUCCESS; MDS_TG(10_ms); - MdsWLockGuard lg(lock_); - CLICK(); - if (MDS_FAIL(single_row_.v_.set(std::forward(value), ctx, lock_timeout_us))) { + if (is_lvalue_) { + ret = this_->single_row_.v_.set(value_, ctx_, retry_param_); + } else { + ret = this_->single_row_.v_.set(std::move(value_), ctx_, retry_param_); + } + if (MDS_FAIL(ret)) { MDS_LOG_SET(WARN, "MdsUnit set value failed"); } else { MDS_LOG_SET(TRACE, "MdsUnit set value success"); @@ -622,6 +669,25 @@ int MdsUnit::set(MdsTableBase *p_mds_table, return ret; #undef PRINT_WRAPPER } +template +template +int MdsUnit::set(MdsTableBase *p_mds_table, + Value &&value, + MdsCtx &ctx, + const int64_t lock_timeout_us) +{ + int ret = OB_SUCCESS; + MDS_TG(10_ms); + RetryParam retry_param(lock_timeout_us); + bool is_lvalue = std::is_lvalue_reference::value; + SetOP op(this, is_lvalue, value, ctx, retry_param); + if (MDS_FAIL(retry_release_lock_with_op_until_timeout(lock_, retry_param, op))) { + if (OB_TIMEOUT == ret) { + ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT; + } + } + return ret; +} template template @@ -646,20 +712,14 @@ int MdsUnit::replay(MdsTableBase *p_mds_table, template template -int MdsUnit::get_snapshot(OP &&read_op, - const share::SCN snapshot, - const int64_t read_seq, - const int64_t lock_timeout_us) const -{ - #define PRINT_WRAPPER KR(ret), K(typeid(OP).name()), K(read_seq), K(lock_timeout_us) +int MdsUnit::GetSnapShotOp::operator()() { + #define PRINT_WRAPPER KR(ret), K(typeid(OP).name()), K_(read_seq), K_(retry_param) int ret = OB_SUCCESS; MDS_TG(10_ms); - MdsRLockGuard lg(lock_); - CLICK(); - if (MDS_FAIL(single_row_.v_.get_snapshot(std::forward(read_op), - snapshot, - read_seq, - lock_timeout_us))) { + if (MDS_FAIL(this_->single_row_.v_.get_snapshot(read_op_, + snapshot_, + read_seq_, + retry_param_))) { if (OB_SNAPSHOT_DISCARDED != ret) { MDS_LOG_GET(WARN, "MdsUnit get_snapshot failed"); } @@ -669,26 +729,36 @@ int MdsUnit::get_snapshot(OP &&read_op, return ret; #undef PRINT_WRAPPER } +template +template +int MdsUnit::get_snapshot(OP &&read_op, + const share::SCN snapshot, + const int64_t read_seq, + const int64_t lock_timeout_us) const +{ + int ret = OB_SUCCESS; + MDS_TG(10_ms); + RetryParam retry_param(lock_timeout_us); + GetSnapShotOp::type> op(this, read_op, snapshot, read_seq, retry_param); + if (MDS_FAIL(retry_release_lock_with_op_until_timeout(lock_, retry_param, op))) { + if (OB_TIMEOUT == ret) { + ret = OB_ERR_SHARED_LOCK_CONFLICT; + } + } + return ret; +} template template -int MdsUnit::get_by_writer(OP &&read_op, - const MdsWriter &writer, - const share::SCN snapshot, - const int64_t read_seq, - const int64_t lock_timeout_us) const -{ - #define PRINT_WRAPPER KR(ret), K(typeid(OP).name()), K(writer), K(snapshot), K(read_seq),\ - K(lock_timeout_us) +int MdsUnit::GetByWriterOp::operator()() { + #define PRINT_WRAPPER KR(ret), K(typeid(OP).name()), K_(writer), K_(snapshot), K_(read_seq), K_(retry_param) int ret = OB_SUCCESS; MDS_TG(10_ms); - MdsRLockGuard lg(lock_); - CLICK(); - if (MDS_FAIL(single_row_.v_.get_by_writer(std::forward(read_op), - writer, - snapshot, - read_seq, - lock_timeout_us))) { + if (MDS_FAIL(this_->single_row_.v_.get_by_writer(read_op_, + writer_, + snapshot_, + read_seq_, + retry_param_))) { if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) { MDS_LOG_GET(WARN, "MdsUnit get_by_writer failed"); } @@ -698,6 +768,25 @@ int MdsUnit::get_by_writer(OP &&read_op, return ret; #undef PRINT_WRAPPER } +template +template +int MdsUnit::get_by_writer(OP &&read_op, + const MdsWriter &writer, + const share::SCN snapshot, + const int64_t read_seq, + const int64_t lock_timeout_us) const +{ + int ret = OB_SUCCESS; + MDS_TG(10_ms); + RetryParam retry_param(lock_timeout_us); + GetByWriterOp::type> op(this, read_op, writer, snapshot, read_seq, retry_param); + if (MDS_FAIL(retry_release_lock_with_op_until_timeout(lock_, retry_param, op))) { + if (OB_TIMEOUT == ret) { + ret = OB_ERR_SHARED_LOCK_CONFLICT; + } + } + return ret; +} template template diff --git a/src/storage/multi_data_source/runtime_utility/common_define.h b/src/storage/multi_data_source/runtime_utility/common_define.h index 6d6a28c13..457c2076e 100644 --- a/src/storage/multi_data_source/runtime_utility/common_define.h +++ b/src/storage/multi_data_source/runtime_utility/common_define.h @@ -5,6 +5,7 @@ #include "lib/oblog/ob_log_module.h" #include "lib/ob_define.h" #include "lib/oblog/ob_log.h" +#include "ob_clock_generator.h" #include "src/share/ob_errno.h" #include "src/share/scn.h" #include "src/share/ob_occam_time_guard.h" diff --git a/src/storage/multi_data_source/runtime_utility/mds_retry_control.h b/src/storage/multi_data_source/runtime_utility/mds_retry_control.h new file mode 100644 index 000000000..6848a412e --- /dev/null +++ b/src/storage/multi_data_source/runtime_utility/mds_retry_control.h @@ -0,0 +1,78 @@ +#ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_UTILITY_MDS_RETRY_CONTROL_H +#define SHARE_STORAGE_MULTI_DATA_SOURCE_UTILITY_MDS_RETRY_CONTROL_H + +#include "common_define.h" +#include "mds_lock.h" + +namespace oceanbase +{ +namespace storage +{ +namespace mds +{ + +struct RetryParam { + RetryParam(int64_t lock_timeout_us, int64_t print_interval = 500_ms) : + start_ts_(ObClockGenerator::getClock()), + last_print_ts_(0), + timeout_ts_(start_ts_ + lock_timeout_us), + retry_cnt_(0), + print_interval_(print_interval) {} + RetryParam &operator++() { ++retry_cnt_; return *this; } + bool check_reach_print_interval_and_update() const { + int64_t current_ts = ObClockGenerator::getClock(); + bool ret = ObClockGenerator::getClock() > (last_print_ts_ + print_interval_); + if (ret) { + last_print_ts_ = current_ts; + } + return ret; + } + bool check_timeout() const { return ObClockGenerator::getClock() > timeout_ts_; } + TO_STRING_KV(KTIME_(start_ts), KTIME_(last_print_ts), KTIME_(timeout_ts), K_(retry_cnt), K_(print_interval)); + int64_t start_ts_; + mutable int64_t last_print_ts_; + int64_t timeout_ts_; + int64_t retry_cnt_; + int64_t print_interval_; +}; + +enum class LockMode { + READ = 1, + WRITE = 2, +}; + +template +struct LockModeGuard; + +template <> +struct LockModeGuard { using type = MdsRLockGuard; }; + +template <> +struct LockModeGuard { using type = MdsWLockGuard; }; + +template +int retry_release_lock_with_op_until_timeout(const MdsLock &lock, + RetryParam &retry_param, + OP &&op) { + int ret = OB_SUCCESS; + MDS_TG(10_ms); + do { + int64_t current_ts = ObClockGenerator::getClock();; + typename LockModeGuard::type lg(lock); + if (MDS_FAIL(op())) { + if (OB_LIKELY(OB_EAGAIN == ret)) { + if (retry_param.check_timeout()) { + ret = OB_TIMEOUT; + } + } + } + } while (OB_EAGAIN == ret && ({ ob_usleep(10_ms); ++retry_param; true; })); + return ret; + #undef PRINT_WRAPPER +}; + +} +} +} + +#endif \ No newline at end of file diff --git a/unittest/storage/multi_data_source/test_mds_table.cpp b/unittest/storage/multi_data_source/test_mds_table.cpp index 603a36f1f..0595cd029 100644 --- a/unittest/storage/multi_data_source/test_mds_table.cpp +++ b/unittest/storage/multi_data_source/test_mds_table.cpp @@ -627,6 +627,20 @@ TEST_F(TestMdsTable, test_rw_lock_wwlock) { t2.join(); } +TEST_F(TestMdsTable, test_rvalue_set) { + ExampleUserData2 data; + ASSERT_EQ(OB_SUCCESS, data.assign(MdsAllocator::get_instance(), "123")); + MdsCtx ctx(mds::MdsWriter(ObTransID(100))); + ObString str = data.data_; + ASSERT_EQ(OB_SUCCESS, mds_table_.set(std::move(data), ctx, 0)); + bool is_committed = false; + ASSERT_EQ(OB_SUCCESS, mds_table_.get_latest([&data, str](const ExampleUserData2 &read_data) -> int { + MDS_ASSERT(data.data_ == nullptr); + MDS_ASSERT(str.ptr() == read_data.data_.ptr()); + return OB_SUCCESS; + }, is_committed)); +} + } }