[CP] reserve the memory of a final mds array before submit commit/abort log

This commit is contained in:
KyrielightWei 2024-03-11 03:20:56 +00:00 committed by ob-robot
parent 09bbafdb7a
commit b07ff0cb13
5 changed files with 101 additions and 61 deletions

View File

@ -1810,6 +1810,7 @@ public:
incremental_participants_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "INC_PART`")),
intermediate_participants_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "INTER_PART`")),
redo_lsns_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "REDO_LSNS")),
multi_data_source_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "MDS_ARRAY")),
checksum_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "TX_CHECKSUM")),
checksum_scn_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "TX_CHECKSUM")),
prepare_log_info_arr_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator, "PREPARE_INFO"))

View File

@ -2274,7 +2274,6 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
const SCN log_ts = log_cb->get_log_ts();
const palf::LSN log_lsn = log_cb->get_lsn();
const ObTxCbArgArray &cb_arg_array = log_cb->get_cb_arg_array();
ObTxBufferNodeArray tmp_array;
if (OB_FAIL(common_on_success_(log_cb))) {
TRANS_LOG(WARN, "common_on_success_ failed", K(ret));
@ -2446,12 +2445,13 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
}
} else {
const NotifyType type = NotifyType::ON_ABORT;
tmp_array.reset();
if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) {
TRANS_LOG(WARN, "set end log ts failed", K(ret));
} else if (OB_FAIL(gen_total_mds_array_(tmp_array))) {
} else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_,
true /*need_merge_cache*/,
true /*allow_log_overflow*/))) {
TRANS_LOG(WARN, "gen total mds array failed", K(ret));
} else if (OB_FAIL(notify_data_source_(type, log_ts, false, tmp_array))) {
} else if (OB_FAIL(notify_data_source_(type, log_ts, false, mds_cache_.get_final_notify_array()))) {
TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this));
}
ObTwoPhaseCommitLogType two_phase_log_type;
@ -3294,8 +3294,8 @@ int ObPartTransCtx::submit_commit_log_()
palf::LSN prev_lsn;
bool has_redo = false;
ObRedoLogSubmitHelper helper;
ObTxBufferNodeArray multi_source_data;
const int64_t replay_hint = trans_id_.get_id();
const bool local_tx = is_local_tx_();
using LogBarrierType = logservice::ObReplayBarrierType;
LogBarrierType commit_info_log_barrier = LogBarrierType::NO_NEED_BARRIER;
@ -3303,13 +3303,17 @@ int ObPartTransCtx::submit_commit_log_()
|| get_downstream_state() == ObTxState::ABORT) {
ret = OB_TRANS_KILLED;
TRANS_LOG(WARN, "tx has been aborting, can not submit prepare log", K(ret));
} else if (OB_FAIL(gen_final_mds_array_(multi_source_data))) {
TRANS_LOG(WARN, "gen total multi source data failed", KR(ret), K(*this));
} else if (OB_FAIL(mds_cache_.reserve_final_notify_array(exec_info_.multi_data_source_))) {
TRANS_LOG(WARN, "reserve mds cache memory failed", KR(ret), K(*this));
} else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_,
true /*need_merge_cache*/,
false /*allow_log_overflow*/))) {
TRANS_LOG(WARN, "generate final notify array failed", K(ret), K(mds_cache_), KPC(this));
} else {
bool log_block_inited = false;
int64_t suggested_buf_size = ObTxAdaptiveLogBuf::NORMAL_LOG_BUF_SIZE;
if (local_tx &&
multi_source_data.count() == 0 &&
mds_cache_.get_final_notify_array().count() == 0 &&
// 512B
((mt_ctx_.get_pending_log_size() < ObTxAdaptiveLogBuf::MIN_LOG_BUF_SIZE / 4) ||
// for corner case test
@ -3397,9 +3401,8 @@ int ObPartTransCtx::submit_commit_log_()
collapsed_checksum,
checksum_sig,
exec_info_.incremental_participants_,
multi_source_data, exec_info_.trans_type_, prev_lsn,
coord_prepare_info_arr_,
prev_log_type);
mds_cache_.get_final_notify_array(), exec_info_.trans_type_, prev_lsn,
coord_prepare_info_arr_, prev_log_type);
ObTxLogCb *log_cb = NULL;
bool redo_log_submitted = false;
LogBarrierType commit_log_barrier_type = LogBarrierType::NO_NEED_BARRIER;
@ -3553,16 +3556,19 @@ int ObPartTransCtx::submit_abort_log_()
int ret = OB_SUCCESS;
ObTxLogCb *log_cb = NULL;
ObTxLogBlock log_block;
ObTxBufferNodeArray tmp_array;
const int64_t replay_hint = trans_id_.get_id();
using LogBarrierType = logservice::ObReplayBarrierType;
logservice::ObReplayBarrierType barrier = LogBarrierType::NO_NEED_BARRIER;
if (OB_FAIL(gen_final_mds_array_(tmp_array, false))) {
if (OB_FAIL(mds_cache_.reserve_final_notify_array(exec_info_.multi_data_source_))) {
TRANS_LOG(WARN, "reserve final notify array failed", K(ret), K(mds_cache_), KPC(this));
} else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_,
true /*need_merge_cache*/,
false /*allow_log_overflow*/))) {
TRANS_LOG(WARN, "gen abort mds array failed", K(ret));
}
ObTxAbortLog abort_log(tmp_array);
ObTxAbortLog abort_log(mds_cache_.get_final_notify_array());
if (OB_SUCC(ret)) {
if ((exec_info_.multi_data_source_.count() > 0 || mds_cache_.count() > 0)) {
@ -5617,19 +5623,18 @@ int ObPartTransCtx::replay_abort(const ObTxAbortLog &abort_log,
}
if (OB_SUCC(ret)) {
// we must notify mds tx_end before invoking trans_replay_abort_ for clearing tablet lock
ObTxBufferNodeArray tmp_array;
if (OB_FAIL(gen_total_mds_array_(tmp_array))) {
if (OB_FAIL(mds_cache_.generate_final_notify_array(
exec_info_.multi_data_source_, true /*need_merge_cache*/, true /*allow_log_overflow*/))) {
TRANS_LOG(WARN, "gen total mds array failed", K(ret));
} else if (OB_FAIL(notify_data_source_(NotifyType::TX_END,
timestamp,
true,
} else if (OB_FAIL(notify_data_source_(NotifyType::TX_END, timestamp, true,
exec_info_.multi_data_source_))) {
TRANS_LOG(WARN, "notify data source for TX_END failed", KR(ret), K(*this));
} else if (OB_FAIL(trans_replay_abort_(timestamp))) {
TRANS_LOG(WARN, "transaction replay end error", KR(ret), "context", *this);
} else if (OB_FAIL(trans_clear_())) {
TRANS_LOG(WARN, "transaction clear error", KR(ret), "context", *this);
} else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT, timestamp, true, tmp_array))) {
} else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT, timestamp, true,
mds_cache_.get_final_notify_array()))) {
TRANS_LOG(WARN, "notify data source failed", KR(ret), K(abort_log));
} else if ((!ctx_tx_data_.is_read_only()) && OB_FAIL(ctx_tx_data_.insert_into_tx_table())) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
@ -6423,35 +6428,16 @@ const common::ObAddr &ObPartTransCtx::get_scheduler() const
return exec_info_.scheduler_;
}
int ObPartTransCtx::gen_final_mds_array_(ObTxBufferNodeArray &array, bool is_committing) const
int ObPartTransCtx::gen_total_mds_array_(ObTxBufferNodeArray &mds_array)
{
int ret = OB_SUCCESS;
// If the is_committing is true, some redo log have not confirmed.
if (OB_FAIL(array.assign(exec_info_.multi_data_source_))) {
TRANS_LOG(WARN, "assign multi source data failed", KR(ret), K(*this));
} else if (is_committing && OB_FAIL(mds_cache_.copy_to(array))) {
TRANS_LOG(WARN, "copy from mds_cache_ failed", K(ret));
}
if (array.get_serialize_size() > ObTxMultiDataSourceLog::MAX_MDS_LOG_SIZE) {
TRANS_LOG(WARN, "MDS array is overflow, use empty MDS array", K(array.get_serialize_size()));
array.reset();
}
return ret;
}
int ObPartTransCtx::gen_total_mds_array_(ObTxBufferNodeArray &mds_array) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(mds_array.assign(exec_info_.multi_data_source_))) {
TRANS_LOG(WARN, "assign multi source data failed", KR(ret), K(*this));
} else if (OB_FAIL(mds_cache_.copy_to(mds_array))) {
if (OB_FAIL(mds_cache_.generate_final_notify_array(
exec_info_.multi_data_source_, true /*need_merge_cache*/, true /*allow_log_overflow*/))) {
TRANS_LOG(WARN, "generate final notify array failed", K(ret), KPC(this));
} else if (OB_FAIL(mds_array.assign(mds_cache_.get_final_notify_array()))) {
TRANS_LOG(WARN, "assign multi source data failed", KR(ret), K(*this));
}
return ret;
}
@ -6837,16 +6823,16 @@ int ObPartTransCtx::prepare_mul_data_source_tx_end_(bool is_commit)
int ret = OB_SUCCESS;
if (OB_SUCC(ret)) {
ObTxBufferNodeArray tmp_array;
if (is_commit
&& mds_cache_.count() > 0
&& OB_FAIL(submit_log_impl_(ObTxLogType::TX_MULTI_DATA_SOURCE_LOG))) {
if (is_commit && mds_cache_.count() > 0
&& OB_FAIL(submit_log_impl_(ObTxLogType::TX_MULTI_DATA_SOURCE_LOG))) {
TRANS_LOG(WARN, "submit multi data souce log failed", K(ret));
} else if (OB_FAIL(gen_total_mds_array_(tmp_array))) {
} else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_,
true /*need_merge_cache*/,
true /*allow_log_overflo*/))) {
TRANS_LOG(WARN, "copy total mds array failed", K(ret));
} else if (OB_FAIL(notify_data_source_(NotifyType::TX_END, SCN(), false,
tmp_array))) {
mds_cache_.get_final_notify_array()))) {
TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this));
}
}
@ -8420,8 +8406,8 @@ int ObPartTransCtx::do_force_kill_tx_()
if (get_downstream_state() >= ObTxState::COMMIT) {
// do nothing
} else if (OB_FAIL(gen_total_mds_array_(tmp_array))) {
TRANS_LOG(WARN, "gen total mds array failed", KR(ret), K(*this));
// } else if (OB_FAIL(gen_total_mds_array_(tmp_array))) {
// TRANS_LOG(WARN, "gen total mds array failed", KR(ret), K(*this));
// } else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT,
// ctx_tx_data_.get_end_log_ts() /*invalid_scn*/, false,
// tmp_array, true /*is_force_kill*/))) {
@ -8501,16 +8487,16 @@ int ObPartTransCtx::on_local_abort_tx_()
{
int ret = OB_SUCCESS;
ObTxBufferNodeArray tmp_array;
if (OB_FAIL(tx_end_(false /*commit*/))) {
TRANS_LOG(WARN, "trans end error", KR(ret), "context", *this);
} else if (OB_FAIL(trans_clear_())) {
TRANS_LOG(WARN, "local tx clear error", KR(ret), K(*this));
} else if (OB_FAIL(gen_total_mds_array_(tmp_array))) {
} else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_,
true /*need_merge_cache*/,
true /*allow_log_overflow*/))) {
TRANS_LOG(WARN, "gen total mds array failed", KR(ret), K(*this));
} else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT, ctx_tx_data_.get_end_log_ts(), false,
tmp_array))) {
mds_cache_.get_final_notify_array()))) {
TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this));
} else if (FALSE_IT(set_durable_state_(ObTxState::ABORT))) {
@ -8526,6 +8512,7 @@ int ObPartTransCtx::on_local_abort_tx_()
return ret;
}
int ObPartTransCtx::dump_2_text(FILE *fd)
{
int ret = OB_SUCCESS;

View File

@ -618,8 +618,7 @@ private:
const bool for_replay,
const ObTxBufferNodeArray &notify_array,
const bool is_force_kill = false);
int gen_final_mds_array_(ObTxBufferNodeArray &array, bool is_committing = true) const;
int gen_total_mds_array_(ObTxBufferNodeArray &mds_array) const;
int gen_total_mds_array_(ObTxBufferNodeArray &mds_array);
int deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
ObTxBufferNodeArray &incremental_array,
bool need_replace = false);

