From b07ff0cb1301ab05b6369eaa86af2ee3047b9846 Mon Sep 17 00:00:00 2001 From: KyrielightWei Date: Mon, 11 Mar 2024 03:20:56 +0000 Subject: [PATCH] [CP] reserve the memory of a final mds array before submit commit/abort log --- src/storage/tx/ob_trans_define.h | 1 + src/storage/tx/ob_trans_part_ctx.cpp | 99 ++++++++++++---------------- src/storage/tx/ob_trans_part_ctx.h | 3 +- src/storage/tx/ob_tx_ctx_mds.cpp | 42 +++++++++++- src/storage/tx/ob_tx_ctx_mds.h | 17 ++++- 5 files changed, 101 insertions(+), 61 deletions(-) diff --git a/src/storage/tx/ob_trans_define.h b/src/storage/tx/ob_trans_define.h index befeaae25..171386a21 100644 --- a/src/storage/tx/ob_trans_define.h +++ b/src/storage/tx/ob_trans_define.h @@ -1810,6 +1810,7 @@ public: incremental_participants_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "INC_PART`")), intermediate_participants_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "INTER_PART`")), redo_lsns_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "REDO_LSNS")), + multi_data_source_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "MDS_ARRAY")), checksum_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "TX_CHECKSUM")), checksum_scn_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "TX_CHECKSUM")), prepare_log_info_arr_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "PREPARE_INFO")) diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index fe517f5a2..e9d719212 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -2274,7 +2274,6 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) const SCN log_ts = log_cb->get_log_ts(); const palf::LSN log_lsn = log_cb->get_lsn(); const ObTxCbArgArray &cb_arg_array = log_cb->get_cb_arg_array(); - ObTxBufferNodeArray tmp_array; if (OB_FAIL(common_on_success_(log_cb))) { TRANS_LOG(WARN, "common_on_success_ failed", K(ret)); @@ -2446,12 +2445,13 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) } } else { const NotifyType type = NotifyType::ON_ABORT; - tmp_array.reset(); if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) { TRANS_LOG(WARN, "set end log ts failed", K(ret)); - } else if (OB_FAIL(gen_total_mds_array_(tmp_array))) { + } else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_, + true /*need_merge_cache*/, + true /*allow_log_overflow*/))) { TRANS_LOG(WARN, "gen total mds array failed", K(ret)); - } else if (OB_FAIL(notify_data_source_(type, log_ts, false, tmp_array))) { + } else if (OB_FAIL(notify_data_source_(type, log_ts, false, mds_cache_.get_final_notify_array()))) { TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this)); } ObTwoPhaseCommitLogType two_phase_log_type; @@ -3294,8 +3294,8 @@ int ObPartTransCtx::submit_commit_log_() palf::LSN prev_lsn; bool has_redo = false; ObRedoLogSubmitHelper helper; - ObTxBufferNodeArray multi_source_data; const int64_t replay_hint = trans_id_.get_id(); + const bool local_tx = is_local_tx_(); using LogBarrierType = logservice::ObReplayBarrierType; LogBarrierType commit_info_log_barrier = LogBarrierType::NO_NEED_BARRIER; @@ -3303,13 +3303,17 @@ int ObPartTransCtx::submit_commit_log_() || get_downstream_state() == ObTxState::ABORT) { ret = OB_TRANS_KILLED; TRANS_LOG(WARN, "tx has been aborting, can not submit prepare log", K(ret)); - } else if (OB_FAIL(gen_final_mds_array_(multi_source_data))) { - TRANS_LOG(WARN, "gen total multi source data failed", KR(ret), K(*this)); + } else if (OB_FAIL(mds_cache_.reserve_final_notify_array(exec_info_.multi_data_source_))) { + TRANS_LOG(WARN, "reserve mds cache memory failed", KR(ret), K(*this)); + } else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_, + true /*need_merge_cache*/, + false /*allow_log_overflow*/))) { + TRANS_LOG(WARN, "generate final notify array failed", K(ret), K(mds_cache_), KPC(this)); } else { bool log_block_inited = false; int64_t suggested_buf_size = ObTxAdaptiveLogBuf::NORMAL_LOG_BUF_SIZE; if (local_tx && - multi_source_data.count() == 0 && + mds_cache_.get_final_notify_array().count() == 0 && // 512B ((mt_ctx_.get_pending_log_size() < ObTxAdaptiveLogBuf::MIN_LOG_BUF_SIZE / 4) || // for corner case test @@ -3397,9 +3401,8 @@ int ObPartTransCtx::submit_commit_log_() collapsed_checksum, checksum_sig, exec_info_.incremental_participants_, - multi_source_data, exec_info_.trans_type_, prev_lsn, - coord_prepare_info_arr_, - prev_log_type); + mds_cache_.get_final_notify_array(), exec_info_.trans_type_, prev_lsn, + coord_prepare_info_arr_, prev_log_type); ObTxLogCb *log_cb = NULL; bool redo_log_submitted = false; LogBarrierType commit_log_barrier_type = LogBarrierType::NO_NEED_BARRIER; @@ -3553,16 +3556,19 @@ int ObPartTransCtx::submit_abort_log_() int ret = OB_SUCCESS; ObTxLogCb *log_cb = NULL; ObTxLogBlock log_block; - ObTxBufferNodeArray tmp_array; const int64_t replay_hint = trans_id_.get_id(); using LogBarrierType = logservice::ObReplayBarrierType; logservice::ObReplayBarrierType barrier = LogBarrierType::NO_NEED_BARRIER; - if (OB_FAIL(gen_final_mds_array_(tmp_array, false))) { + if (OB_FAIL(mds_cache_.reserve_final_notify_array(exec_info_.multi_data_source_))) { + TRANS_LOG(WARN, "reserve final notify array failed", K(ret), K(mds_cache_), KPC(this)); + } else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_, + true /*need_merge_cache*/, + false /*allow_log_overflow*/))) { TRANS_LOG(WARN, "gen abort mds array failed", K(ret)); } - ObTxAbortLog abort_log(tmp_array); + ObTxAbortLog abort_log(mds_cache_.get_final_notify_array()); if (OB_SUCC(ret)) { if ((exec_info_.multi_data_source_.count() > 0 || mds_cache_.count() > 0)) { @@ -5617,19 +5623,18 @@ int ObPartTransCtx::replay_abort(const ObTxAbortLog &abort_log, } if (OB_SUCC(ret)) { // we must notify mds tx_end before invoking trans_replay_abort_ for clearing tablet lock - ObTxBufferNodeArray tmp_array; - if (OB_FAIL(gen_total_mds_array_(tmp_array))) { + if (OB_FAIL(mds_cache_.generate_final_notify_array( + exec_info_.multi_data_source_, true /*need_merge_cache*/, true /*allow_log_overflow*/))) { TRANS_LOG(WARN, "gen total mds array failed", K(ret)); - } else if (OB_FAIL(notify_data_source_(NotifyType::TX_END, - timestamp, - true, + } else if (OB_FAIL(notify_data_source_(NotifyType::TX_END, timestamp, true, exec_info_.multi_data_source_))) { TRANS_LOG(WARN, "notify data source for TX_END failed", KR(ret), K(*this)); } else if (OB_FAIL(trans_replay_abort_(timestamp))) { TRANS_LOG(WARN, "transaction replay end error", KR(ret), "context", *this); } else if (OB_FAIL(trans_clear_())) { TRANS_LOG(WARN, "transaction clear error", KR(ret), "context", *this); - } else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT, timestamp, true, tmp_array))) { + } else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT, timestamp, true, + mds_cache_.get_final_notify_array()))) { TRANS_LOG(WARN, "notify data source failed", KR(ret), K(abort_log)); } else if ((!ctx_tx_data_.is_read_only()) && OB_FAIL(ctx_tx_data_.insert_into_tx_table())) { TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this)); @@ -6423,35 +6428,16 @@ const common::ObAddr &ObPartTransCtx::get_scheduler() const return exec_info_.scheduler_; } -int ObPartTransCtx::gen_final_mds_array_(ObTxBufferNodeArray &array, bool is_committing) const +int ObPartTransCtx::gen_total_mds_array_(ObTxBufferNodeArray &mds_array) { int ret = OB_SUCCESS; - // If the is_committing is true, some redo log have not confirmed. - if (OB_FAIL(array.assign(exec_info_.multi_data_source_))) { - TRANS_LOG(WARN, "assign multi source data failed", KR(ret), K(*this)); - } else if (is_committing && OB_FAIL(mds_cache_.copy_to(array))) { - TRANS_LOG(WARN, "copy from mds_cache_ failed", K(ret)); - } - - if (array.get_serialize_size() > ObTxMultiDataSourceLog::MAX_MDS_LOG_SIZE) { - TRANS_LOG(WARN, "MDS array is overflow, use empty MDS array", K(array.get_serialize_size())); - array.reset(); - } - - return ret; -} - -int ObPartTransCtx::gen_total_mds_array_(ObTxBufferNodeArray &mds_array) const -{ - int ret = OB_SUCCESS; - - if (OB_FAIL(mds_array.assign(exec_info_.multi_data_source_))) { - TRANS_LOG(WARN, "assign multi source data failed", KR(ret), K(*this)); - } else if (OB_FAIL(mds_cache_.copy_to(mds_array))) { + if (OB_FAIL(mds_cache_.generate_final_notify_array( + exec_info_.multi_data_source_, true /*need_merge_cache*/, true /*allow_log_overflow*/))) { + TRANS_LOG(WARN, "generate final notify array failed", K(ret), KPC(this)); + } else if (OB_FAIL(mds_array.assign(mds_cache_.get_final_notify_array()))) { TRANS_LOG(WARN, "assign multi source data failed", KR(ret), K(*this)); } - return ret; } @@ -6837,16 +6823,16 @@ int ObPartTransCtx::prepare_mul_data_source_tx_end_(bool is_commit) int ret = OB_SUCCESS; if (OB_SUCC(ret)) { - ObTxBufferNodeArray tmp_array; - if (is_commit - && mds_cache_.count() > 0 - && OB_FAIL(submit_log_impl_(ObTxLogType::TX_MULTI_DATA_SOURCE_LOG))) { + if (is_commit && mds_cache_.count() > 0 + && OB_FAIL(submit_log_impl_(ObTxLogType::TX_MULTI_DATA_SOURCE_LOG))) { TRANS_LOG(WARN, "submit multi data souce log failed", K(ret)); - } else if (OB_FAIL(gen_total_mds_array_(tmp_array))) { + } else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_, + true /*need_merge_cache*/, + true /*allow_log_overflo*/))) { TRANS_LOG(WARN, "copy total mds array failed", K(ret)); } else if (OB_FAIL(notify_data_source_(NotifyType::TX_END, SCN(), false, - tmp_array))) { + mds_cache_.get_final_notify_array()))) { TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this)); } } @@ -8420,8 +8406,8 @@ int ObPartTransCtx::do_force_kill_tx_() if (get_downstream_state() >= ObTxState::COMMIT) { // do nothing - } else if (OB_FAIL(gen_total_mds_array_(tmp_array))) { - TRANS_LOG(WARN, "gen total mds array failed", KR(ret), K(*this)); + // } else if (OB_FAIL(gen_total_mds_array_(tmp_array))) { + // TRANS_LOG(WARN, "gen total mds array failed", KR(ret), K(*this)); // } else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT, // ctx_tx_data_.get_end_log_ts() /*invalid_scn*/, false, // tmp_array, true /*is_force_kill*/))) { @@ -8501,16 +8487,16 @@ int ObPartTransCtx::on_local_abort_tx_() { int ret = OB_SUCCESS; - ObTxBufferNodeArray tmp_array; - if (OB_FAIL(tx_end_(false /*commit*/))) { TRANS_LOG(WARN, "trans end error", KR(ret), "context", *this); } else if (OB_FAIL(trans_clear_())) { TRANS_LOG(WARN, "local tx clear error", KR(ret), K(*this)); - } else if (OB_FAIL(gen_total_mds_array_(tmp_array))) { + } else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_, + true /*need_merge_cache*/, + true /*allow_log_overflow*/))) { TRANS_LOG(WARN, "gen total mds array failed", KR(ret), K(*this)); } else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT, ctx_tx_data_.get_end_log_ts(), false, - tmp_array))) { + mds_cache_.get_final_notify_array()))) { TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this)); } else if (FALSE_IT(set_durable_state_(ObTxState::ABORT))) { @@ -8526,6 +8512,7 @@ int ObPartTransCtx::on_local_abort_tx_() return ret; } + int ObPartTransCtx::dump_2_text(FILE *fd) { int ret = OB_SUCCESS; diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 9d8b86fd3..1dbbcb9fb 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -618,8 +618,7 @@ private: const bool for_replay, const ObTxBufferNodeArray ¬ify_array, const bool is_force_kill = false); - int gen_final_mds_array_(ObTxBufferNodeArray &array, bool is_committing = true) const; - int gen_total_mds_array_(ObTxBufferNodeArray &mds_array) const; + int gen_total_mds_array_(ObTxBufferNodeArray &mds_array); int deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, ObTxBufferNodeArray &incremental_array, bool need_replace = false); diff --git a/src/storage/tx/ob_tx_ctx_mds.cpp b/src/storage/tx/ob_tx_ctx_mds.cpp index eb84784c6..fc63c70ef 100644 --- a/src/storage/tx/ob_tx_ctx_mds.cpp +++ b/src/storage/tx/ob_tx_ctx_mds.cpp @@ -352,7 +352,47 @@ int ObTxMDSCache::decide_cache_state_log_mds_barrier_type( return ret; } -int ObTxMDSCache::copy_to(ObTxBufferNodeArray &tmp_array) const +int ObTxMDSCache::reserve_final_notify_array(const ObTxBufferNodeArray &mds_durable_arr) +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(final_notify_array_.reserve(mds_list_.size() + mds_durable_arr.count()))) { + TRANS_LOG(WARN, "reserve notify array space failed", K(ret), K(mds_list_.size()), + K(mds_durable_arr.count())); + } + + return ret; +} + +int ObTxMDSCache::generate_final_notify_array(const ObTxBufferNodeArray &mds_durable_arr, + bool need_merge_cache, + bool allow_log_overflow) +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(final_notify_array_.assign(mds_durable_arr))) { + TRANS_LOG(WARN, "assign mds_durable_arr failed", K(ret), K(mds_durable_arr.count()), + K(final_notify_array_.get_capacity()), KPC(this)); + + } else if (need_merge_cache) { + if (OB_FAIL(copy_to_(final_notify_array_))) { + TRANS_LOG(WARN, "merge mds_cache into final_notify_array failed", K(ret), + K(mds_durable_arr.count()), K(final_notify_array_.get_capacity()), KPC(this)); + } + } + + if (!allow_log_overflow) { + if (final_notify_array_.get_serialize_size() > ObTxMultiDataSourceLog::MAX_MDS_LOG_SIZE) { + TRANS_LOG(WARN, "MDS array is overflow, use empty MDS array", + K(final_notify_array_.get_serialize_size()), K(mds_durable_arr.count()), KPC(this)); + final_notify_array_.reuse(); + } + } + + return ret; +} + +int ObTxMDSCache::copy_to_(ObTxBufferNodeArray &tmp_array) const { int ret = OB_SUCCESS; diff --git a/src/storage/tx/ob_tx_ctx_mds.h b/src/storage/tx/ob_tx_ctx_mds.h index 7ae0d5f9b..697b9af5a 100644 --- a/src/storage/tx/ob_tx_ctx_mds.h +++ b/src/storage/tx/ob_tx_ctx_mds.h @@ -50,7 +50,11 @@ typedef common::hash::ObHashMap ObTxMDSMemStatHash; class ObTxMDSCache { public: - ObTxMDSCache(TransModulePageAllocator &allocator) : mds_list_(allocator) { reset(); } + ObTxMDSCache(TransModulePageAllocator &allocator) + : mds_list_(allocator), final_notify_array_(OB_MALLOC_NORMAL_BLOCK_SIZE, allocator) + { + reset(); + } int init(const int64_t tenant_id, const share::ObLSID ls_id, const ObTransID tx_id); void reset(); void destroy(); @@ -77,7 +81,12 @@ public: const ObTxLogType state_log_type, logservice::ObReplayBarrierType &cache_final_barrier_type); int earse_from_cache(const ObTxBufferNode &node) { return mds_list_.erase(node); } - int copy_to(ObTxBufferNodeArray &tmp_array) const; + + int reserve_final_notify_array(const ObTxBufferNodeArray &mds_durable_arr); + int generate_final_notify_array(const ObTxBufferNodeArray &mds_durable_arr, + bool need_merge_cache, + bool allow_log_overflow); + ObTxBufferNodeArray &get_final_notify_array() { return final_notify_array_; } int64_t get_unsubmitted_size() const { return unsubmitted_size_; } int64_t count() const { return mds_list_.size(); } @@ -93,6 +102,9 @@ public: TO_STRING_KV(K(unsubmitted_size_), K(mds_list_.size()), K(max_register_no_)); +private: + int copy_to_(ObTxBufferNodeArray &tmp_array) const; + private: // TransModulePageAllocator allocator_; uint64_t max_register_no_; @@ -100,6 +112,7 @@ private: int64_t unsubmitted_size_; ObTxBufferNodeList mds_list_; ObTxBufferNodeList::iterator submitted_iterator_; + ObTxBufferNodeArray final_notify_array_; #ifdef ENABLE_DEBUG_LOG int64_t tenant_id_;