[MDS] Stop retrying lock_for_read when the LS is in GC state
This commit is contained in:
@ -665,6 +665,7 @@ ob_set_subtarget(ob_storage multi_data_source
|
||||
multi_data_source/adapter_define/mds_dump_node.cpp
|
||||
multi_data_source/runtime_utility/mds_tenant_service.cpp
|
||||
multi_data_source/runtime_utility/mds_factory.cpp
|
||||
multi_data_source/runtime_utility/mds_retry_control.cpp
|
||||
multi_data_source/ob_mds_table_merge_dag.cpp
|
||||
multi_data_source/ob_mds_table_merge_dag_param.cpp
|
||||
multi_data_source/ob_mds_table_merge_task.cpp
|
||||
|
@ -304,7 +304,7 @@ int MdsUnit<K, V>::set(MdsTableBase *p_mds_table,
|
||||
int ret = OB_SUCCESS;
|
||||
MDS_TG(10_ms);
|
||||
bool is_lvalue = std::is_lvalue_reference<decltype(value)>::value;
|
||||
RetryParam retry_param(lock_timeout_us);
|
||||
RetryParam retry_param(MdsUnitBase<K, V>::p_mds_table_->ls_id_, lock_timeout_us);
|
||||
SetOP op(this, is_lvalue, p_mds_table, key, value, ctx, is_for_remove, retry_param);
|
||||
if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::WRITE>(lock_, retry_param, op))) {
|
||||
if (OB_TIMEOUT == ret) {
|
||||
@ -382,7 +382,7 @@ int MdsUnit<K, V>::get_snapshot(const K &key,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MDS_TG(10_ms);
|
||||
RetryParam retry_param(lock_timeout_us);
|
||||
RetryParam retry_param(MdsUnitBase<K, V>::p_mds_table_->ls_id_, lock_timeout_us);
|
||||
GetSnapShotOp<typename std::remove_reference<OP>::type> op(this, key, read_op, snapshot, read_seq, retry_param);
|
||||
if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::READ>(lock_, retry_param, op))) {
|
||||
if (OB_TIMEOUT == ret) {
|
||||
@ -427,7 +427,7 @@ int MdsUnit<K, V>::get_by_writer(const K &key,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MDS_TG(10_ms);
|
||||
RetryParam retry_param(lock_timeout_us);
|
||||
RetryParam retry_param(MdsUnitBase<K, V>::p_mds_table_->ls_id_, lock_timeout_us);
|
||||
GetByWriterOp<typename std::remove_reference<OP>::type> op(this, key, read_op, writer, snapshot, read_seq, retry_param);
|
||||
if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::READ>(lock_, retry_param, op))) {
|
||||
if (OB_TIMEOUT == ret) {
|
||||
@ -678,7 +678,7 @@ int MdsUnit<DummyKey, V>::set(MdsTableBase *p_mds_table,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MDS_TG(10_ms);
|
||||
RetryParam retry_param(lock_timeout_us);
|
||||
RetryParam retry_param(MdsUnitBase<DummyKey, V>::p_mds_table_->ls_id_, lock_timeout_us);
|
||||
bool is_lvalue = std::is_lvalue_reference<decltype(value)>::value;
|
||||
SetOP op(this, is_lvalue, value, ctx, retry_param);
|
||||
if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::WRITE>(lock_, retry_param, op))) {
|
||||
@ -738,7 +738,7 @@ int MdsUnit<DummyKey, V>::get_snapshot(OP &&read_op,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MDS_TG(10_ms);
|
||||
RetryParam retry_param(lock_timeout_us);
|
||||
RetryParam retry_param(MdsUnitBase<DummyKey, V>::p_mds_table_->ls_id_, lock_timeout_us);
|
||||
GetSnapShotOp<typename std::remove_reference<OP>::type> op(this, read_op, snapshot, read_seq, retry_param);
|
||||
if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::READ>(lock_, retry_param, op))) {
|
||||
if (OB_TIMEOUT == ret) {
|
||||
@ -778,7 +778,7 @@ int MdsUnit<DummyKey, V>::get_by_writer(OP &&read_op,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MDS_TG(10_ms);
|
||||
RetryParam retry_param(lock_timeout_us);
|
||||
RetryParam retry_param(MdsUnitBase<DummyKey, V>::p_mds_table_->ls_id_, lock_timeout_us);
|
||||
GetByWriterOp<typename std::remove_reference<OP>::type> op(this, read_op, writer, snapshot, read_seq, retry_param);
|
||||
if (MDS_FAIL(retry_release_lock_with_op_until_timeout<LockMode::READ>(lock_, retry_param, op))) {
|
||||
if (OB_TIMEOUT == ret) {
|
||||
|
@ -0,0 +1,53 @@
|
||||
/**
|
||||
* Copyright (c) 2023 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include "mds_retry_control.h"
|
||||
#include "lib/ob_errno.h"
|
||||
#include "logservice/ob_garbage_collector.h"
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
|
||||
bool RetryParam::check_ls_in_gc_state() const {
|
||||
int ret = OB_SUCCESS;
|
||||
bool ret_bool = false;
|
||||
logservice::ObGCHandler *gc_handler = nullptr;
|
||||
ObLSService *ls_service = MTL(ObLSService *);
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
|
||||
if (OB_ISNULL(ls_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG(ERROR, "ls service is null", K(*this));
|
||||
} else if (OB_FAIL(ls_service->get_ls(ls_id_, ls_handle, ObLSGetMod::MDS_TABLE_MOD))) {
|
||||
MDS_LOG(WARN, "fail to get ls", K(*this));
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
MDS_LOG(WARN, "fail to get ls", K(*this));
|
||||
} else if (OB_ISNULL(gc_handler = ls->get_gc_handler())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG(WARN, "gc handler is null", K(*this));
|
||||
} else if (gc_handler->is_log_sync_stopped()) {
|
||||
ret_bool = true;
|
||||
}
|
||||
|
||||
return ret_bool;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
@ -15,6 +15,8 @@
|
||||
|
||||
#include "common_define.h"
|
||||
#include "mds_lock.h"
|
||||
#include "share/ob_errno.h"
|
||||
#include "storage/multi_data_source/mds_table_mgr.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -24,7 +26,8 @@ namespace mds
|
||||
{
|
||||
|
||||
struct RetryParam {
|
||||
RetryParam(int64_t lock_timeout_us, int64_t print_interval = 500_ms) :
|
||||
RetryParam(share::ObLSID ls_id, int64_t lock_timeout_us, int64_t print_interval = 500_ms) :
|
||||
ls_id_(ls_id),
|
||||
start_ts_(ObClockGenerator::getClock()),
|
||||
last_print_ts_(0),
|
||||
timeout_ts_(start_ts_ + lock_timeout_us),
|
||||
@ -40,7 +43,9 @@ struct RetryParam {
|
||||
return ret;
|
||||
}
|
||||
bool check_timeout() const { return ObClockGenerator::getClock() > timeout_ts_; }
|
||||
TO_STRING_KV(KTIME_(start_ts), KTIME_(last_print_ts), KTIME_(timeout_ts), K_(retry_cnt), K_(print_interval));
|
||||
bool check_ls_in_gc_state() const;
|
||||
TO_STRING_KV(K_(ls_id), KTIME_(start_ts), KTIME_(last_print_ts), KTIME_(timeout_ts), K_(retry_cnt), K_(print_interval));
|
||||
share::ObLSID ls_id_;
|
||||
int64_t start_ts_;
|
||||
mutable int64_t last_print_ts_;
|
||||
int64_t timeout_ts_;
|
||||
@ -69,7 +74,8 @@ int retry_release_lock_with_op_until_timeout(const MdsLock &lock,
|
||||
int ret = OB_SUCCESS;
|
||||
MDS_TG(10_ms);
|
||||
do {
|
||||
int64_t current_ts = ObClockGenerator::getClock();;
|
||||
int64_t current_ts = ObClockGenerator::getClock();
|
||||
{
|
||||
typename LockModeGuard<MODE>::type lg(lock);
|
||||
if (MDS_FAIL(op())) {
|
||||
if (OB_LIKELY(OB_EAGAIN == ret)) {
|
||||
@ -78,6 +84,16 @@ int retry_release_lock_with_op_until_timeout(const MdsLock &lock,
|
||||
}
|
||||
}
|
||||
}
|
||||
} // release lock
|
||||
if (OB_EAGAIN == ret && MODE == LockMode::READ) {
|
||||
if ((retry_param.retry_cnt_ % 50) == 0) {// every 500ms check ls status
|
||||
#ifndef UNITTEST_DEBUG
|
||||
if (retry_param.check_ls_in_gc_state()) {
|
||||
ret = OB_REPLICA_NOT_READABLE;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
} while (OB_EAGAIN == ret && ({ ob_usleep(10_ms); ++retry_param; true; }));
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
|
@ -100,7 +100,7 @@ struct UserDataWithCallBack
|
||||
TEST_F(TestMdsNode, call_user_method) {
|
||||
MdsRow<DummyKey, UserDataWithCallBack> row;
|
||||
MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(100)));// commit finally
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(1), ctx, 0));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(1), ctx, {share::ObLSID(0), 0}));
|
||||
ctx.on_redo(mock_scn(1));
|
||||
ctx.before_prepare();
|
||||
ctx.on_prepare(mock_scn(2));
|
||||
@ -114,7 +114,7 @@ TEST_F(TestMdsNode, call_user_method) {
|
||||
TEST_F(TestMdsNode, release_node_while_node_in_ctx) {
|
||||
MdsRow<DummyKey, UserDataWithCallBack> row;
|
||||
MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(100)));// commit finally
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(1), ctx, 0));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(1), ctx, {share::ObLSID(0), 0}));
|
||||
ctx.on_redo(mock_scn(1));
|
||||
ctx.before_prepare();
|
||||
row.~MdsRow();
|
||||
@ -131,11 +131,11 @@ TEST_F(TestMdsNode, release_node_while_node_in_ctx_concurrent) {
|
||||
MdsRow<DummyKey, UserDataWithCallBack> row;
|
||||
MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(100)));// commit finally
|
||||
// Committing these nodes will take 50ms
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(1), ctx, 0));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(2), ctx, 0));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(3), ctx, 0));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(4), ctx, 0));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(5), ctx, 0));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(1), ctx, {share::ObLSID(0), 0}));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(2), ctx, {share::ObLSID(0), 0}));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(3), ctx, {share::ObLSID(0), 0}));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(4), ctx, {share::ObLSID(0), 0}));
|
||||
ASSERT_EQ(OB_SUCCESS, row.set(UserDataWithCallBack(5), ctx, {share::ObLSID(0), 0}));
|
||||
|
||||
std::thread t1([&ctx]() {
|
||||
OCCAM_LOG(DEBUG, "t1 start");
|
||||
|
Reference in New Issue
Block a user