View File

@ -352,7 +352,47 @@ int ObTxMDSCache::decide_cache_state_log_mds_barrier_type(
return ret;
}
int ObTxMDSCache::copy_to(ObTxBufferNodeArray &tmp_array) const
int ObTxMDSCache::reserve_final_notify_array(const ObTxBufferNodeArray &mds_durable_arr)
{
int ret = OB_SUCCESS;
if (OB_FAIL(final_notify_array_.reserve(mds_list_.size() + mds_durable_arr.count()))) {
TRANS_LOG(WARN, "reserve notify array space failed", K(ret), K(mds_list_.size()),
K(mds_durable_arr.count()));
}
return ret;
}
int ObTxMDSCache::generate_final_notify_array(const ObTxBufferNodeArray &mds_durable_arr,
bool need_merge_cache,
bool allow_log_overflow)
{
int ret = OB_SUCCESS;
if (OB_FAIL(final_notify_array_.assign(mds_durable_arr))) {
TRANS_LOG(WARN, "assign mds_durable_arr failed", K(ret), K(mds_durable_arr.count()),
K(final_notify_array_.get_capacity()), KPC(this));
} else if (need_merge_cache) {
if (OB_FAIL(copy_to_(final_notify_array_))) {
TRANS_LOG(WARN, "merge mds_cache into final_notify_array failed", K(ret),
K(mds_durable_arr.count()), K(final_notify_array_.get_capacity()), KPC(this));
}
}
if (!allow_log_overflow) {
if (final_notify_array_.get_serialize_size() > ObTxMultiDataSourceLog::MAX_MDS_LOG_SIZE) {
TRANS_LOG(WARN, "MDS array is overflow, use empty MDS array",
K(final_notify_array_.get_serialize_size()), K(mds_durable_arr.count()), KPC(this));
final_notify_array_.reuse();
}
}
return ret;
}
int ObTxMDSCache::copy_to_(ObTxBufferNodeArray &tmp_array) const
{
int ret = OB_SUCCESS;

View File

@ -50,7 +50,11 @@ typedef common::hash::ObHashMap<uint64_t, ObMDSMemStat> ObTxMDSMemStatHash;
class ObTxMDSCache
{
public:
ObTxMDSCache(TransModulePageAllocator &allocator) : mds_list_(allocator) { reset(); }
ObTxMDSCache(TransModulePageAllocator &allocator)
: mds_list_(allocator), final_notify_array_(OB_MALLOC_NORMAL_BLOCK_SIZE, allocator)
{
reset();
}
int init(const int64_t tenant_id, const share::ObLSID ls_id, const ObTransID tx_id);
void reset();
void destroy();
@ -77,7 +81,12 @@ public:
const ObTxLogType state_log_type,
logservice::ObReplayBarrierType &cache_final_barrier_type);
int earse_from_cache(const ObTxBufferNode &node) { return mds_list_.erase(node); }
int copy_to(ObTxBufferNodeArray &tmp_array) const;
int reserve_final_notify_array(const ObTxBufferNodeArray &mds_durable_arr);
int generate_final_notify_array(const ObTxBufferNodeArray &mds_durable_arr,
bool need_merge_cache,
bool allow_log_overflow);
ObTxBufferNodeArray &get_final_notify_array() { return final_notify_array_; }
int64_t get_unsubmitted_size() const { return unsubmitted_size_; }
int64_t count() const { return mds_list_.size(); }
@ -93,6 +102,9 @@ public:
TO_STRING_KV(K(unsubmitted_size_), K(mds_list_.size()), K(max_register_no_));
private:
int copy_to_(ObTxBufferNodeArray &tmp_array) const;
private:
// TransModulePageAllocator allocator_;
uint64_t max_register_no_;
@ -100,6 +112,7 @@ private:
int64_t unsubmitted_size_;
ObTxBufferNodeList mds_list_;
ObTxBufferNodeList::iterator submitted_iterator_;
ObTxBufferNodeArray final_notify_array_;
#ifdef ENABLE_DEBUG_LOG
int64_t tenant_id_;