diff --git a/src/storage/multi_data_source/mds_table_base.cpp b/src/storage/multi_data_source/mds_table_base.cpp index 7456766c70..03880fe21b 100644 --- a/src/storage/multi_data_source/mds_table_base.cpp +++ b/src/storage/multi_data_source/mds_table_base.cpp @@ -18,6 +18,7 @@ #include "storage/tx_storage/ob_ls_service.h" #include "storage/compaction/ob_schedule_dag_func.h" #include "storage/multi_data_source/ob_mds_table_merge_dag_param.h" +#include "storage/tx/ob_multi_data_source.h" namespace oceanbase { @@ -26,6 +27,8 @@ namespace storage namespace mds { +TLOCAL(transaction::NotifyType, TLOCAL_MDS_TRANS_NOTIFY_TYPE) = transaction::NotifyType::UNKNOWN; + int MdsTableBase::advance_state_to(State new_state) const { int ret = OB_SUCCESS; diff --git a/src/storage/multi_data_source/mds_table_base.h b/src/storage/multi_data_source/mds_table_base.h index dcadb614d1..416a6e173e 100644 --- a/src/storage/multi_data_source/mds_table_base.h +++ b/src/storage/multi_data_source/mds_table_base.h @@ -24,6 +24,10 @@ namespace oceanbase { +namespace transaction +{ +enum class NotifyType : int64_t; +} namespace share { class SCN; @@ -33,6 +37,7 @@ namespace storage class ObTabletPointer; namespace mds { +extern TLOCAL(transaction::NotifyType, TLOCAL_MDS_TRANS_NOTIFY_TYPE); template class MdsRow; template diff --git a/src/storage/multi_data_source/mds_table_handle.ipp b/src/storage/multi_data_source/mds_table_handle.ipp index b6b3472fdb..09dd03941a 100644 --- a/src/storage/multi_data_source/mds_table_handle.ipp +++ b/src/storage/multi_data_source/mds_table_handle.ipp @@ -161,13 +161,33 @@ int MdsTableHandle::set(T &&data, MdsCtx &ctx, const int64_t lock_timeout_us) ret = MdsTableHandleHelper::template get_unit_id<0>(mds_table_id_, unit_id); DummyKey dummy_key; if (OB_SUCC(ret)) { - if (OB_FAIL(p_mds_table_base_->set(unit_id, - (void*)&dummy_key, - (void*)&data, - std::is_rvalue_reference::value, - ctx, - lock_timeout_us))) { - MDS_LOG(WARN, "fail to call set", KR(ret), K(unit_id), K(data), K(ctx), K(lock_timeout_us)); + int64_t converted_timeout = 0; + if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict + converted_timeout = lock_timeout_us; + } else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) { + if (lock_timeout_us > 30_s) {// timeout no more than 30s + MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", + KR(ret), K(unit_id), K(data), K(ctx), K(lock_timeout_us), K(converted_timeout)); + converted_timeout = 30_s; + } else { + converted_timeout = lock_timeout_us; + } + } else {// do mds data maybe hang operation is not allowed in other phase callback + ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided + MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :" + " on_redo/before_prepare/on_prepare/on_commit/on_abort", + KR(ret), K(unit_id), K(data), K(ctx), K(lock_timeout_us), K(converted_timeout)); + MDS_ASSERT(false);// abort in test environment + } + if (OB_SUCC(ret)) { + if (OB_FAIL(p_mds_table_base_->set(unit_id, + (void*)&dummy_key, + (void*)&data, + std::is_rvalue_reference::value, + ctx, + converted_timeout))) { + MDS_LOG(WARN, "fail to call set", KR(ret), K(unit_id), K(data), K(ctx), K(lock_timeout_us), K(converted_timeout)); + } } } return ret; @@ -236,15 +256,35 @@ int MdsTableHandle::get_snapshot(OP &&read_op, return read_op(*reinterpret_cast(data)); }; if (OB_SUCC(ret)) { - if (OB_FAIL(p_mds_table_base_->get_snapshot(unit_id, - (void*)&dummy_key, - function, - snapshot, - read_seq, - timeout_us))) { - if (OB_SNAPSHOT_DISCARDED != ret) { - MDS_LOG(WARN, "fail to call get_snapshot", KR(ret), K(unit_id), K(snapshot), - K(read_seq), K(timeout_us)); + int64_t converted_timeout = 0; + if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict + converted_timeout = timeout_us; + } else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) { + if (timeout_us > 30_s) {// timeout no more than 30s + MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(snapshot), + K(read_seq), K(timeout_us), K(converted_timeout)); + converted_timeout = 30_s; + } else { + converted_timeout = timeout_us; + } + } else {// do mds data maybe hang operation is not allowed in other phase callback + ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided + MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :" + " on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(snapshot), + K(read_seq), K(timeout_us), K(converted_timeout)); + MDS_ASSERT(false);// abort in test environment + } + if (OB_SUCC(ret)) { + if (OB_FAIL(p_mds_table_base_->get_snapshot(unit_id, + (void*)&dummy_key, + function, + snapshot, + read_seq, + converted_timeout))) { + if (OB_SNAPSHOT_DISCARDED != ret) { + MDS_LOG(WARN, "fail to call get_snapshot", KR(ret), K(unit_id), K(snapshot), + K(read_seq), K(timeout_us), K(converted_timeout)); + } } } } @@ -268,16 +308,36 @@ int MdsTableHandle::get_by_writer(OP &&read_op, return read_op(*reinterpret_cast(data)); }; if (OB_SUCC(ret)) { - if (OB_FAIL(p_mds_table_base_->get_by_writer(unit_id, - (void*)&dummy_key, - function, - writer, - snapshot, - read_seq, - timeout_us))) { - if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) { - MDS_LOG(WARN, "fail to call get_by_writer", KR(ret), K(unit_id), K(writer), - K(snapshot), K(read_seq), K(timeout_us)); + int64_t converted_timeout = 0; + if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict + converted_timeout = timeout_us; + } else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) { + if (timeout_us > 30_s) {// timeout no more than 30s + MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(writer), + K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout)); + converted_timeout = 30_s; + } else { + converted_timeout = timeout_us; + } + } else {// do mds data maybe hang operation is not allowed in other phase callback + ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided + MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :" + " on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(writer), + K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout)); + MDS_ASSERT(false);// abort in test environment + } + if (OB_SUCC(ret)) { + if (OB_FAIL(p_mds_table_base_->get_by_writer(unit_id, + (void*)&dummy_key, + function, + writer, + snapshot, + read_seq, + converted_timeout))) { + if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) { + MDS_LOG(WARN, "fail to call get_by_writer", KR(ret), K(unit_id), K(writer), + K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout)); + } } } } @@ -314,14 +374,34 @@ int MdsTableHandle::set(const Key &key, Value &&data, MdsCtx &ctx, const int64_t uint8_t unit_id = INT8_MAX; ret = MdsTableHandleHelper::template get_unit_id<0>(mds_table_id_, unit_id); if (OB_SUCC(ret)) { - if (OB_FAIL(p_mds_table_base_->set(unit_id, - (void*)&key, - (void*)&data, - std::is_rvalue_reference::value, - ctx, - lock_timeout_us))) { - MDS_LOG(WARN, "fail to call set", KR(ret), K(unit_id), K(key), K(data), K(ctx), - K(lock_timeout_us)); + int64_t converted_timeout = 0; + if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict + converted_timeout = lock_timeout_us; + } else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) { + if (lock_timeout_us > 30_s) {// timeout no more than 30s + MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(key), K(data), K(ctx), + K(lock_timeout_us), K(converted_timeout)); + converted_timeout = 30_s; + } else { + converted_timeout = lock_timeout_us; + } + } else {// do mds data maybe hang operation is not allowed in other phase callback + ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided + MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :" + " on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(key), K(data), + K(ctx), K(lock_timeout_us), K(converted_timeout)); + MDS_ASSERT(false);// abort in test environment + } + if (OB_SUCC(ret)) { + if (OB_FAIL(p_mds_table_base_->set(unit_id, + (void*)&key, + (void*)&data, + std::is_rvalue_reference::value, + ctx, + converted_timeout))) { + MDS_LOG(WARN, "fail to call set", KR(ret), K(unit_id), K(key), K(data), K(ctx), + K(lock_timeout_us), K(converted_timeout)); + } } } return ret; @@ -355,12 +435,32 @@ int MdsTableHandle::remove(const Key &key, MdsCtx &ctx, const int64_t lock_timeo uint8_t unit_id = INT8_MAX; ret = MdsTableHandleHelper::template get_unit_id<0>(mds_table_id_, unit_id); if (OB_SUCC(ret)) { - if (OB_FAIL(p_mds_table_base_->remove(unit_id, - (void*)&key, - ctx, - lock_timeout_us))) { - MDS_LOG(WARN, "fail to call remove", KR(ret), K(unit_id), K(key), K(ctx), - K(lock_timeout_us)); + int64_t converted_timeout = 0; + if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict + converted_timeout = lock_timeout_us; + } else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) { + if (lock_timeout_us > 30_s) {// timeout no more than 30s + MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(key), K(ctx), + K(lock_timeout_us), K(converted_timeout)); + converted_timeout = 30_s; + } else { + converted_timeout = lock_timeout_us; + } + } else {// do mds data maybe hang operation is not allowed in other phase callback + ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided + MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :" + " on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(key), K(ctx), + K(lock_timeout_us), K(converted_timeout)); + MDS_ASSERT(false);// abort in test environment + } + if (OB_SUCC(ret)) { + if (OB_FAIL(p_mds_table_base_->remove(unit_id, + (void*)&key, + ctx, + converted_timeout))) { + MDS_LOG(WARN, "fail to call remove", KR(ret), K(unit_id), K(key), K(ctx), + K(lock_timeout_us), K(converted_timeout)); + } } } return ret; @@ -426,15 +526,35 @@ int MdsTableHandle::get_snapshot(const Key &key, return read_op(*reinterpret_cast(data)); }; if (OB_SUCC(ret)) { - if (OB_FAIL(p_mds_table_base_->get_snapshot(unit_id, - (void*)&key, - function, - snapshot, - read_seq, - timeout_us))) { - if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) { - MDS_LOG(WARN, "fail to call get_snapshot", KR(ret), K(unit_id), K(key), K(snapshot), - K(read_seq), K(timeout_us)); + int64_t converted_timeout = 0; + if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict + converted_timeout = timeout_us; + } else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) { + if (timeout_us > 30_s) {// timeout no more than 30s + MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(key), K(snapshot), + K(read_seq), K(timeout_us), K(converted_timeout)); + converted_timeout = 30_s; + } else { + converted_timeout = timeout_us; + } + } else {// do mds data maybe hang operation is not allowed in other phase callback + ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided + MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :" + " on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(key), K(snapshot), + K(read_seq), K(timeout_us), K(converted_timeout)); + MDS_ASSERT(false);// abort in test environment + } + if (OB_SUCC(ret)) { + if (OB_FAIL(p_mds_table_base_->get_snapshot(unit_id, + (void*)&key, + function, + snapshot, + read_seq, + converted_timeout))) { + if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) { + MDS_LOG(WARN, "fail to call get_snapshot", KR(ret), K(unit_id), K(key), K(snapshot), + K(read_seq), K(timeout_us), K(converted_timeout)); + } } } } @@ -457,16 +577,36 @@ int MdsTableHandle::get_by_writer(const Key &key, return read_op(*reinterpret_cast(data)); }; if (OB_SUCC(ret)) { - if (OB_FAIL(p_mds_table_base_->get_by_writer(unit_id, - (void*)&key, - function, - writer, - snapshot, - read_seq, - timeout_us))) { - if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) { - MDS_LOG(WARN, "fail to call get_by_writer", KR(ret), K(unit_id), K(key), K(writer), - K(snapshot), K(read_seq), K(timeout_us)); + int64_t converted_timeout = 0; + if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::UNKNOWN) { // no restrict + converted_timeout = timeout_us; + } else if (TLOCAL_MDS_TRANS_NOTIFY_TYPE == transaction::NotifyType::REGISTER_SUCC) { + if (timeout_us > 30_s) {// timeout no more than 30s + MDS_LOG(INFO, "timeout ts mustn't more than 30s in current version", KR(ret), K(unit_id), K(key), K(writer), + K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout)); + converted_timeout = 30_s; + } else { + converted_timeout = timeout_us; + } + } else {// do mds data maybe hang operation is not allowed in other phase callback + ret = OB_OP_NOT_ALLOW;// this call may deadlock with other threads and can not be avoided + MDS_LOG(ERROR, "you mustn't do maybe hung operation in trans callbacks :" + " on_redo/before_prepare/on_prepare/on_commit/on_abort", KR(ret), K(unit_id), K(key), K(writer), + K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout)); + MDS_ASSERT(false);// abort in test environment + } + if (OB_SUCC(ret)) { + if (OB_FAIL(p_mds_table_base_->get_by_writer(unit_id, + (void*)&key, + function, + writer, + snapshot, + read_seq, + converted_timeout))) { + if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) { + MDS_LOG(WARN, "fail to call get_by_writer", KR(ret), K(unit_id), K(key), K(writer), + K(snapshot), K(read_seq), K(timeout_us), K(converted_timeout)); + } } } } diff --git a/src/storage/tx/ob_multi_data_source.cpp b/src/storage/tx/ob_multi_data_source.cpp index 65f1961d38..b8fe7f990f 100644 --- a/src/storage/tx/ob_multi_data_source.cpp +++ b/src/storage/tx/ob_multi_data_source.cpp @@ -220,6 +220,7 @@ int ObMulSourceTxDataNotifier::notify(const ObTxBufferNodeArray &array, } } } else { + mds::TLOCAL_MDS_TRANS_NOTIFY_TYPE = notify_type; switch (node.type_) { #define NEED_GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION #define _GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION_(HELPER_CLASS, BUFFER_CTX_TYPE, ID, ENUM_NAME) \ @@ -281,6 +282,7 @@ int ObMulSourceTxDataNotifier::notify(const ObTxBufferNodeArray &array, default: ob_abort(); } + mds::TLOCAL_MDS_TRANS_NOTIFY_TYPE = NotifyType::UNKNOWN; } if (OB_FAIL(ret)) { TRANS_LOG(WARN, "notify data source failed", KR(ret), K(node)); diff --git a/src/storage/tx/ob_multi_data_source.h b/src/storage/tx/ob_multi_data_source.h index 5c3ed08c15..5fc535d1d8 100644 --- a/src/storage/tx/ob_multi_data_source.h +++ b/src/storage/tx/ob_multi_data_source.h @@ -69,6 +69,7 @@ enum class ObTxDataSourceType : int64_t enum class NotifyType : int64_t { + UNKNOWN = -1, REGISTER_SUCC = 0, ON_REDO = 1, TX_END = 2,