Files
oceanbase/unittest/storage/tx/ob_mock_2pc_ctx.h

179 lines
6.7 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_UNITTEST_STORAGE_TX_OB_MOCK_2PC_CTX
#define OCEANBASE_UNITTEST_STORAGE_TX_OB_MOCK_2PC_CTX
#include <vector>
#include <deque>
#include <map>
#include "ob_mailbox.h"
#include "lib/ob_errno.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/utility/ob_print_utils.h"
#include "lib/lock/ob_spin_lock.h"
#include "storage/tx/ob_committer_define.h"
#include "storage/tx/ob_one_phase_committer.h"
#include "storage/tx/ob_two_phase_committer.h"
namespace oceanbase
{
namespace common
{
template <>
int64_t to_string<std::vector<int64_t>>(const std::vector<int64_t> &obj, char *buffer, const int64_t buffer_size);
}
namespace transaction
{
class MockObParticipants : public std::vector<int64_t>
{
public:
int64_t to_string(char *buffer, const int64_t size) const;
};
class MockObLogQueue : public std::deque<ObTwoPhaseCommitLogType>
{
public:
int64_t to_string(char *buffer, const int64_t size) const;
};
// ObTxCtx inherit ObTxCycleTwoPhaseCommitter, ObTxOnePhaseCommitter and ObITxCommitter.
// It organize user interface based on ObITxCommitter. and implements it using both two
// transaction committer, ObTxCycleTwoPhaseCommitter and ObTxOnePhaseCommitter based on
// participants number
class MockOb2pcCtx : public ObTxCycleTwoPhaseCommitter,
public ObMailHandler<ObTwoPhaseCommitMsgType>
{
public:
int init(ObMailBoxMgr<ObTwoPhaseCommitMsgType> *mgr);
virtual int handle(const ObMail<ObTwoPhaseCommitMsgType>& mail) override;
int64_t get_addr() { return addr_; }
int handle(const bool must_have = true);
int handle_all();
int apply();
// participant abnormally abort itself
int abort();
bool check_status_valid(const bool should_commit);
// transaction commit implementation. The transaction can be committed in one phase
// (with one consensus round and zero transport round latency) if it involves only
// one participant. Otherwise the transaction will invoke the circular two phase
// commit(with one consensus round and 2*H transport round latency)
int commit(const MockObParticipants& participants);
int64_t get_coordinator() { return coordinator_; }
bool is_real_downstream() { return true; }
INHERIT_TO_STRING_KV("ObTxCycleTwoPhaseCommitter",
ObTxCycleTwoPhaseCommitter,
K_(addr),
K_(mailbox),
K_(upstream_state),
K_(downstream_state),
K_(tx_state),
K_(log_queue),
K_(participants),
K_(intermediate_participants),
K_(coordinator),
K_(sender));
protected:
// Oceanbase's optimized transaction commit hook for its user.
// We promise the failure atomicity of the method and we mainly use these method
// to solve the concurrency control problem.
//
// For example, we use do/on_prepare to build the prepare version and mvcc_write
// to maintain read write conflict. We also use on_commit/abort to release lock and
// decide final transaction state. In Oceanbase's optimized, do_pre_commit is used to
// optimize single machine read latency and do/on_clear is used to maintain the state
// to recovery
virtual int do_prepare(bool &no_need_submit_log) override;
virtual int on_prepare() override;
virtual int do_pre_commit(bool& need_wait) override;
virtual int do_commit() override;
virtual int on_commit() override;
virtual int do_abort() override;
virtual int on_abort() override;
virtual int do_clear() override;
virtual int on_clear() override;
// for xa
virtual int reply_to_scheduler_for_sub2pc(int64_t msg_type) override;
// We implements the above concurrency control between transaction commit and transfer
// It's based on that if 2pc log is ahead transfer log, the transfer procedure need bring
// the 2pc state to the dst. Otherwise, the 2pc will invoke a cycle style commit
// which the parent node waits for the node's 2pc state response before responsing
// its 2pc state to his parent node.
virtual int64_t get_downstream_size() const override;
virtual int64_t get_self_id() override;
virtual Ob2PCRole get_2pc_role() const override;
virtual ObTxState get_downstream_state() const override;
virtual int set_downstream_state(const ObTxState state) override;
virtual ObTxState get_upstream_state() const override;
virtual int set_upstream_state(const ObTxState state) override;
virtual bool is_2pc_logging() const override;
virtual bool is_2pc_blocking() const { return false; };
// for xa
virtual bool is_sub2pc() const override;
virtual bool is_dup_tx() const override;
virtual int merge_intermediate_participants() override;
void add_intermediate_participants(const int64_t ls_id);
void print_downstream();
virtual bool is_real_upstream() override;
bool need_to_advance();
// Oceanbase's optimized log handler, if it returns success, the log is definitely proposed
// to the consensus layer and we can rely on its sequential commitment to submit the log
// without waiting consensus commit. While the log may be lost under consensus, so we need
// handle it with exception handler.
virtual int submit_log(const ObTwoPhaseCommitLogType& log_type) override;
// The msg poster is best effort, so we need rely on the timeout handler to retry the last
// message should be sent
// virtual int post_msg(const ObTwoPhaseCommitMsgType& msg_type) override;
virtual int post_msg(const ObTwoPhaseCommitMsgType& msg_type,
const int64_t participant_id) override;
private:
int64_t find_participant_id(int64_t participant_key);
virtual int apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) override;
public:
common::ObSpinLock latch_;
int64_t addr_;
ObMailBox<ObTwoPhaseCommitMsgType> mailbox_;
ObTxState upstream_state_;
ObTxState downstream_state_;
// final state for tx commit
ObTxState tx_state_;
MockObLogQueue log_queue_;
int64_t coordinator_;
int64_t sender_;
MockObParticipants participants_;
MockObParticipants intermediate_participants_;
ObMailBoxMgr<ObTwoPhaseCommitMsgType>* mailbox_mgr_;
};
} // transaction
} // oceanbase
#endif // OCEANBASE_UNITTEST_STORAGE_TX_OB_MOCK_2PC_CTX