From 16062de3b3269ebf0d009906231342ebca8dd9a0 Mon Sep 17 00:00:00 2001 From: gl0 Date: Mon, 24 Jan 2022 15:58:13 +0800 Subject: [PATCH] Limit the size of part trans ctx --- src/storage/transaction/ob_trans_define.h | 3 + src/storage/transaction/ob_trans_part_ctx.cpp | 80 ++++++++++++++++--- src/storage/transaction/ob_trans_part_ctx.h | 8 +- 3 files changed, 77 insertions(+), 14 deletions(-) diff --git a/src/storage/transaction/ob_trans_define.h b/src/storage/transaction/ob_trans_define.h index e1532ca35..dedb8de59 100644 --- a/src/storage/transaction/ob_trans_define.h +++ b/src/storage/transaction/ob_trans_define.h @@ -69,6 +69,9 @@ class AggreLogTask; class ObPartTransCtxMgr; class ObPartitionTransCtxMgr; +// Reserve 50KB to store the fields in trans ctx except undo_status, participants and redo_log +static const int64_t OB_MAX_TRANS_SERIALIZE_SIZE = common::OB_MAX_USER_ROW_LENGTH - 51200; + class ObTransErrsim { public: static inline bool is_memory_errsim() diff --git a/src/storage/transaction/ob_trans_part_ctx.cpp b/src/storage/transaction/ob_trans_part_ctx.cpp index aeb2a52bb..db2ee3297 100644 --- a/src/storage/transaction/ob_trans_part_ctx.cpp +++ b/src/storage/transaction/ob_trans_part_ctx.cpp @@ -175,6 +175,8 @@ int ObPartTransCtx::init(const uint64_t tenant_id, const ObTransID& trans_id, co } is_listener_ = false; listener_handler_ = NULL; + ctx_serialize_size_ = undo_status_.get_serialize_size() + partition_log_info_arr_.get_serialize_size() + + prev_redo_log_ids_.get_serialize_size(); } if (OB_FAIL(ret)) { if (NULL != redo_sync_task_) { @@ -388,6 +390,7 @@ void ObPartTransCtx::reset() last_redo_log_mutator_size_ = 0; has_write_or_replay_mutator_redo_log_ = false; is_in_redo_with_prepare_ = false; + ctx_serialize_size_ = 0; } int ObPartTransCtx::construct_context(const ObTransMsg& msg) @@ -1209,7 +1212,7 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg) case OB_TRANS_CLEAR_REQUEST: { if (OB_FAIL(set_scheduler_(msg.get_scheduler()))) { TRANS_LOG(WARN, "set scheduler error", KR(ret), K(msg)); - } else if (OB_FAIL(set_participants_(msg.get_participants()))) { + } else if (OB_FAIL(calc_serialize_size_and_set_participants_(msg.get_participants()))) { TRANS_LOG(WARN, "set participants error", KR(ret), K(msg)); } else if (OB_FAIL(handle_trans_clear_request_(msg))) { TRANS_LOG(WARN, "handle trans clear request error", KR(ret), K(msg)); @@ -1248,7 +1251,7 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg) TRANS_LOG(WARN, "set scheduler error", KR(ret), K(msg)); } else if (OB_FAIL(set_coordinator_(msg.get_coordinator()))) { TRANS_LOG(WARN, "set coordinator error", KR(ret), K(msg)); - } else if (OB_FAIL(set_participants_(msg.get_participants()))) { + } else if (OB_FAIL(calc_serialize_size_and_set_participants_(msg.get_participants()))) { TRANS_LOG(WARN, "set participants error", KR(ret), K(msg)); } else if (OB_FAIL(set_xid_(msg.get_xid()))) { TRANS_LOG(WARN, "set xid error", KR(ret), K(msg)); @@ -1296,7 +1299,7 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg) TRANS_LOG(WARN, "set scheduler error", KR(ret), K(msg)); } else if (OB_FAIL(set_coordinator_(msg.get_coordinator()))) { TRANS_LOG(WARN, "set coordinator error", KR(ret), K(msg)); - } else if (OB_FAIL(set_participants_(msg.get_participants()))) { + } else if (OB_FAIL(calc_serialize_size_and_set_participants_(msg.get_participants()))) { TRANS_LOG(WARN, "set participants error", KR(ret), K(msg)); } else { // do nothing @@ -1680,7 +1683,7 @@ int ObPartTransCtx::on_sync_log_success( // The log is completed, we need verify the txn checksum need_checksum_ = true; } - if (OB_FAIL(prev_redo_log_ids_.push_back(log_id))) { + if (OB_FAIL(calc_serialize_size_and_set_redo_log_(log_id))) { TRANS_LOG(WARN, "sp redo log id push back error", KR(ret), "context", *this, K(log_id)); } else if (!not_need_write_next_log_(log_type) && OB_FAIL(submit_log_task_(OB_LOG_SP_TRANS_COMMIT, has_redo_log))) { @@ -1788,7 +1791,7 @@ int ObPartTransCtx::on_sync_log_success( if (OB_LOG_TRANS_REDO == log_type) { start_us = ObTimeUtility::fast_current_time(); // record the redo log id - if (!is_xa_last_empty_redo_log_() && OB_FAIL(prev_redo_log_ids_.push_back(log_id))) { + if (!is_xa_last_empty_redo_log_() && OB_FAIL(calc_serialize_size_and_set_redo_log_(log_id))) { TRANS_LOG(WARN, "redo log id push back error", KR(ret), "context", *this, K(log_id)); } else if (not_need_write_next_log_(log_type)) { // No need to write log for dup table in order to prevent the leader @@ -1950,7 +1953,7 @@ int ObPartTransCtx::on_sync_log_success( need_checksum_ = true; } start_us = ObTimeUtility::fast_current_time(); - if (OB_FAIL(prev_redo_log_ids_.push_back(log_id))) { + if (OB_FAIL(calc_serialize_size_and_set_redo_log_(log_id))) { TRANS_LOG(WARN, "redo log id push back error", KR(ret), "context", *this, K(log_id)); } else if ((OB_LOG_TRANS_STATE & log_type) != 0) { // do nothing @@ -7116,7 +7119,7 @@ int ObPartTransCtx::post_stmt_response_( status, request_id_))) { TRANS_LOG(WARN, "message init error", K(ret), K_(scheduler), K_(tmp_scheduler), K(msg_type)); - // 将request的发送时间戳记录到response中,用于scheduler对消息超时的校验 + // record request timestamp into response for checking timeout in scheduler } else if (OB_FAIL(msg.set_msg_timeout(request_timeout))) { TRANS_LOG(INFO, "set message start timestamp error", @@ -7856,7 +7859,7 @@ int ObPartTransCtx::handle_2pc_local_prepare_request(const int64_t request_id, c TRANS_LOG(WARN, "set scheduler error", K(ret), K(scheduler), "context", *this); } else if (OB_FAIL(set_coordinator_(coordinator))) { TRANS_LOG(WARN, "set coordinator error", K(ret), K(coordinator), "context", *this); - } else if (OB_FAIL(set_participants_(participants))) { + } else if (OB_FAIL(calc_serialize_size_and_set_participants_(participants))) { TRANS_LOG(WARN, "set participants error", K(ret), K(participants), "context", *this); } else if (Ob2PCState::INIT != get_state_()) { ret = OB_EAGAIN; @@ -8036,7 +8039,7 @@ int ObPartTransCtx::handle_2pc_pre_prepare_request(const int64_t prepare_version TRANS_LOG(WARN, "set scheduler error", KR(ret), K(scheduler), "context", *this); } else if (OB_FAIL(set_coordinator_(coordinator))) { TRANS_LOG(WARN, "set coordinator error", KR(ret), K(coordinator), "context", *this); - } else if (OB_FAIL(set_participants_(participants))) { + } else if (OB_FAIL(calc_serialize_size_and_set_participants_(participants))) { TRANS_LOG(WARN, "set participants error", KR(ret), K(participants), "context", *this); } else if (Ob2PCState::INIT != get_state_()) { ret = OB_EAGAIN; @@ -8373,7 +8376,7 @@ int ObPartTransCtx::handle_2pc_request(const ObTrxMsgBase& msg, const int64_t ms TRANS_LOG(WARN, "set scheduler error", K(ret), K(*req)); } else if (OB_FAIL(set_coordinator_(req->coordinator_))) { TRANS_LOG(WARN, "set coordinator error", K(ret), K(*req)); - } else if (OB_FAIL(set_participants_(req->participants_))) { + } else if (OB_FAIL(calc_serialize_size_and_set_participants_(req->participants_))) { TRANS_LOG(WARN, "set participants error", K(ret), K(*req)); } else { // do nothing @@ -8427,7 +8430,7 @@ int ObPartTransCtx::handle_2pc_request(const ObTrxMsgBase& msg, const int64_t ms // TRANS_LOG(WARN, "set scheduler error", K(ret), K(*req)); if (OB_FAIL(set_coordinator_(req->coordinator_))) { TRANS_LOG(WARN, "set coordinator error", K(ret), K(*req)); - } else if (OB_FAIL(set_participants_(req->participants_))) { + } else if (OB_FAIL(calc_serialize_size_and_set_participants_(req->participants_))) { TRANS_LOG(WARN, "set participants error", K(ret), K(*req)); } else if (OB_FAIL(set_xid_(req->xid_))) { TRANS_LOG(WARN, "set xid error", K(ret), K(*this), K(*req)); @@ -10917,7 +10920,7 @@ int ObPartTransCtx::rollback_to_(const int32_t sql_no) } if (OB_SUCC(ret)) { - if (OB_FAIL(undo_status_.undo(sql_no, curr_sql_no))) { + if (OB_FAIL(calc_serialize_size_and_set_undo_(sql_no, curr_sql_no))) { TRANS_LOG(WARN, "record rollback action failed", K(ret), K(sql_no), K(curr_sql_no)); } } @@ -12079,5 +12082,58 @@ void ObPartTransCtx::DEBUG_SYNC_slow_txn_during_2pc_prepare_phase_for_physical_b } } +int ObPartTransCtx::calc_serialize_size_and_set_redo_log_(const int64_t log_id) +{ + int ret = OB_SUCCESS; + if ((ctx_serialize_size_ += serialization::encoded_length_vi64(log_id)) > OB_MAX_TRANS_SERIALIZE_SIZE) { + ret = OB_SIZE_OVERFLOW; + TRANS_LOG(WARN, "size overflow when set redo log.", KR(ret), K(ctx_serialize_size_), K(log_id)); + } else if (OB_FAIL(prev_redo_log_ids_.push_back(log_id))) { + ctx_serialize_size_ -= serialization::encoded_length_vi64(log_id); + TRANS_LOG(WARN, "sp redo log id push back error", KR(ret), "context", *this, K(log_id)); + } else { + // push back redo log success + } + return ret; +} + +int ObPartTransCtx::calc_serialize_size_and_set_participants_(const ObPartitionArray &participants) +{ + int ret = OB_SUCCESS; + if ((ctx_serialize_size_ += participants.get_serialize_size()) > OB_MAX_TRANS_SERIALIZE_SIZE) { + set_status_(OB_TRANS_NEED_ROLLBACK); + ret = OB_SIZE_OVERFLOW; + TRANS_LOG(WARN, + "size overflow when set participants.", + KR(ret), + K(ctx_serialize_size_), + K(participants.get_serialize_size())); + } else if (OB_FAIL(set_participants_(participants))) { + ctx_serialize_size_ -= participants.get_serialize_size(); + TRANS_LOG(WARN, "set participants error", KR(ret), K(participants)); + } + return ret; +} + +int ObPartTransCtx::calc_serialize_size_and_set_undo_(const int64_t undo_to, const int64_t undo_from) +{ + int ret = OB_SUCCESS; + ObUndoAction undo_action(undo_to, undo_from); + if ((ctx_serialize_size_ += undo_action.get_serialize_size()) > OB_MAX_TRANS_SERIALIZE_SIZE) { + set_status_(OB_TRANS_NEED_ROLLBACK); + ret = OB_SIZE_OVERFLOW; + TRANS_LOG(WARN, + "size overflow when set undo action", + KR(ret), + K(ctx_serialize_size_), + K(ctx_serialize_size_), + K(OB_MAX_TRANS_SERIALIZE_SIZE)); + } else if (OB_FAIL(undo_status_.undo(undo_to, undo_from))) { + ctx_serialize_size_ -= undo_action.get_serialize_size(); + TRANS_LOG(WARN, "record rollback action failed", K(ret), K(undo_to), K(undo_from)); + } + return ret; +} + } // namespace transaction } // namespace oceanbase diff --git a/src/storage/transaction/ob_trans_part_ctx.h b/src/storage/transaction/ob_trans_part_ctx.h index 7d99de8f9..17269b61f 100644 --- a/src/storage/transaction/ob_trans_part_ctx.h +++ b/src/storage/transaction/ob_trans_part_ctx.h @@ -402,7 +402,7 @@ public: K(mt_ctx_.get_checksum_log_ts()), K_(is_changing_leader), K_(has_trans_state_log), K_(is_trans_state_sync_finished), K_(status), K_(same_leader_batch_partitions_count), K_(is_hazardous_ctx), K(mt_ctx_.get_callback_count()), K_(in_xa_prepare_state), K_(is_listener), K_(last_replayed_redo_log_id), - K_(status), K_(is_xa_trans_prepared)); + K_(status), K_(is_xa_trans_prepared), K_(ctx_serialize_size)); public: static const int64_t OP_LOCAL_NUM = 16; @@ -608,6 +608,9 @@ private: bool is_xa_last_empty_redo_log_() const; int fake_kill_(const int64_t terminate_log_ts); int kill_v2_(const int64_t terminate_log_ts); + int calc_serialize_size_and_set_redo_log_(const int64_t log_id); + int calc_serialize_size_and_set_participants_(const ObPartitionArray &participants); + int calc_serialize_size_and_set_undo_(const int64_t undo_to, const int64_t undo_from); private: DISALLOW_COPY_AND_ASSIGN(ObPartTransCtx); @@ -623,7 +626,7 @@ private: private: bool is_inited_; ObIClogAdapter* clog_adapter_; - ObTransSubmitLogCb submit_log_cb_; +ObTransSubmitLogCb submit_log_cb_; memtable::ObMemtableCtx mt_ctx_; memtable::ObIMemtableCtxFactory* mt_ctx_factory_; ObTransTaskWorker* big_trans_worker_; @@ -743,6 +746,7 @@ private: bool is_xa_trans_prepared_; bool has_write_or_replay_mutator_redo_log_; bool is_in_redo_with_prepare_; + int64_t ctx_serialize_size_; }; #if defined(__x86_64__)