[MDS] avoid holding mds_unit's lock during lock_for_read

This commit is contained in:
fengdeyiji 2023-09-21 04:10:19 +00:00 committed by ob-robot
parent cf0ffafe1b
commit d23c31fd93
9 changed files with 526 additions and 242 deletions

View File

@ -18,6 +18,7 @@ inline int copy_or_assign(const T &src,
T &dst,
ObIAllocator &alloc = DummyAllocator::get_instance())
{
OCCAM_LOG(DEBUG, "call data assign method with allocator");
return dst.assign(alloc, src);
}
@ -29,6 +30,7 @@ inline int copy_or_assign(const T &src,
ObIAllocator &alloc = DummyAllocator::get_instance())
{
UNUSED(alloc);
OCCAM_LOG(DEBUG, "call data assign method");
return dst.assign(src);
}
@ -42,6 +44,7 @@ inline int copy_or_assign(const T &src,
ObIAllocator &alloc = DummyAllocator::get_instance())
{
UNUSED(alloc);
OCCAM_LOG(DEBUG, "call data assign operator");
dst = src;
return common::OB_SUCCESS;
}
@ -57,6 +60,7 @@ inline int copy_or_assign(const T &src,
ObIAllocator &alloc = DummyAllocator::get_instance())
{
UNUSED(alloc);
OCCAM_LOG(DEBUG, "call data copy construction");
new (&dst) T (src);
return common::OB_SUCCESS;
}
@ -86,20 +90,21 @@ inline int copy_or_assign(const T &src,
// user will benefit from move semantics if src is an rvalue and T supports move semantics
// 1.1 try standard move assignment
template <typename T,
typename std::enable_if<std::is_rvalue_reference<T>::value &&
typename std::enable_if<std::is_rvalue_reference<T &&>::value &&
std::is_move_assignable<T>::value, bool>::type = true>
inline int move_or_copy_or_assign(T &&src,
T &dst,
ObIAllocator &alloc = DummyAllocator::get_instance())
{
UNUSED(alloc);
OCCAM_LOG(DEBUG, "call data move assign operator");
dst = std::move(src);
return common::OB_SUCCESS;
}
// 1.2 try move construction
template <typename T,
typename std::enable_if<std::is_rvalue_reference<T>::value &&
typename std::enable_if<std::is_rvalue_reference<T &&>::value &&
!std::is_move_assignable<T>::value &&
std::is_move_constructible<T>::value, bool>::type = true>
inline int move_or_copy_or_assign(T &&src,
@ -107,6 +112,7 @@ inline int move_or_copy_or_assign(T &&src,
ObIAllocator &alloc = DummyAllocator::get_instance())
{
UNUSED(alloc);
OCCAM_LOG(DEBUG, "call data copy move construction");
new (&dst) T (std::move(src));
return common::OB_SUCCESS;
}

View File

@ -27,6 +27,7 @@
#include "storage/multi_data_source/adapter_define/mds_dump_node.h"
#include <utility>
#include "deps/oblib/src/common/meta_programming/ob_meta_copy.h"
#include "runtime_utility/mds_retry_control.h"
namespace oceanbase
{
@ -76,7 +77,7 @@ public:
template <typename DATA>
int set(DATA &&data,
MdsCtx &ctx,
const int64_t lock_timeout_us,
const RetryParam &retry_param,
const bool is_for_remove = false);
template <typename DATA>
int replay(DATA &&data,
@ -87,7 +88,7 @@ public:
int get_snapshot(READ_OP &&read_operation,
const share::SCN snapshot,
const int64_t read_seq,
const int64_t timeout_us) const;
const RetryParam &retry_param) const;
template <typename READ_OP>
int get_latest(READ_OP &&read_operation, const int64_t read_seq) const;
template <typename READ_OP>
@ -95,7 +96,7 @@ public:
const MdsWriter &writer,
const share::SCN snapshot,
const int64_t read_seq,
const int64_t timeout_us) const;
const RetryParam &retry_param) const;
template <typename DUMP_OP>
int scan_dump_node_from_tail_to_head(DUMP_OP &&op,
const uint8_t mds_table_id,
@ -116,10 +117,6 @@ public:
int for_each_node_(OPERATION &&op) const;
template <typename READ_OP, typename SPECIFIED_GET_LOGIC>
int get_with_read_wrapper_(READ_OP &&read_operation, SPECIFIED_GET_LOGIC &&specified_logic) const;
template <typename WRITE_OPERATION>
int write_operation_wrapper_(WRITE_OPERATION &&write_op,
MdsCtx &ctx,
const int64_t lock_timeout_us);
template <typename DATA>
int construct_insert_record_user_mds_node_(MdsRowBase<K, V> *p_mds_row,
DATA &&data,
@ -128,7 +125,7 @@ public:
MdsCtx &ctx);
int check_node_snapshot_(const UserMdsNode<K, V> &node,
const share::SCN snapshot,
const int64_t timeout_ts,
const RetryParam &retry_param,
bool &can_read) const;
template <int N>
void report_event_(const char (&event_str)[N],

View File

@ -234,90 +234,50 @@ int MdsRow<K, V>::construct_insert_record_user_mds_node_(MdsRowBase<K, V> *mds_r
}
template <typename K, typename V>
template <typename WRITE_OPERATION>
int MdsRow<K, V>::write_operation_wrapper_(WRITE_OPERATION &&write_op,
MdsCtx &ctx,
const int64_t lock_timeout_us)
template <typename DATA>
int MdsRow<K, V>::set(DATA &&data,
MdsCtx &ctx,
const RetryParam &retry_param,
const bool is_for_remove)
{
#define PRINT_WRAPPER KR(ret), K(typeid(V).name()), K(ctx), KTIME(timeout_ts),\
K(lock_timeout_us), K(try_lock_times), K(*this)
#define PRINT_WRAPPER KR(ret), K(typeid(V).name()), K(ctx), K(retry_param), K(*this)
int ret = OB_SUCCESS;
bool write_conflict = true;
int try_lock_times = 0;
int64_t timeout_ts = ObClockGenerator::getRealClock() + lock_timeout_us;
MDS_TG(std::max(lock_timeout_us, (int64_t)5_ms));
MDS_TG(5_ms);
if (!ctx.can_write()) {
ret = OB_INVALID_ARGUMENT;
MDS_LOG_SET(WARN, "invalid mds ctx");
} else {
do {
MDS_TG(5_ms);
MdsWLockGuard lg(MdsRowBase<K, V>::lock_);// lock row
if (sorted_list_.empty()) {// for now, all writting data are in memory
write_conflict = false;
} else {
if (OB_LIKELY(sorted_list_.get_head().is_decided_())) {
write_conflict = false;
} else if (sorted_list_.get_head().get_writer_() == ctx.get_writer()) {
write_conflict = false;
}
}
UserMdsNode<K, V> *new_node = nullptr;
if (OB_UNLIKELY(write_conflict)) {
ret = OB_TRY_LOCK_ROW_CONFLICT;
} else if (MDS_FAIL(write_op())) {
MDS_LOG_SET(WARN, "execute write op failed");
} else {
MDS_LOG_SET(TRACE, "MdsRow set node success");
}
} while (++try_lock_times &&
OB_TRY_LOCK_ROW_CONFLICT == ret &&// keep trying but release latch
ObClockGenerator::getRealClock() < timeout_ts &&// check timeout
({ob_usleep(50_ms); true;}));// if not, wait and try again
}
if (OB_FAIL(ret)) {
if (lock_timeout_us != 0 && OB_TRY_LOCK_ROW_CONFLICT == ret) {
ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
MDS_LOG_SET(WARN, "write data conflict cause reach lock timeout");
MdsWLockGuard lg(MdsRowBase<K, V>::lock_);// lock row
if (sorted_list_.empty()) {// for now, all writting data are in memory
write_conflict = false;
} else {
MDS_LOG_SET(WARN, "write failed");
if (OB_LIKELY(sorted_list_.get_head().is_decided_())) {
write_conflict = false;
} else if (sorted_list_.get_head().get_writer_() == ctx.get_writer()) {
write_conflict = false;
}
}
UserMdsNode<K, V> *new_node = nullptr;
if (OB_UNLIKELY(write_conflict)) {
ret = OB_EAGAIN;
if (retry_param.check_reach_print_interval_and_update()) {
MDS_LOG_SET(INFO, "mds row write conflict");
}
} else if (MDS_FAIL(construct_insert_record_user_mds_node_(this,
std::forward<DATA>(data),
is_for_remove ? MdsNodeType::DELETE : MdsNodeType::SET,
share::SCN::max_scn(),
ctx))) {
MDS_LOG_SET(WARN, "execute write op failed");
} else {
MDS_LOG_SET(DEBUG, "MdsRow set node success");
}
} else {
MDS_LOG_SET(TRACE, "MdsRow write node success");
}
return ret;
#undef PRINT_WRAPPER
}
template <typename K, typename V>
template <typename DATA>
int MdsRow<K, V>::set(DATA &&data,
MdsCtx &ctx,
const int64_t lock_timeout_us,
const bool is_for_remove)
{
ForwardWrapper forward_wrapper(std::forward<DATA>(data));
return write_operation_wrapper_(
[&forward_wrapper, &ctx, is_for_remove, this]() -> int {
int ret = OB_SUCCESS;
if (forward_wrapper.is_constructed_from_lvalue_) {
ret = construct_insert_record_user_mds_node_(this,
forward_wrapper.data_,
is_for_remove ? MdsNodeType::DELETE : MdsNodeType::SET,
share::SCN::max_scn(),
ctx);
} else {
ret = construct_insert_record_user_mds_node_(this,
std::move(forward_wrapper.data_),
is_for_remove ? MdsNodeType::DELETE : MdsNodeType::SET,
share::SCN::max_scn(),
ctx);
}
return ret;
},
ctx, lock_timeout_us);
}
template <typename K, typename V>
template <typename DATA>
int MdsRow<K, V>::replay(DATA &&data,
@ -352,12 +312,10 @@ int MdsRow<K, V>::replay(DATA &&data,
}
// this function must be called from for_each_lock_ method with lock protection.
// but this function may hung until node decided, so when it sleep, must release lock,
// when it awaken, must add lock again.
template <typename K, typename V>
int MdsRow<K, V>::check_node_snapshot_(const UserMdsNode<K, V> &node,
const share::SCN snapshot,
const int64_t timeout_ts,
const RetryParam &retry_param,
bool &can_read) const
{
#define PRINT_WRAPPER KR(ret), KPC(node), K(typeid(READ_OP).name()), K(retry_times), K(can_read),\
@ -369,11 +327,9 @@ int MdsRow<K, V>::check_node_snapshot_(const UserMdsNode<K, V> &node,
snapshot_converted = share::SCN::scn_dec(snapshot_converted);
}
if (!node.is_decided_() && node.get_prepare_version_() <= snapshot_converted) {
if (ObClockGenerator::getRealClock() > timeout_ts) {
ret = OB_ERR_SHARED_LOCK_CONFLICT;
MDS_LOG(WARN, "timeout when wait node decided", K(node), K(node.get_prepare_version_()), K(snapshot_converted), K(snapshot));
} else {
ret = OB_EAGAIN;
ret = OB_EAGAIN;
if (retry_param.check_reach_print_interval_and_update()) {
MDS_LOG(WARN, "mds row lock_for_read conflict");
}
} else if (node.is_committed_() && node.get_commit_version_() <= snapshot_converted) {
can_read = true;
@ -428,28 +384,24 @@ template <typename READ_OP>
int MdsRow<K, V>::get_snapshot(READ_OP &&read_operation,
const share::SCN snapshot,
const int64_t read_seq,
const int64_t timeout_us) const
const RetryParam &retry_param) const
{
#define PRINT_WRAPPER KR(ret), K(snapshot), K(read_seq), K(timeout_us), K(timeout_ts), K(*this)
#define PRINT_WRAPPER KR(ret), K(snapshot), K(read_seq), K(retry_param), K(*this)
int ret = OB_SUCCESS;
UNUSED(read_seq);
int64_t timeout_ts = ObClockGenerator::getRealClock() + timeout_us;
MDS_TG(5_ms);
do {
MdsRLockGuard lg(MdsRowBase<K, V>::lock_);
if (MDS_FAIL(get_with_read_wrapper_(read_operation,
[&](const UserMdsNode<K, V> &node, bool &can_read) -> int {
return check_node_snapshot_(node, snapshot, timeout_ts, can_read);
}
))) {
if (OB_EAGAIN != ret && OB_SNAPSHOT_DISCARDED != ret) {
MDS_LOG_GET(WARN, "MdsRow get_snapshot failed");
}
} else {
MDS_LOG_GET(TRACE, "MdsRow get_snapshot success");
MdsRLockGuard lg(MdsRowBase<K, V>::lock_);
if (MDS_FAIL(get_with_read_wrapper_(read_operation,
[this, snapshot, &retry_param](const UserMdsNode<K, V> &node, bool &can_read) -> int {
return check_node_snapshot_(node, snapshot, retry_param, can_read);
}
} while (OB_EAGAIN == ret &&
({ob_usleep(10_ms); true;}));
))) {
if (OB_EAGAIN != ret && OB_SNAPSHOT_DISCARDED != ret) {
MDS_LOG_GET(WARN, "MdsRow get_snapshot failed");
}
} else {
MDS_LOG_GET(DEBUG, "MdsRow get_snapshot success");
}
return ret;
#undef PRINT_WRAPPER
}
@ -485,35 +437,30 @@ int MdsRow<K, V>::get_by_writer(READ_OP &&read_operation,
const MdsWriter &writer,
const share::SCN snapshot,
const int64_t read_seq,
const int64_t timeout_us) const
const RetryParam &retry_param) const
{
#define PRINT_WRAPPER KR(ret), K(writer), K(snapshot), K(read_seq), K(timeout_us),\
KTIME(timeout_ts), K(*this)
#define PRINT_WRAPPER KR(ret), K(writer), K(snapshot), K(read_seq), K(retry_param), K(*this)
int ret = OB_SUCCESS;
UNUSED(read_seq);
MDS_TG(5_ms);
int64_t timeout_ts = ObClockGenerator::getRealClock() + timeout_us;
do {
MdsRLockGuard lg(MdsRowBase<K, V>::lock_);
if (MDS_FAIL(get_with_read_wrapper_(read_operation,
[&](const UserMdsNode<K, V> &node, bool &can_read) -> int {
int ret = OB_SUCCESS;
if (node.get_writer_() == writer) {
can_read = true;
} else {
ret = check_node_snapshot_(node, snapshot, timeout_ts, can_read);
}
return ret;
MdsRLockGuard lg(MdsRowBase<K, V>::lock_);
if (MDS_FAIL(get_with_read_wrapper_(read_operation,
[this, snapshot, &writer, &retry_param](const UserMdsNode<K, V> &node, bool &can_read) -> int {
int ret = OB_SUCCESS;
if (node.get_writer_() == writer) {
can_read = true;
} else {
ret = check_node_snapshot_(node, snapshot, retry_param, can_read);
}
))) {
if (OB_UNLIKELY(OB_EAGAIN != ret && OB_SNAPSHOT_DISCARDED != ret)) {
MDS_LOG_GET(WARN, "MdsRow get_by_writer failed");
}
} else {
MDS_LOG_GET(TRACE, "MdsRow get_by_writer success");
return ret;
}
} while (OB_EAGAIN == ret &&
({ob_usleep(10_ms); true;}));
))) {
if (OB_UNLIKELY(OB_EAGAIN != ret && OB_SNAPSHOT_DISCARDED != ret)) {
MDS_LOG_GET(WARN, "MdsRow get_by_writer failed");
}
} else {
MDS_LOG_GET(DEBUG, "MdsRow get_by_writer success");
}
return ret;
#undef PRINT_WRAPPER
}

View File

@ -164,7 +164,7 @@ int MdsTableHandle::set(T &&data, MdsCtx &ctx, const int64_t lock_timeout_us)
if (OB_FAIL(p_mds_table_base_->set(unit_id,
(void*)&dummy_key,
(void*)&data,
std::is_rvalue_reference<T>::value,
std::is_rvalue_reference<decltype(data)>::value,
ctx,
lock_timeout_us))) {
MDS_LOG(WARN, "fail to call set", KR(ret), K(unit_id), K(data), K(ctx), K(lock_timeout_us));

View File

@ -48,6 +48,89 @@ struct KvPair : public ListNode<KvPair<K, V>>
template <typename K, typename V>
class MdsUnit final : public MdsUnitBase<K, V>
{
// Callable that carries everything one MdsUnit<K, V>::set() attempt needs, so the
// attempt can be re-run by retry_release_lock_with_op_until_timeout().
// All pointer/reference members are borrowed from the caller: the functor owns nothing
// and must not outlive the arguments it was built from.
struct SetOP {
  MdsUnit<K, V> *this_;         // unit the write is applied to
  bool is_lvalue_;              // true: value_ is passed on as an lvalue; false: it is std::move'd
  MdsTableBase *p_mds_table_;   // forwarded to insert_empty_kv_to_list_() when the key is new
  const K &key_;                // row key to write
  V &value_;                    // user data to write
  MdsCtx &ctx_;                 // writer context
  bool is_for_remove_;          // forwarded to the row-level set()
  RetryParam &retry_param_;     // retry/timeout bookkeeping shared with the retry loop

  SetOP(MdsUnit<K, V> *p_this,
        bool is_lvalue,
        MdsTableBase *p_mds_table,
        const K &key,
        V &value,
        MdsCtx &ctx,
        bool is_for_remove,
        RetryParam &retry_param)
    : this_(p_this),
      is_lvalue_(is_lvalue),
      p_mds_table_(p_mds_table),
      key_(key),
      value_(value),
      ctx_(ctx),
      is_for_remove_(is_for_remove),
      retry_param_(retry_param) {}

  // neither default-constructible nor copyable
  SetOP() = delete;
  SetOP(const SetOP &) = delete;
  SetOP &operator=(const SetOP &) = delete;

  // Runs one set attempt; defined out of line.
  int operator()();
};
// Callable that carries the arguments of one MdsUnit<K, V>::get_snapshot() attempt, so the
// attempt can be re-run by retry_release_lock_with_op_until_timeout().
// OP is the (reference-stripped) type of the user's read operation.
// Pointer/reference members are borrowed from the caller and must outlive the functor.
template <typename OP>
struct GetSnapShotOp {
  const MdsUnit<K, V> *this_;   // unit being read
  const K &key_;                // row key to look up
  OP &read_op_;                 // user read operation, passed on to the row
  share::SCN snapshot_;         // snapshot version to read at
  int64_t read_seq_;            // forwarded to the row-level get_snapshot()
  RetryParam &retry_param_;     // retry/timeout bookkeeping shared with the retry loop

  GetSnapShotOp(const MdsUnit<K, V> *p_this,
                const K &key,
                OP &read_op,
                share::SCN snapshot,
                int64_t read_seq,
                RetryParam &retry_param)
    : this_(p_this),
      key_(key),
      read_op_(read_op),
      snapshot_(snapshot),
      read_seq_(read_seq),
      retry_param_(retry_param) {}

  // neither default-constructible nor copyable
  GetSnapShotOp() = delete;
  GetSnapShotOp(const GetSnapShotOp &) = delete;
  GetSnapShotOp &operator=(const GetSnapShotOp &) = delete;

  // Runs one read attempt; defined out of line.
  int operator()();
};
// Callable that carries the arguments of one MdsUnit<K, V>::get_by_writer() attempt, so the
// attempt can be re-run by retry_release_lock_with_op_until_timeout().
// OP is the (reference-stripped) type of the user's read operation.
// Pointer/reference members are borrowed from the caller and must outlive the functor.
template <typename OP>
struct GetByWriterOp {
  const MdsUnit<K, V> *this_;   // unit being read
  const K &key_;                // row key to look up
  OP &read_op_;                 // user read operation, passed on to the row
  const MdsWriter &writer_;     // writer identity forwarded to the row-level get_by_writer()
  share::SCN snapshot_;         // snapshot version forwarded to the row
  int64_t read_seq_;            // forwarded to the row-level get_by_writer()
  RetryParam &retry_param_;     // retry/timeout bookkeeping shared with the retry loop

  GetByWriterOp(const MdsUnit<K, V> *p_this,
                const K &key,
                OP &read_op,
                const MdsWriter &writer,
                share::SCN snapshot,
                int64_t read_seq,
                RetryParam &retry_param)
    : this_(p_this),
      key_(key),
      read_op_(read_op),
      writer_(writer),
      snapshot_(snapshot),
      read_seq_(read_seq),
      retry_param_(retry_param) {}

  // neither default-constructible nor copyable
  GetByWriterOp() = delete;
  GetByWriterOp(const GetByWriterOp &) = delete;
  GetByWriterOp &operator=(const GetByWriterOp &) = delete;

  // Runs one read attempt; defined out of line.
  int operator()();
};
public:
typedef K key_type;
typedef V value_type;
@ -124,8 +207,9 @@ public:
const char *file = __builtin_FILE(),
const uint32_t line = __builtin_LINE(),
const char *function_name = __builtin_FUNCTION()) const;
const Row<K, V> *get_row_from_list_(const K &key) const;
int insert_empty_kv_to_list_(const K &key, Row<K, V> *&row, MdsTableBase *p_mds_table);
KvPair<K, Row<K, V>> *get_row_from_list_(const K &key) const;
int insert_empty_kv_to_list_(const K &key, KvPair<K, Row<K, V>> *&p_kv, MdsTableBase *p_mds_table);
void erase_kv_from_list_if_empty_(KvPair<K, Row<K, V>> *p_kv);
SortedList<KvPair<K, Row<K, V>>, SORT_TYPE::ASC> multi_row_list_;
mutable MdsLock lock_;
};
@ -134,6 +218,74 @@ public:
template <typename V>
class MdsUnit<DummyKey, V> final : public MdsUnitBase<DummyKey, V>
{
// Callable that carries the arguments of one MdsUnit<DummyKey, V>::set() attempt, so the
// attempt can be re-run by retry_release_lock_with_op_until_timeout().
// Pointer/reference members are borrowed from the caller and must outlive the functor.
struct SetOP {
  MdsUnit<DummyKey, V> *this_;  // unit whose single row is written
  bool is_lvalue_;              // true: value_ is passed on as an lvalue; false: it is std::move'd
  V &value_;                    // user data to write
  MdsCtx &ctx_;                 // writer context
  RetryParam &retry_param_;     // retry/timeout bookkeeping shared with the retry loop

  SetOP(MdsUnit<DummyKey, V> *p_this,
        bool is_lvalue,
        V &value,
        MdsCtx &ctx,
        RetryParam &retry_param)
    : this_(p_this),
      is_lvalue_(is_lvalue),
      value_(value),
      ctx_(ctx),
      retry_param_(retry_param) {}

  // neither default-constructible nor copyable
  SetOP() = delete;
  SetOP(const SetOP &) = delete;
  SetOP &operator=(const SetOP &) = delete;

  // Runs one set attempt; defined out of line.
  int operator()();
};
// Callable that carries the arguments of one MdsUnit<DummyKey, V>::get_snapshot() attempt,
// so the attempt can be re-run by retry_release_lock_with_op_until_timeout().
// OP is the (reference-stripped) type of the user's read operation.
// Pointer/reference members are borrowed from the caller and must outlive the functor.
template <typename OP>
struct GetSnapShotOp {
  const MdsUnit<DummyKey, V> *this_;  // unit whose single row is read
  OP &read_op_;                       // user read operation, passed on to the row
  share::SCN snapshot_;               // snapshot version to read at
  int64_t read_seq_;                  // forwarded to the row-level get_snapshot()
  RetryParam &retry_param_;           // retry/timeout bookkeeping shared with the retry loop

  GetSnapShotOp(const MdsUnit<DummyKey, V> *p_this,
                OP &read_op,
                share::SCN snapshot,
                int64_t read_seq,
                RetryParam &retry_param)
    : this_(p_this),
      read_op_(read_op),
      snapshot_(snapshot),
      read_seq_(read_seq),
      retry_param_(retry_param) {}

  // neither default-constructible nor copyable
  GetSnapShotOp() = delete;
  GetSnapShotOp(const GetSnapShotOp &) = delete;
  GetSnapShotOp &operator=(const GetSnapShotOp &) = delete;

  // Runs one read attempt; defined out of line.
  int operator()();
};
// Callable that carries the arguments of one MdsUnit<DummyKey, V>::get_by_writer() attempt,
// so the attempt can be re-run by retry_release_lock_with_op_until_timeout().
// OP is the (reference-stripped) type of the user's read operation.
// Pointer/reference members are borrowed from the caller and must outlive the functor.
template <typename OP>
struct GetByWriterOp {
  const MdsUnit<DummyKey, V> *this_;  // unit whose single row is read
  OP &read_op_;                       // user read operation, passed on to the row
  const MdsWriter &writer_;           // writer identity forwarded to the row-level get_by_writer()
  share::SCN snapshot_;               // snapshot version forwarded to the row
  int64_t read_seq_;                  // forwarded to the row-level get_by_writer()
  RetryParam &retry_param_;           // retry/timeout bookkeeping shared with the retry loop

  GetByWriterOp(const MdsUnit<DummyKey, V> *p_this,
                OP &read_op,
                const MdsWriter &writer,
                share::SCN snapshot,
                int64_t read_seq,
                RetryParam &retry_param)
    : this_(p_this),
      read_op_(read_op),
      writer_(writer),
      snapshot_(snapshot),
      read_seq_(read_seq),
      retry_param_(retry_param) {}

  // neither default-constructible nor copyable
  GetByWriterOp() = delete;
  GetByWriterOp(const GetByWriterOp &) = delete;
  GetByWriterOp &operator=(const GetByWriterOp &) = delete;

  // Runs one read attempt; defined out of line.
  int operator()();
};
public:
typedef DummyKey key_type;
typedef V value_type;

View File

@ -13,8 +13,12 @@
#define STORAGE_MULTI_DATA_SOURCE_MDS_UNIT_IPP
#include "lib/list/ob_dlist.h"
#include "lib/ob_errno.h"
#include "ob_clock_generator.h"
#include "observer/virtual_table/ob_mds_event_buffer.h"
#include "share/ob_errno.h"
#include "runtime_utility/mds_retry_control.h"
#include <type_traits>
#ifndef STORAGE_MULTI_DATA_SOURCE_MDS_UNIT_H_IPP
#define STORAGE_MULTI_DATA_SOURCE_MDS_UNIT_H_IPP
#include "mds_unit.h"
@ -188,7 +192,7 @@ int MdsUnit<K, V>::for_each_row(FowEachRowAction action_type, OP &&op)// node ma
MDS_TG(1_ms);
const K *p_k = &kv_row.k_;
const Row<K, V> &row = kv_row.v_;
if (MDS_FAIL(op(row))) {
if (MDS_FAIL(op(row))) {// node maybe recycled inside op
MDS_LOG_SCAN(WARN, "fail to scan row", KPC(p_k));
}
// CAUTIONS: not every path scan need recycle empty row, or maybe result some problem unexpected, for example:
@ -197,11 +201,7 @@ int MdsUnit<K, V>::for_each_row(FowEachRowAction action_type, OP &&op)// node ma
// but destroy mds_row will add row's lock inner destruction, which will resulting deadlock in same thread.
// so only operations logic behaves like gc should recycle empty row.
if (FowEachRowAction::RECYCLE == action_type || FowEachRowAction::RESET == action_type) {
if (row.sorted_list_.empty()) {// if this row is recycled, just delete it
KvPair<K, Row<K, V>> *p_kv = &const_cast<KvPair<K, Row<K, V>> &>(kv_row);
multi_row_list_.del(p_kv);
MdsFactory::destroy(p_kv);
}
erase_kv_from_list_if_empty_(&const_cast<KvPair<K, Row<K, V>> &>(kv_row));
}
return OB_SUCCESS != ret;// keep scanning until meet failure
});
@ -210,27 +210,26 @@ int MdsUnit<K, V>::for_each_row(FowEachRowAction action_type, OP &&op)// node ma
}
template <typename K, typename V>
const Row<K, V> *MdsUnit<K, V>::get_row_from_list_(const K &key) const
KvPair<K, Row<K, V>> *MdsUnit<K, V>::get_row_from_list_(const K &key) const
{
const Row<K, V> *row = nullptr;
KvPair<K, Row<K, V>> *p_kv = nullptr;
multi_row_list_.for_each_node_from_head_to_tail_until_true(
[&row, &key](const KvPair<K, Row<K, V>> &kv_pair) {
[&p_kv, &key](const KvPair<K, Row<K, V>> &kv_pair) {
if (kv_pair.k_ == key) {
row = &const_cast<Row<K, V> &>(kv_pair.v_);
p_kv = &const_cast<KvPair<K, Row<K, V>> &>(kv_pair);
}
return nullptr != row;
return nullptr != p_kv;
}
);
return row;
return p_kv;
}
template <typename K, typename V>
int MdsUnit<K, V>::insert_empty_kv_to_list_(const K &key, Row<K, V> *&row, MdsTableBase *p_mds_table)
int MdsUnit<K, V>::insert_empty_kv_to_list_(const K &key, KvPair<K, Row<K, V>> *&p_kv, MdsTableBase *p_mds_table)
{
#define PRINT_WRAPPER KR(ret), K(key), K(typeid(K).name()), K(typeid(V).name())
int ret = OB_SUCCESS;
MDS_TG(1_ms);
KvPair<K, Row<K, V>> *p_kv;
set_mds_mem_check_thread_local_info(p_mds_table->ls_id_,
p_mds_table->tablet_id_,
typeid(p_kv).name());
@ -242,14 +241,57 @@ int MdsUnit<K, V>::insert_empty_kv_to_list_(const K &key, Row<K, V> *&row, MdsTa
} else {
p_kv->v_.key_ = &(p_kv->k_);
multi_row_list_.insert(p_kv);
row = &(p_kv->v_);
report_event_("CREATE_KEY", key);
}
reset_mds_mem_check_thread_local_info();
return ret;
#undef PRINT_WRAPPER
}
// Removes p_kv from multi_row_list_ and destroys it, but only when its row holds no nodes.
// A row that still has nodes is left alone. If the row is empty yet the pair is not
// registered in the list, an ERROR is logged and nothing is deleted.
// p_kv is dereferenced unconditionally, so callers must pass a non-null pointer.
template <typename K, typename V>
void MdsUnit<K, V>::erase_kv_from_list_if_empty_(KvPair<K, Row<K, V>> *p_kv)
{
  int ret = OB_SUCCESS;
  if (!p_kv->v_.sorted_list_.empty()) {
    // row still carries nodes: nothing to recycle
  } else if (multi_row_list_.check_node_exist(p_kv)) {
    multi_row_list_.del(p_kv);
    MdsFactory::destroy(p_kv);
  } else {
    MDS_LOG(ERROR, "mds row is not record in list", KPC(p_kv));
  }
}
// One attempt of MdsUnit<K, V>::set(), meant to be driven by
// retry_release_lock_with_op_until_timeout() (which holds the unit lock around each call).
//
// Steps:
//   1. look the key up; create an empty row for it when it does not exist yet;
//   2. write the value into the row, copying when is_lvalue_ is set and moving otherwise;
//   3. on failure, erase the row again if it ended up empty (rollback of step 1).
//
// Returns whatever the row-level set() / row creation returned. OB_EAGAIN is the
// "retry me" signal consumed by the retry loop; it is expected during a write conflict,
// so it is NOT logged at WARN here: the loop re-invokes this functor every few
// milliseconds, and the row-level set() already emits a rate-limited message for it.
template <typename K, typename V>
int MdsUnit<K, V>::SetOP::operator()() {
#define PRINT_WRAPPER KR(ret), K_(key), K_(value), K_(ctx), K_(retry_param), K_(is_for_remove)
  int ret = OB_SUCCESS;
  MDS_TG(10_ms);
  KvPair<K, Row<K, V>> *p_kv = this_->get_row_from_list_(key_);
  if (OB_ISNULL(p_kv)) {
    if (MDS_FAIL(this_->insert_empty_kv_to_list_(key_, p_kv, p_mds_table_))) {
      MDS_LOG_SET(WARN, "insert new key to unit failed");
    } else {
      MDS_LOG_SET(INFO, "insert new key to unit");
    }
  }
  if (OB_SUCC(ret)) {
    if (is_lvalue_) {
      ret = p_kv->v_.set(value_, ctx_, retry_param_, is_for_remove_);
    } else {
      ret = p_kv->v_.set(std::move(value_), ctx_, retry_param_, is_for_remove_);
    }
    if (MDS_FAIL(ret)) {
      this_->erase_kv_from_list_if_empty_(p_kv);// rollback
      if (OB_EAGAIN != ret) {// conflict retries are expected, do not flood the log
        MDS_LOG_SET(WARN, "MdsUnit set value failed");
      }
    } else {
      MDS_LOG_SET(TRACE, "MdsUnit set value success");
    }
  }
  return ret;
#undef PRINT_WRAPPER
}
template <typename K, typename V>
template <typename Value>
int MdsUnit<K, V>::set(MdsTableBase *p_mds_table,
@ -259,31 +301,17 @@ int MdsUnit<K, V>::set(MdsTableBase *p_mds_table,
const int64_t lock_timeout_us,
const bool is_for_remove)
{
#define PRINT_WRAPPER KR(ret), K(key), K(value), K(ctx), K(lock_timeout_us), K(is_for_remove)
int ret = OB_SUCCESS;
MDS_TG(10_ms);
MdsWLockGuard lg(lock_);
CLICK();
Row<K, V> *mds_row = const_cast<Row<K, V> *>(get_row_from_list_(key));
if (OB_ISNULL(mds_row)) {
if (MDS_FAIL(insert_empty_kv_to_list_(key, mds_row, p_mds_table))) {
MDS_LOG_SET(WARN, "insert new key to unit failed");
} else {
MDS_LOG_SET(INFO, "insert new key to unit");
}
}
if (OB_SUCC(ret)) {
if (MDS_FAIL(mds_row->set(std::forward<Value>(value),
ctx,
lock_timeout_us,
is_for_remove))) {
MDS_LOG_SET(WARN, "MdsUnit set value failed");
} else {
MDS_LOG_SET(TRACE, "MdsUnit set value success");
bool is_lvalue = std::is_lvalue_reference<decltype(value)>::value;
RetryParam retry_param(lock_timeout_us);
SetOP op(this, is_lvalue, p_mds_table, key, value, ctx, is_for_remove, retry_param);
if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::WRITE>(lock_, retry_param, op))) {
if (OB_TIMEOUT == ret) {
ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT;
}
}
return ret;
#undef PRINT_WRAPPER
}
template <typename K, typename V>
@ -300,19 +328,20 @@ int MdsUnit<K, V>::replay(MdsTableBase *p_mds_table,
MDS_TG(10_ms);
MdsWLockGuard lg(lock_);
CLICK();
Row<K, V> *mds_row = const_cast<Row<K, V> *>(get_row_from_list_(key));
if (OB_ISNULL(mds_row)) {
if (MDS_FAIL(insert_empty_kv_to_list_(key, mds_row, p_mds_table))) {
KvPair<K, Row<K, V>> *p_kv = get_row_from_list_(key);
if (OB_ISNULL(p_kv)) {
if (MDS_FAIL(insert_empty_kv_to_list_(key, p_kv, p_mds_table))) {
MDS_LOG_SET(WARN, "insert new key to unit failed");
} else {
MDS_LOG_SET(INFO, "insert new key to unit");
}
}
if (OB_SUCC(ret)) {
if (MDS_FAIL(mds_row->replay(std::forward<Value>(value),
if (MDS_FAIL(p_kv->v_.replay(std::forward<Value>(value),
ctx,
scn,
is_for_remove))) {
erase_kv_from_list_if_empty_(p_kv);// rollback
MDS_LOG_SET(WARN, "MdsUnit set value failed");
} else {
MDS_LOG_SET(TRACE, "MdsUnit set value success");
@ -322,6 +351,27 @@ int MdsUnit<K, V>::replay(MdsTableBase *p_mds_table,
#undef PRINT_WRAPPER
}
// One attempt of MdsUnit<K, V>::get_snapshot(), meant to be driven by
// retry_release_lock_with_op_until_timeout().
// Returns OB_ENTRY_NOT_EXIST when the key has no row; otherwise returns the result of the
// row-level get_snapshot(). OB_SNAPSHOT_DISCARDED and OB_EAGAIN are passed through silently,
// any other failure is logged at WARN.
template <typename K, typename V>
template <typename OP>
int MdsUnit<K, V>::GetSnapShotOp<OP>::operator()() {
#define PRINT_WRAPPER KR(ret), K_(key), K(typeid(OP).name()), K_(snapshot), K_(read_seq), K_(retry_param)
  int ret = OB_SUCCESS;
  MDS_TG(10_ms);
  KvPair<K, Row<K, V>> *kv_pair = this_->get_row_from_list_(key_);
  if (nullptr != kv_pair) {
    if (MDS_FAIL(kv_pair->v_.get_snapshot(read_op_, snapshot_, read_seq_, retry_param_))) {
      const bool is_quiet_error = (OB_SNAPSHOT_DISCARDED == ret) || (OB_EAGAIN == ret);
      if (OB_UNLIKELY(!is_quiet_error)) {
        MDS_LOG_GET(WARN, "MdsUnit get_snapshot failed");
      }
    }
  } else {
    ret = OB_ENTRY_NOT_EXIST;
    MDS_LOG_GET(WARN, "row key not exist");
  }
  return ret;
#undef PRINT_WRAPPER
}
template <typename K, typename V>
template <typename OP>
int MdsUnit<K, V>::get_snapshot(const K &key,
@ -330,30 +380,42 @@ int MdsUnit<K, V>::get_snapshot(const K &key,
const int64_t read_seq,
const int64_t lock_timeout_us) const
{
#define PRINT_WRAPPER KR(ret), K(key), K(typeid(OP).name()), K(snapshot), K(read_seq),\
K(lock_timeout_us)
int ret = OB_SUCCESS;
MDS_TG(10_ms);
MdsRLockGuard lg(lock_);
CLICK();
const Row<K, V> *mds_row = get_row_from_list_(key);
if (OB_ISNULL(mds_row)) {
RetryParam retry_param(lock_timeout_us);
GetSnapShotOp<typename std::remove_reference<OP>::type> op(this, key, read_op, snapshot, read_seq, retry_param);
if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::READ>(lock_, retry_param, op))) {
if (OB_TIMEOUT == ret) {
ret = OB_ERR_SHARED_LOCK_CONFLICT;
}
}
return ret;
}
template <typename K, typename V>
template <typename OP>
int MdsUnit<K, V>::GetByWriterOp<OP>::operator()() {
#define PRINT_WRAPPER KR(ret), K_(key), K(typeid(OP).name()), K_(writer), K_(snapshot), K_(read_seq), K_(retry_param)
int ret = OB_SUCCESS;
MDS_TG(10_ms);
KvPair<K, Row<K, V>> *p_kv = this_->get_row_from_list_(key_);
if (OB_ISNULL(p_kv)) {
ret = OB_ENTRY_NOT_EXIST;
MDS_LOG_GET(WARN, "row key not exist");
} else if (MDS_FAIL(mds_row->get_snapshot(std::forward<OP>(read_op),
snapshot,
read_seq,
lock_timeout_us))) {
} else if (MDS_FAIL(p_kv->v_.get_by_writer(read_op_,
writer_,
snapshot_,
read_seq_,
retry_param_))) {
if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) {
MDS_LOG_GET(WARN, "MdsUnit get_snapshot failed");
MDS_LOG_GET(WARN, "MdsUnit get_by_writer failed");
}
} else {
MDS_LOG_GET(TRACE, "MdsUnit get_snapshot success");
MDS_LOG_GET(TRACE, "MdsUnit get_by_writer success");
}
return ret;
#undef PRINT_WRAPPER
}
template <typename K, typename V>
template <typename OP>
int MdsUnit<K, V>::get_by_writer(const K &key,
@ -363,29 +425,16 @@ int MdsUnit<K, V>::get_by_writer(const K &key,
const int64_t read_seq,
const int64_t lock_timeout_us) const
{
#define PRINT_WRAPPER KR(ret), K(key), K(typeid(OP).name()), K(writer), K(snapshot), K(read_seq),\
K(lock_timeout_us)
int ret = OB_SUCCESS;
MDS_TG(10_ms);
MdsRLockGuard lg(lock_);
CLICK();
const Row<K, V> *mds_row = get_row_from_list_(key);
if (OB_ISNULL(mds_row)) {
ret = OB_ENTRY_NOT_EXIST;
MDS_LOG_GET(WARN, "row key not exist");
} else if (MDS_FAIL(mds_row->get_by_writer(std::forward<OP>(read_op),
writer,
snapshot,
read_seq,
lock_timeout_us))) {
if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) {
MDS_LOG_GET(WARN, "MdsUnit get_by_writer failed");
RetryParam retry_param(lock_timeout_us);
GetByWriterOp<typename std::remove_reference<OP>::type> op(this, key, read_op, writer, snapshot, read_seq, retry_param);
if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::READ>(lock_, retry_param, op))) {
if (OB_TIMEOUT == ret) {
ret = OB_ERR_SHARED_LOCK_CONFLICT;
}
} else {
MDS_LOG_GET(TRACE, "MdsUnit get_by_writer success");
}
return ret;
#undef PRINT_WRAPPER
}
template <typename K, typename V>
@ -397,11 +446,11 @@ int MdsUnit<K, V>::get_latest(const K &key, OP &&read_op, const int64_t read_seq
MDS_TG(10_ms);
MdsRLockGuard lg(lock_);
CLICK();
const Row<K, V> *mds_row = get_row_from_list_(key);
if (OB_ISNULL(mds_row)) {
KvPair<K, Row<K, V>> *p_kv = get_row_from_list_(key);
if (OB_ISNULL(p_kv)) {
ret = OB_ENTRY_NOT_EXIST;
MDS_LOG_GET(WARN, "row key not exist");
} else if (MDS_FAIL(mds_row->get_latest(std::forward<OP>(read_op), read_seq))) {
} else if (MDS_FAIL(p_kv->v_.get_latest(std::forward<OP>(read_op), read_seq))) {
if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) {
MDS_LOG_GET(WARN, "MdsUnit get_latest failed");
}
@ -603,18 +652,16 @@ typename MdsUnit<DummyKey, V>::const_reverse_iterator MdsUnit<DummyKey, V>::cren
{ return const_reverse_iterator(nullptr); }
template <typename V>
template <typename Value>
int MdsUnit<DummyKey, V>::set(MdsTableBase *p_mds_table,
Value &&value,
MdsCtx &ctx,
const int64_t lock_timeout_us)
{
#define PRINT_WRAPPER KR(ret), K(value), K(ctx), K(lock_timeout_us)
int MdsUnit<DummyKey, V>::SetOP::operator()() {
#define PRINT_WRAPPER KR(ret), K_(value), K_(ctx), K_(retry_param), K_(is_lvalue)
int ret = OB_SUCCESS;
MDS_TG(10_ms);
MdsWLockGuard lg(lock_);
CLICK();
if (MDS_FAIL(single_row_.v_.set(std::forward<Value>(value), ctx, lock_timeout_us))) {
if (is_lvalue_) {
ret = this_->single_row_.v_.set(value_, ctx_, retry_param_);
} else {
ret = this_->single_row_.v_.set(std::move(value_), ctx_, retry_param_);
}
if (MDS_FAIL(ret)) {
MDS_LOG_SET(WARN, "MdsUnit set value failed");
} else {
MDS_LOG_SET(TRACE, "MdsUnit set value success");
@ -622,6 +669,25 @@ int MdsUnit<DummyKey, V>::set(MdsTableBase *p_mds_table,
return ret;
#undef PRINT_WRAPPER
}
// Writes `value` into the unit's single row.
// The actual write is packaged into a SetOP and executed through
// retry_release_lock_with_op_until_timeout<LockMode::WRITE>(), which takes lock_ for each
// attempt and keeps retrying while the attempt reports OB_EAGAIN.
// @param p_mds_table      not used by this overload
// @param value            data to write; copied when passed as an lvalue, moved otherwise
// @param ctx              writer context
// @param lock_timeout_us  retry budget in microseconds
// @return result of the write; OB_TIMEOUT from the retry loop is reported as
//         OB_ERR_EXCLUSIVE_LOCK_CONFLICT
template <typename V>
template <typename Value>
int MdsUnit<DummyKey, V>::set(MdsTableBase *p_mds_table,
                              Value &&value,
                              MdsCtx &ctx,
                              const int64_t lock_timeout_us)
{
  int ret = OB_SUCCESS;
  MDS_TG(10_ms);
  RetryParam retry_param(lock_timeout_us);
  // value category of the caller's argument decides copy vs move inside SetOP
  const bool passed_as_lvalue = std::is_lvalue_reference<decltype(value)>::value;
  SetOP set_op(this, passed_as_lvalue, value, ctx, retry_param);
  if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::WRITE>(lock_, retry_param, set_op))) {
    ret = (OB_TIMEOUT == ret) ? OB_ERR_EXCLUSIVE_LOCK_CONFLICT : ret;
  }
  return ret;
}
template <typename V>
template <typename Value>
@ -646,20 +712,14 @@ int MdsUnit<DummyKey, V>::replay(MdsTableBase *p_mds_table,
template <typename V>
template <typename OP>
int MdsUnit<DummyKey, V>::get_snapshot(OP &&read_op,
const share::SCN snapshot,
const int64_t read_seq,
const int64_t lock_timeout_us) const
{
#define PRINT_WRAPPER KR(ret), K(typeid(OP).name()), K(read_seq), K(lock_timeout_us)
int MdsUnit<DummyKey, V>::GetSnapShotOp<OP>::operator()() {
#define PRINT_WRAPPER KR(ret), K(typeid(OP).name()), K_(read_seq), K_(retry_param)
int ret = OB_SUCCESS;
MDS_TG(10_ms);
MdsRLockGuard lg(lock_);
CLICK();
if (MDS_FAIL(single_row_.v_.get_snapshot(std::forward<OP>(read_op),
snapshot,
read_seq,
lock_timeout_us))) {
if (MDS_FAIL(this_->single_row_.v_.get_snapshot(read_op_,
snapshot_,
read_seq_,
retry_param_))) {
if (OB_SNAPSHOT_DISCARDED != ret) {
MDS_LOG_GET(WARN, "MdsUnit get_snapshot failed");
}
@ -669,26 +729,36 @@ int MdsUnit<DummyKey, V>::get_snapshot(OP &&read_op,
return ret;
#undef PRINT_WRAPPER
}
// Reads the unit's single row at `snapshot`.
// The read is packaged into a GetSnapShotOp and executed through
// retry_release_lock_with_op_until_timeout<LockMode::READ>(), which takes lock_ for each
// attempt and keeps retrying while the attempt reports OB_EAGAIN.
// @param read_op          user read operation, handed to the row by reference
// @param snapshot         snapshot version to read at
// @param read_seq         forwarded to the row-level get_snapshot()
// @param lock_timeout_us  retry budget in microseconds
// @return result of the read; OB_TIMEOUT from the retry loop is reported as
//         OB_ERR_SHARED_LOCK_CONFLICT
template <typename V>
template <typename OP>
int MdsUnit<DummyKey, V>::get_snapshot(OP &&read_op,
                                       const share::SCN snapshot,
                                       const int64_t read_seq,
                                       const int64_t lock_timeout_us) const
{
  typedef typename std::remove_reference<OP>::type ReadOpType;
  int ret = OB_SUCCESS;
  MDS_TG(10_ms);
  RetryParam retry_param(lock_timeout_us);
  GetSnapShotOp<ReadOpType> snapshot_op(this, read_op, snapshot, read_seq, retry_param);
  if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::READ>(lock_, retry_param, snapshot_op))) {
    ret = (OB_TIMEOUT == ret) ? OB_ERR_SHARED_LOCK_CONFLICT : ret;
  }
  return ret;
}
template <typename V>
template <typename OP>
int MdsUnit<DummyKey, V>::get_by_writer(OP &&read_op,
const MdsWriter &writer,
const share::SCN snapshot,
const int64_t read_seq,
const int64_t lock_timeout_us) const
{
#define PRINT_WRAPPER KR(ret), K(typeid(OP).name()), K(writer), K(snapshot), K(read_seq),\
K(lock_timeout_us)
int MdsUnit<DummyKey, V>::GetByWriterOp<OP>::operator()() {
#define PRINT_WRAPPER KR(ret), K(typeid(OP).name()), K_(writer), K_(snapshot), K_(read_seq), K_(retry_param)
int ret = OB_SUCCESS;
MDS_TG(10_ms);
MdsRLockGuard lg(lock_);
CLICK();
if (MDS_FAIL(single_row_.v_.get_by_writer(std::forward<OP>(read_op),
writer,
snapshot,
read_seq,
lock_timeout_us))) {
if (MDS_FAIL(this_->single_row_.v_.get_by_writer(read_op_,
writer_,
snapshot_,
read_seq_,
retry_param_))) {
if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) {
MDS_LOG_GET(WARN, "MdsUnit get_by_writer failed");
}
@ -698,6 +768,25 @@ int MdsUnit<DummyKey, V>::get_by_writer(OP &&read_op,
return ret;
#undef PRINT_WRAPPER
}
// Reads the unit's single row on behalf of `writer`.
// The read is packaged into a GetByWriterOp and executed through
// retry_release_lock_with_op_until_timeout<LockMode::READ>(), which takes lock_ for each
// attempt and keeps retrying while the attempt reports OB_EAGAIN.
// @param read_op          user read operation, handed to the row by reference
// @param writer           writer identity forwarded to the row-level get_by_writer()
// @param snapshot         snapshot version forwarded to the row
// @param read_seq         forwarded to the row-level get_by_writer()
// @param lock_timeout_us  retry budget in microseconds
// @return result of the read; OB_TIMEOUT from the retry loop is reported as
//         OB_ERR_SHARED_LOCK_CONFLICT
template <typename V>
template <typename OP>
int MdsUnit<DummyKey, V>::get_by_writer(OP &&read_op,
                                        const MdsWriter &writer,
                                        const share::SCN snapshot,
                                        const int64_t read_seq,
                                        const int64_t lock_timeout_us) const
{
  typedef typename std::remove_reference<OP>::type ReadOpType;
  int ret = OB_SUCCESS;
  MDS_TG(10_ms);
  RetryParam retry_param(lock_timeout_us);
  GetByWriterOp<ReadOpType> by_writer_op(this, read_op, writer, snapshot, read_seq, retry_param);
  if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::READ>(lock_, retry_param, by_writer_op))) {
    ret = (OB_TIMEOUT == ret) ? OB_ERR_SHARED_LOCK_CONFLICT : ret;
  }
  return ret;
}
template <typename V>
template <typename OP>

View File

@ -5,6 +5,7 @@
#include "lib/oblog/ob_log_module.h"
#include "lib/ob_define.h"
#include "lib/oblog/ob_log.h"
#include "ob_clock_generator.h"
#include "src/share/ob_errno.h"
#include "src/share/scn.h"
#include "src/share/ob_occam_time_guard.h"

View File

@ -0,0 +1,78 @@
#ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_UTILITY_MDS_RETRY_CONTROL_H
#define SHARE_STORAGE_MULTI_DATA_SOURCE_UTILITY_MDS_RETRY_CONTROL_H
#include <cstdint>
#include "common_define.h"
#include "mds_lock.h"
namespace oceanbase
{
namespace storage
{
namespace mds
{
// Bookkeeping for one "retry until timeout" operation: when it started, when it must give
// up, how often it has retried, and when it last printed a progress log.
// All timestamps come from ObClockGenerator::getClock() and are kept in the same unit as
// lock_timeout_us / print_interval.
struct RetryParam {
  // @param lock_timeout_us  retry budget, added to the start timestamp to form the deadline.
  //                         Very large values (e.g. INT64_MAX for "wait forever") are
  //                         clamped instead of overflowing the deadline.
  // @param print_interval   minimum distance between two progress logs
  RetryParam(int64_t lock_timeout_us, int64_t print_interval = 500_ms) :
  start_ts_(ObClockGenerator::getClock()),
  last_print_ts_(0),
  timeout_ts_(add_saturated_(start_ts_, lock_timeout_us)),
  retry_cnt_(0),
  print_interval_(print_interval) {}
  // Counts one more retry.
  RetryParam &operator++() { ++retry_cnt_; return *this; }
  // Returns true (and remembers "now" as the last print time) when more than
  // print_interval_ has elapsed since the last time this returned true.
  // const because callers only hold a const reference; last_print_ts_ is mutable for that.
  bool check_reach_print_interval_and_update() const {
    const int64_t current_ts = ObClockGenerator::getClock();
    const bool ret = current_ts > add_saturated_(last_print_ts_, print_interval_);
    if (ret) {
      last_print_ts_ = current_ts;
    }
    return ret;
  }
  // Returns true once the clock has passed the deadline.
  bool check_timeout() const { return ObClockGenerator::getClock() > timeout_ts_; }
  TO_STRING_KV(KTIME_(start_ts), KTIME_(last_print_ts), KTIME_(timeout_ts), K_(retry_cnt), K_(print_interval));
  // base + delta, clamped to the int64_t range so that signed overflow (undefined
  // behaviour) can never happen, whatever timeout the caller passes.
  static int64_t add_saturated_(const int64_t base, const int64_t delta) {
    int64_t sum = 0;
    if (delta > 0 && base > INT64_MAX - delta) {
      sum = INT64_MAX;
    } else if (delta < 0 && base < INT64_MIN - delta) {
      sum = INT64_MIN;
    } else {
      sum = base + delta;
    }
    return sum;
  }
  int64_t start_ts_;               // clock value at construction
  mutable int64_t last_print_ts_;  // clock value of the last granted print, 0 if none yet
  int64_t timeout_ts_;             // deadline: start_ts_ + lock_timeout_us (clamped)
  int64_t retry_cnt_;              // number of retries counted through operator++
  int64_t print_interval_;         // minimum distance between two prints
};
// Which kind of guard retry_release_lock_with_op_until_timeout() takes on the lock.
enum class LockMode { READ = 1, WRITE /* = 2 */ };
// Compile-time mapping from a LockMode to the guard type that takes the lock in that mode.
// The primary template is intentionally left undefined: only the modes specialised
// below may be used.
template <LockMode MODE>
struct LockModeGuard;

// READ -> read-lock guard
template <>
struct LockModeGuard<LockMode::READ>
{
  typedef MdsRLockGuard type;
};

// WRITE -> write-lock guard
template <>
struct LockModeGuard<LockMode::WRITE>
{
  typedef MdsWLockGuard type;
};
// Runs op() under `lock` (taken in mode MODE) and keeps re-running it while it
// returns OB_EAGAIN.
//
// The guard is a local of the loop body, so it is destroyed - and the lock released -
// before the loop condition runs; the 10ms sleep between two attempts therefore happens
// WITHOUT holding the lock, and the lock is taken again for the next attempt.
//
// @param lock         lock to hold while op() executes
// @param retry_param  deadline / retry counter; incremented once per retry
// @param op           callable returning an int error code; OB_EAGAIN means "try again"
// @return op()'s last result, except that OB_EAGAIN after the deadline has passed is
//         turned into OB_TIMEOUT. The deadline is only checked after a failed attempt,
//         so op() is always executed at least once.
template <LockMode MODE, typename OP>
int retry_release_lock_with_op_until_timeout(const MdsLock &lock,
                                             RetryParam &retry_param,
                                             OP &&op) {
  int ret = OB_SUCCESS;
  MDS_TG(10_ms);
  do {
    typename LockModeGuard<MODE>::type lg(lock);
    if (MDS_FAIL(op())) {
      if (OB_LIKELY(OB_EAGAIN == ret)) {
        if (retry_param.check_timeout()) {
          ret = OB_TIMEOUT;
        }
      }
    }
  } while (OB_EAGAIN == ret && ({ ob_usleep(10_ms); ++retry_param; true; }));
  return ret;
}
}
}
}
#endif

View File

@ -627,6 +627,20 @@ TEST_F(TestMdsTable, test_rw_lock_wwlock) {
t2.join();
}
// Sets a value into the mds table through an rvalue (std::move) and then inspects both the
// moved-from object and the stored value.
// NOTE(review): presumably this pins that set() with an rvalue moves the user data instead
// of copying it - confirm against ExampleUserData2's move semantics.
TEST_F(TestMdsTable, test_rvalue_set) {
ExampleUserData2 data;
ASSERT_EQ(OB_SUCCESS, data.assign(MdsAllocator::get_instance(), "123"));
MdsCtx ctx(mds::MdsWriter(ObTransID(100)));
// remember the buffer `data` holds before it is moved away
ObString str = data.data_;
ASSERT_EQ(OB_SUCCESS, mds_table_.set(std::move(data), ctx, 0));
bool is_committed = false;
// `data` is captured by reference on purpose: the lambda inspects the moved-from object
ASSERT_EQ(OB_SUCCESS, mds_table_.get_latest<ExampleUserData2>([&data, str](const ExampleUserData2 &read_data) -> int {
// the source object must no longer reference the buffer ...
MDS_ASSERT(data.data_ == nullptr);
// ... and the value read back must point at the very same buffer that was remembered above
MDS_ASSERT(str.ptr() == read_data.data_.ptr());
return OB_SUCCESS;
}, is_committed));
}
}
}