/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #include #include #include "share/ob_define.h" #include "storage/ob_storage_log_type.h" #include "storage/transaction/ob_trans_log.h" #include "obcdc/src/ob_log_instance.h" #include "obcdc/src/ob_log_utils.h" // get_timestamp using namespace oceanbase; using namespace common; using namespace liboblog; using namespace transaction; using namespace storage; using namespace clog; namespace oceanbase { namespace unittest { // prepare log static const int64_t SP_PREPARE_TIMESTAMP = 10 * 1000 * 1000; // commit log static const int64_t SP_GLOBAL_TRANS_VERSION = 100; // SP Transaction log parameters struct TransParam2 { ObPartitionKey pkey_; ObTransID trans_id_; ObStartTransParam trans_param_; }; // Sp Transaction Log Generator class TransLogGenerator2 { public: TransLogGenerator2() : param_(), redo_(), commit_(), abort_() { } virtual ~TransLogGenerator2() { } public: void next_trans(const TransParam2 ¶m) { param_ = param; } const ObSpTransRedoLog& next_redo(const uint64_t log_id) { int err = OB_SUCCESS; uint64_t tenant_id = 100; const uint64_t cluster_id = 1000; redo_.reset(); ObVersion active_memstore_version(1); err = redo_.init(OB_LOG_SP_TRANS_REDO, param_.pkey_, param_.trans_id_, tenant_id, log_id, param_.trans_param_, cluster_id, active_memstore_version); EXPECT_EQ(OB_SUCCESS, err); ObTransMutator &mutator = redo_.get_mutator(); if (NULL == mutator.get_mutator_buf()) { mutator.init(true); } const char *data = "fly"; char *buf = static_cast(mutator.alloc(strlen(data))); strcpy(buf, data); return redo_; } const ObSpTransCommitLog& next_commit( const ObRedoLogIdArray &all_redos, const uint64_t redo_log_id) { int err = OB_SUCCESS; uint64_t tenant_id = 100; const uint64_t cluster_id = 1000; int64_t checksum = 0; ObVersion active_memstore_version(1); ObString trace_id; commit_.reset(); err = commit_.init(OB_LOG_SP_TRANS_COMMIT, param_.pkey_, tenant_id, param_.trans_id_, SP_GLOBAL_TRANS_VERSION, checksum, cluster_id, all_redos, param_.trans_param_, active_memstore_version, redo_log_id, trace_id); EXPECT_EQ(OB_SUCCESS, err); return commit_; } const ObSpTransAbortLog& next_abort() { int err = OB_SUCCESS; const uint64_t cluster_id = 1000; abort_.reset(); err = abort_.init(OB_LOG_SP_TRANS_ABORT, param_.pkey_, param_.trans_id_, cluster_id); EXPECT_EQ(OB_SUCCESS, err); return abort_; } const ObSpTransCommitLog& next_redo_with_commit( const ObRedoLogIdArray &all_redos, const uint64_t redo_log_id) { int err = OB_SUCCESS; uint64_t tenant_id = 100; const uint64_t cluster_id = 1000; int64_t checksum = 0; ObVersion active_memstore_version(1); ObString trace_id; commit_.reset(); err = commit_.init(OB_LOG_SP_TRANS_COMMIT, param_.pkey_, tenant_id, param_.trans_id_, SP_GLOBAL_TRANS_VERSION, checksum, cluster_id, all_redos, param_.trans_param_, active_memstore_version, redo_log_id, trace_id); EXPECT_EQ(OB_SUCCESS, err); // write redo log ObTransMutator &mutator = commit_.get_mutator(); if (NULL == mutator.get_mutator_buf()) { mutator.init(true); } const char *data = "fly"; char *buf = static_cast(mutator.alloc(strlen(data))); strcpy(buf, data); return commit_; } private: TransParam2 param_; ObSpTransRedoLog redo_; ObSpTransCommitLog commit_; ObSpTransAbortLog abort_; }; /* * Responsible for generating Sp transaction logs */ class SpTransLogEntryGeneratorBase { static const ObAddr SCHEDULER; public: // Pass in the ObTransID, which can be used to specify different transactions for the same partition SpTransLogEntryGeneratorBase(const ObPartitionKey &pkey, const ObTransID &trans_id) : pkey_(pkey), log_id_(0), remain_log_cnt_(0), is_commit_(false), param_(), trans_log_gen_(), redos_(), redo_cnt_(0), commit_log_id_(-1), data_len_(-1) { param_.pkey_ = pkey_; param_.trans_id_ = trans_id; param_.trans_param_.set_access_mode(ObTransAccessMode::READ_WRITE); param_.trans_param_.set_isolation(ObTransIsolation::READ_COMMITED); param_.trans_param_.set_type(ObTransType::TRANS_NORMAL); buf_ = new char[buf_len_]; EXPECT_TRUE(NULL != buf_); } virtual ~SpTransLogEntryGeneratorBase() { delete[] buf_; } // Generate normal Sp logs. redo, redo.... .redo, commit/abort // Call next_trans to start a new transaction // Call next_log_entry to get the number of redo entries in order by specifying the number of redo entries void next_trans(const int64_t redo_cnt, bool is_commit) { // total log count of normal trans = redo_log_count + 1(commit/abort log) remain_log_cnt_ = redo_cnt + 1; is_commit_ = is_commit; redos_.reset(); redo_cnt_ = redo_cnt; commit_log_id_ = -1; trans_log_gen_.next_trans(param_); } // Generate special Sp logs: redo, redo.... .redo, redo-commit (redo and commit logs in the same log entry) // call next_trans to start a new transaction // Call next_log_entry_with_redo_commit to get the number of redo entries in order by specifying void next_trans_with_redo_commit(const int64_t redo_cnt) { next_trans(redo_cnt, true); // redo-commit in the same log entrey, remain_log_cnt_reassigned remain_log_cnt_ = redo_cnt; } // Get next log entry. // get redo, redo..., commit/abort by order int next_log_entry(clog::ObLogEntry &log_entry) { int ret = OB_SUCCESS; if (1 < remain_log_cnt_) { next_redo_(log_id_, log_entry); // Store redo id. int err = redos_.push_back(log_id_); EXPECT_EQ(OB_SUCCESS, err); log_id_ += 1; remain_log_cnt_ -= 1; } else if (1 == remain_log_cnt_ && is_commit_) { commit_log_id_ = log_id_; next_commit_(commit_log_id_, log_entry); log_id_ += 1; remain_log_cnt_ -= 1; } else if (1 == remain_log_cnt_ && !is_commit_) { next_abort_(log_entry); log_id_ += 1; remain_log_cnt_ -= 1; } else { ret = OB_ITER_END; } return ret; } // Get next log entry. // get redo, redo...redo-commit by order int next_log_entry_with_redo_commit(clog::ObLogEntry &log_entry) { int ret = OB_SUCCESS; if (1 < remain_log_cnt_) { next_redo_(log_id_, log_entry); // Store redo id. int err = redos_.push_back(log_id_); EXPECT_EQ(OB_SUCCESS, err); log_id_ += 1; remain_log_cnt_ -= 1; } else if (1 == remain_log_cnt_) { // redo-commit commit_log_id_ = log_id_; next_redo_with_commit_(commit_log_id_, log_entry); log_id_ += 1; remain_log_cnt_ -= 1; } else { ret = OB_ITER_END; } return ret; } public: uint64_t get_log_id() { return log_id_; } protected: // Returns the redo log with the specified log_id void next_redo_(const uint64_t redo_log_id, clog::ObLogEntry &log_entry) { int err = OB_SUCCESS; // Gen trans log. const ObSpTransRedoLog &redo = trans_log_gen_.next_redo(redo_log_id); int64_t pos = 0; err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_SP_TRANS_REDO); EXPECT_EQ(OB_SUCCESS, err); err = serialization::encode_i64(buf_, buf_len_, pos, 0); EXPECT_EQ(OB_SUCCESS, err); err = redo.serialize(buf_, buf_len_, pos); EXPECT_EQ(OB_SUCCESS, err); data_len_ = pos; // Gen entry header. ObLogEntryHeader header; header.generate_header(OB_LOG_SUBMIT, pkey_, redo_log_id, buf_, data_len_, get_timestamp(), get_timestamp(), ObProposalID(), get_timestamp(), ObVersion(0)); // Gen log entry. log_entry.generate_entry(header, buf_); } void next_commit_(uint64_t commit_log_id, clog::ObLogEntry &log_entry) { int err = OB_SUCCESS; // Gen trans log. const ObSpTransCommitLog &commit = trans_log_gen_.next_commit(redos_, 1); int64_t pos = 0; err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_SP_TRANS_COMMIT); EXPECT_EQ(OB_SUCCESS, err); err = serialization::encode_i64(buf_, buf_len_, pos, 0); EXPECT_EQ(OB_SUCCESS, err); err = commit.serialize(buf_, buf_len_, pos); EXPECT_EQ(OB_SUCCESS, err); data_len_ = pos; // Gen entry header. ObLogEntryHeader header; // log submit timestamp using SP_PREPARE_TIMESTAMP, because for sp transactions, the partition task stores the prepare timestamp // commit log timestamp, for correctness verification header.generate_header(OB_LOG_SUBMIT, pkey_, commit_log_id, buf_, data_len_, get_timestamp(), get_timestamp(), ObProposalID(), SP_PREPARE_TIMESTAMP, ObVersion(0)); // Gen log entry. log_entry.generate_entry(header, buf_); } void next_abort_(clog::ObLogEntry &log_entry) { int err = OB_SUCCESS; // Gen trans log. const ObSpTransAbortLog &abort = trans_log_gen_.next_abort(); int64_t pos = 0; err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_SP_TRANS_ABORT); EXPECT_EQ(OB_SUCCESS, err); err = serialization::encode_i64(buf_, buf_len_, pos, 0); EXPECT_EQ(OB_SUCCESS, err); err = abort.serialize(buf_, buf_len_, pos); EXPECT_EQ(OB_SUCCESS, err); data_len_ = pos; // Gen entry header. ObLogEntryHeader header; // log submit timestamp using SP_PREPARE_TIMESTAMP, because for sp transactions, the partition task stores the prepare timestamp // commit log timestamp, for correctness verification header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_, data_len_, get_timestamp(), get_timestamp(), ObProposalID(), SP_PREPARE_TIMESTAMP, ObVersion(0)); // Gen log entry. log_entry.generate_entry(header, buf_); } void next_redo_with_commit_(uint64_t commit_log_id, clog::ObLogEntry &log_entry) { int err = OB_SUCCESS; // Gen trans log. const ObSpTransCommitLog &commit = trans_log_gen_.next_redo_with_commit(redos_, 1); int64_t pos = 0; err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_SP_TRANS_COMMIT); EXPECT_EQ(OB_SUCCESS, err); err = serialization::encode_i64(buf_, buf_len_, pos, 0); EXPECT_EQ(OB_SUCCESS, err); err = commit.serialize(buf_, buf_len_, pos); EXPECT_EQ(OB_SUCCESS, err); data_len_ = pos; // Gen entry header. ObLogEntryHeader header; // submit timestamp use SP_PREPARE_TIMESTAMP, because for sp_trans, prepare ts is stored in PartTransTask // commit log ts is used for check correctness. header.generate_header(OB_LOG_SUBMIT, pkey_, commit_log_id, buf_, data_len_, get_timestamp(), get_timestamp(), ObProposalID(), SP_PREPARE_TIMESTAMP, ObVersion(0)); // Gen log entry. log_entry.generate_entry(header, buf_); } protected: // Params. ObPartitionKey pkey_; uint64_t log_id_; int64_t remain_log_cnt_; // Indicates whether the current transaction has been committed or not bool is_commit_; // Gen. TransParam2 param_; TransLogGenerator2 trans_log_gen_; ObRedoLogIdArray redos_; int64_t redo_cnt_; // prepare log id and commit log id are same for sp trans uint64_t commit_log_id_; // Buf. int64_t data_len_; static const int64_t buf_len_ = 2 * _M_; char *buf_; }; /* * test missing redo log, When the commit log is read, the missing redo can be detected * * two case: * 1. redo, redo, redo...redo, commit * 2. redo, redo, redo...redo, redo-commit */ enum SpCaseType { SP_NORMAL_TRAN, SP_REDO_WITH_COMMIT_TRAN }; class SpTransLogEntryGenerator1 : public SpTransLogEntryGeneratorBase { public: SpTransLogEntryGenerator1(const ObPartitionKey &pkey, const ObTransID &trans_id) : SpTransLogEntryGeneratorBase(pkey, trans_id), is_first(false), miss_redo_cnt_(0) {} ~SpTransLogEntryGenerator1() {} public: // Specify the number of redo logs in redo_cnt, and the number of missing redo logs void next_trans_with_miss_redo(const int64_t redo_cnt, const int64_t miss_redo_cnt, SpCaseType type) { if (SP_NORMAL_TRAN == type) { next_trans(redo_cnt, true); } else if(SP_REDO_WITH_COMMIT_TRAN == type) { next_trans_with_redo_commit(redo_cnt); } else { } miss_redo_cnt_ = miss_redo_cnt; is_first = true; } int next_log_entry_missing_redo(SpCaseType type, clog::ObLogEntry &log_entry) { int ret = OB_SUCCESS; // add redo log to redos list for miss_redo_cnt_ logs before miss if (is_first) { for (int64_t idx = 0; idx < miss_redo_cnt_; idx++) { next_redo_(log_id_, log_entry); // Store redo id. int err = redos_.push_back(log_id_); EXPECT_EQ(OB_SUCCESS, err); log_id_ += 1; remain_log_cnt_ -= 1; } is_first = false; } if (SP_NORMAL_TRAN == type) { ret = next_log_entry(log_entry); } else if(SP_REDO_WITH_COMMIT_TRAN == type) { ret = next_log_entry_with_redo_commit(log_entry); } else { } return ret; } int next_miss_log_entry(const uint64_t miss_log_id, clog::ObLogEntry &miss_log_entry) { int ret = OB_SUCCESS; next_redo_(miss_log_id, miss_log_entry); return ret; } int get_commit_log_entry(SpCaseType type, clog::ObLogEntry &log_entry) { int ret = OB_SUCCESS; if (SP_NORMAL_TRAN == type) { next_commit_(commit_log_id_, log_entry); } else if(SP_REDO_WITH_COMMIT_TRAN == type) { next_redo_with_commit_(commit_log_id_, log_entry); } else { } return ret; } private: bool is_first; int64_t miss_redo_cnt_; }; } }