|
|
|
|
@ -160,14 +160,34 @@ int MdsTableHandle::set(T &&data, MdsCtx &ctx, const int64_t lock_timeout_us)
|
|
|
|
|
uint8_t unit_id = INT8_MAX;
|
|
|
|
|
ret = MdsTableHandleHelper<DummyKey, T>::template get_unit_id<0>(mds_table_id_, unit_id);
|
|
|
|
|
DummyKey dummy_key;
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
int64_t converted_timeout = 0;
|
|
|
|
|
if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict
|
|
|
|
|
converted_timeout = lock_timeout_us;
|
|
|
|
|
} else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) {
|
|
|
|
|
if (lock_timeout_us > 30_s) {// timeout no more than 30s
|
|
|
|
|
MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version",
|
|
|
|
|
KR(ret), K(unit_id), K(data), K(ctx), K(lock_timeout_us), K(converted_timeout));
|
|
|
|
|
converted_timeout = 30_s;
|
|
|
|
|
} else {
|
|
|
|
|
converted_timeout = lock_timeout_us;
|
|
|
|
|
}
|
|
|
|
|
} else {// do mds data maybe hang operation is not allowed in other phase callback
|
|
|
|
|
ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided
|
|
|
|
|
MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :"
|
|
|
|
|
" on_redo/before_prepare/on_prepare/on_commit/on_abort",
|
|
|
|
|
KR(ret), K(unit_id), K(data), K(ctx), K(lock_timeout_us), K(converted_timeout));
|
|
|
|
|
MDS_ASSERT(false);// abort in test environment
|
|
|
|
|
}
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
if (OB_FAIL(p_mds_table_base_->set(unit_id,
|
|
|
|
|
(void*)&dummy_key,
|
|
|
|
|
(void*)&data,
|
|
|
|
|
std::is_rvalue_reference<decltype(data)>::value,
|
|
|
|
|
ctx,
|
|
|
|
|
lock_timeout_us))) {
|
|
|
|
|
MDS_LOG(WARN, "fail to call set", KR(ret), K(unit_id), K(data), K(ctx), K(lock_timeout_us));
|
|
|
|
|
converted_timeout))) {
|
|
|
|
|
MDS_LOG(WARN, "fail to call set", KR(ret), K(unit_id), K(data), K(ctx), K(lock_timeout_us), K(converted_timeout));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
@ -235,16 +255,36 @@ int MdsTableHandle::get_snapshot(OP &&read_op,
|
|
|
|
|
ObFunction<int(void *)> function = [&read_op](void *data) -> int {
|
|
|
|
|
return read_op(*reinterpret_cast<const T*>(data));
|
|
|
|
|
};
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
int64_t converted_timeout = 0;
|
|
|
|
|
if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict
|
|
|
|
|
converted_timeout = timeout_us;
|
|
|
|
|
} else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) {
|
|
|
|
|
if (timeout_us > 30_s) {// timeout no more than 30s
|
|
|
|
|
MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(snapshot),
|
|
|
|
|
K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
converted_timeout = 30_s;
|
|
|
|
|
} else {
|
|
|
|
|
converted_timeout = timeout_us;
|
|
|
|
|
}
|
|
|
|
|
} else {// do mds data maybe hang operation is not allowed in other phase callback
|
|
|
|
|
ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided
|
|
|
|
|
MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :"
|
|
|
|
|
" on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(snapshot),
|
|
|
|
|
K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
MDS_ASSERT(false);// abort in test environment
|
|
|
|
|
}
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
if (OB_FAIL(p_mds_table_base_->get_snapshot(unit_id,
|
|
|
|
|
(void*)&dummy_key,
|
|
|
|
|
function,
|
|
|
|
|
snapshot,
|
|
|
|
|
read_seq,
|
|
|
|
|
timeout_us))) {
|
|
|
|
|
converted_timeout))) {
|
|
|
|
|
if (OB_SNAPSHOT_DISCARDED != ret) {
|
|
|
|
|
MDS_LOG(WARN, "fail to call get_snapshot", KR(ret), K(unit_id), K(snapshot),
|
|
|
|
|
K(read_seq), K(timeout_us));
|
|
|
|
|
K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -267,6 +307,25 @@ int MdsTableHandle::get_by_writer(OP &&read_op,
|
|
|
|
|
ObFunction<int(void *)> function = [&read_op](void *data) -> int {
|
|
|
|
|
return read_op(*reinterpret_cast<const T*>(data));
|
|
|
|
|
};
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
int64_t converted_timeout = 0;
|
|
|
|
|
if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict
|
|
|
|
|
converted_timeout = timeout_us;
|
|
|
|
|
} else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) {
|
|
|
|
|
if (timeout_us > 30_s) {// timeout no more than 30s
|
|
|
|
|
MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(writer),
|
|
|
|
|
K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
converted_timeout = 30_s;
|
|
|
|
|
} else {
|
|
|
|
|
converted_timeout = timeout_us;
|
|
|
|
|
}
|
|
|
|
|
} else {// do mds data maybe hang operation is not allowed in other phase callback
|
|
|
|
|
ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided
|
|
|
|
|
MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :"
|
|
|
|
|
" on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(writer),
|
|
|
|
|
K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
MDS_ASSERT(false);// abort in test environment
|
|
|
|
|
}
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
if (OB_FAIL(p_mds_table_base_->get_by_writer(unit_id,
|
|
|
|
|
(void*)&dummy_key,
|
|
|
|
|
@ -274,10 +333,11 @@ int MdsTableHandle::get_by_writer(OP &&read_op,
|
|
|
|
|
writer,
|
|
|
|
|
snapshot,
|
|
|
|
|
read_seq,
|
|
|
|
|
timeout_us))) {
|
|
|
|
|
converted_timeout))) {
|
|
|
|
|
if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) {
|
|
|
|
|
MDS_LOG(WARN, "fail to call get_by_writer", KR(ret), K(unit_id), K(writer),
|
|
|
|
|
K(snapshot), K(read_seq), K(timeout_us));
|
|
|
|
|
K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -313,15 +373,35 @@ int MdsTableHandle::set(const Key &key, Value &&data, MdsCtx &ctx, const int64_t
|
|
|
|
|
CHECK_MDS_TABLE_INIT();
|
|
|
|
|
uint8_t unit_id = INT8_MAX;
|
|
|
|
|
ret = MdsTableHandleHelper<Key, Value>::template get_unit_id<0>(mds_table_id_, unit_id);
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
int64_t converted_timeout = 0;
|
|
|
|
|
if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict
|
|
|
|
|
converted_timeout = lock_timeout_us;
|
|
|
|
|
} else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) {
|
|
|
|
|
if (lock_timeout_us > 30_s) {// timeout no more than 30s
|
|
|
|
|
MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(key), K(data), K(ctx),
|
|
|
|
|
K(lock_timeout_us), K(converted_timeout));
|
|
|
|
|
converted_timeout = 30_s;
|
|
|
|
|
} else {
|
|
|
|
|
converted_timeout = lock_timeout_us;
|
|
|
|
|
}
|
|
|
|
|
} else {// do mds data maybe hang operation is not allowed in other phase callback
|
|
|
|
|
ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided
|
|
|
|
|
MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :"
|
|
|
|
|
" on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(key), K(data),
|
|
|
|
|
K(ctx), K(lock_timeout_us), K(converted_timeout));
|
|
|
|
|
MDS_ASSERT(false);// abort in test environment
|
|
|
|
|
}
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
if (OB_FAIL(p_mds_table_base_->set(unit_id,
|
|
|
|
|
(void*)&key,
|
|
|
|
|
(void*)&data,
|
|
|
|
|
std::is_rvalue_reference<Value>::value,
|
|
|
|
|
ctx,
|
|
|
|
|
lock_timeout_us))) {
|
|
|
|
|
converted_timeout))) {
|
|
|
|
|
MDS_LOG(WARN, "fail to call set", KR(ret), K(unit_id), K(key), K(data), K(ctx),
|
|
|
|
|
K(lock_timeout_us));
|
|
|
|
|
K(lock_timeout_us), K(converted_timeout));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
@ -354,13 +434,33 @@ int MdsTableHandle::remove(const Key &key, MdsCtx &ctx, const int64_t lock_timeo
|
|
|
|
|
CHECK_MDS_TABLE_INIT();
|
|
|
|
|
uint8_t unit_id = INT8_MAX;
|
|
|
|
|
ret = MdsTableHandleHelper<Key, Value>::template get_unit_id<0>(mds_table_id_, unit_id);
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
int64_t converted_timeout = 0;
|
|
|
|
|
if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict
|
|
|
|
|
converted_timeout = lock_timeout_us;
|
|
|
|
|
} else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) {
|
|
|
|
|
if (lock_timeout_us > 30_s) {// timeout no more than 30s
|
|
|
|
|
MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(key), K(ctx),
|
|
|
|
|
K(lock_timeout_us), K(converted_timeout));
|
|
|
|
|
converted_timeout = 30_s;
|
|
|
|
|
} else {
|
|
|
|
|
converted_timeout = lock_timeout_us;
|
|
|
|
|
}
|
|
|
|
|
} else {// do mds data maybe hang operation is not allowed in other phase callback
|
|
|
|
|
ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided
|
|
|
|
|
MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :"
|
|
|
|
|
" on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(key), K(ctx),
|
|
|
|
|
K(lock_timeout_us), K(converted_timeout));
|
|
|
|
|
MDS_ASSERT(false);// abort in test environment
|
|
|
|
|
}
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
if (OB_FAIL(p_mds_table_base_->remove(unit_id,
|
|
|
|
|
(void*)&key,
|
|
|
|
|
ctx,
|
|
|
|
|
lock_timeout_us))) {
|
|
|
|
|
converted_timeout))) {
|
|
|
|
|
MDS_LOG(WARN, "fail to call remove", KR(ret), K(unit_id), K(key), K(ctx),
|
|
|
|
|
K(lock_timeout_us));
|
|
|
|
|
K(lock_timeout_us), K(converted_timeout));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
@ -425,16 +525,36 @@ int MdsTableHandle::get_snapshot(const Key &key,
|
|
|
|
|
ObFunction<int(void *)> function = [&read_op](void *data) -> int {
|
|
|
|
|
return read_op(*reinterpret_cast<const Value*>(data));
|
|
|
|
|
};
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
int64_t converted_timeout = 0;
|
|
|
|
|
if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict
|
|
|
|
|
converted_timeout = timeout_us;
|
|
|
|
|
} else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) {
|
|
|
|
|
if (timeout_us > 30_s) {// timeout no more than 30s
|
|
|
|
|
MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(key), K(snapshot),
|
|
|
|
|
K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
converted_timeout = 30_s;
|
|
|
|
|
} else {
|
|
|
|
|
converted_timeout = timeout_us;
|
|
|
|
|
}
|
|
|
|
|
} else {// do mds data maybe hang operation is not allowed in other phase callback
|
|
|
|
|
ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided
|
|
|
|
|
MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :"
|
|
|
|
|
" on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(key), K(snapshot),
|
|
|
|
|
K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
MDS_ASSERT(false);// abort in test environment
|
|
|
|
|
}
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
if (OB_FAIL(p_mds_table_base_->get_snapshot(unit_id,
|
|
|
|
|
(void*)&key,
|
|
|
|
|
function,
|
|
|
|
|
snapshot,
|
|
|
|
|
read_seq,
|
|
|
|
|
timeout_us))) {
|
|
|
|
|
converted_timeout))) {
|
|
|
|
|
if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) {
|
|
|
|
|
MDS_LOG(WARN, "fail to call get_snapshot", KR(ret), K(unit_id), K(key), K(snapshot),
|
|
|
|
|
K(read_seq), K(timeout_us));
|
|
|
|
|
K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -456,6 +576,25 @@ int MdsTableHandle::get_by_writer(const Key &key,
|
|
|
|
|
ObFunction<int(void *)> function = [&read_op](void *data) -> int {
|
|
|
|
|
return read_op(*reinterpret_cast<const Value*>(data));
|
|
|
|
|
};
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
int64_t converted_timeout = 0;
|
|
|
|
|
if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict
|
|
|
|
|
converted_timeout = timeout_us;
|
|
|
|
|
} else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) {
|
|
|
|
|
if (timeout_us > 30_s) {// timeout no more than 30s
|
|
|
|
|
MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(key), K(writer),
|
|
|
|
|
K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
converted_timeout = 30_s;
|
|
|
|
|
} else {
|
|
|
|
|
converted_timeout = timeout_us;
|
|
|
|
|
}
|
|
|
|
|
} else {// do mds data maybe hang operation is not allowed in other phase callback
|
|
|
|
|
ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided
|
|
|
|
|
MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :"
|
|
|
|
|
" on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(key), K(writer),
|
|
|
|
|
K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
MDS_ASSERT(false);// abort in test environment
|
|
|
|
|
}
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
if (OB_FAIL(p_mds_table_base_->get_by_writer(unit_id,
|
|
|
|
|
(void*)&key,
|
|
|
|
|
@ -463,10 +602,11 @@ int MdsTableHandle::get_by_writer(const Key &key,
|
|
|
|
|
writer,
|
|
|
|
|
snapshot,
|
|
|
|
|
read_seq,
|
|
|
|
|
timeout_us))) {
|
|
|
|
|
converted_timeout))) {
|
|
|
|
|
if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) {
|
|
|
|
|
MDS_LOG(WARN, "fail to call get_by_writer", KR(ret), K(unit_id), K(key), K(writer),
|
|
|
|
|
K(snapshot), K(read_seq), K(timeout_us));
|
|
|
|
|
K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|