From fc1052428f1a1ddfcd1dc640506a70cce34ace27 Mon Sep 17 00:00:00 2001 From: handora Date: Tue, 15 Mar 2022 12:00:50 +0800 Subject: [PATCH] fix max majority too much when switch leader to follower --- src/clog/ob_log_sliding_window.cpp | 4 ++- src/clog/ob_partition_log_service.cpp | 5 ++++ src/clog/ob_partition_log_service.h | 2 ++ src/storage/ob_i_partition_group.h | 1 + src/storage/ob_partition_group.cpp | 20 ++++++++++---- src/storage/ob_partition_group.h | 1 + src/storage/transaction/ob_trans_part_ctx.cpp | 27 +++++++++++++++++++ src/storage/transaction/ob_trans_part_ctx.h | 1 + unittest/clog/mock_ob_partition_log_service.h | 5 ++++ .../storage/mockcontainer/mock_ob_partition.h | 1 + 10 files changed, 61 insertions(+), 6 deletions(-) diff --git a/src/clog/ob_log_sliding_window.cpp b/src/clog/ob_log_sliding_window.cpp index 61180ecb7..bf0aab6d7 100644 --- a/src/clog/ob_log_sliding_window.cpp +++ b/src/clog/ob_log_sliding_window.cpp @@ -3155,11 +3155,13 @@ int ObLogSlidingWindow::majority_cb( ret = OB_ERR_NULL_VALUE; } else { log_task = static_cast(log_data); - try_update_max_majority_log(log_id, log_task->get_submit_timestamp()); if (OB_FAIL(log_task->submit_log_succ_cb(partition_key_, log_id, batch_committed, batch_first_participant))) { CLOG_LOG(WARN, "submit log majority_cb failed", K(ret), K_(partition_key), K(log_id), K(batch_committed)); ret = OB_SUCCESS; } + if (ObLogType::OB_LOG_START_MEMBERSHIP != log_task->get_log_type()) { + try_update_max_majority_log(log_id, log_task->get_submit_timestamp()); + } } if (NULL != ref && OB_SUCCESS != (tmp_ret = sw_.revert(ref))) { CLOG_LOG(ERROR, "revert failed", K_(partition_key), K(tmp_ret)); diff --git a/src/clog/ob_partition_log_service.cpp b/src/clog/ob_partition_log_service.cpp index 0f1dd081c..2503deb9e 100644 --- a/src/clog/ob_partition_log_service.cpp +++ b/src/clog/ob_partition_log_service.cpp @@ -8161,6 +8161,11 @@ int ObPartitionLogService::process_check_rebuild_req( return ret; } +void ObPartitionLogService::try_update_max_majority_log(const uint64_t log_id, const int64_t log_ts) +{ + sw_.try_update_max_majority_log(log_id, log_ts); +} + void ObPartitionLogService::get_max_majority_log(uint64_t& log_id, int64_t& log_ts) const { sw_.get_max_majority_log(log_id, log_ts); diff --git a/src/clog/ob_partition_log_service.h b/src/clog/ob_partition_log_service.h index edbe89390..66417c458 100644 --- a/src/clog/ob_partition_log_service.h +++ b/src/clog/ob_partition_log_service.h @@ -476,6 +476,7 @@ public: virtual int process_check_rebuild_req( const common::ObAddr& server, const uint64_t start_log_id, const int64_t cluster_id) = 0; virtual void get_max_majority_log(uint64_t& log_id, int64_t& log_ts) const = 0; + virtual void try_update_max_majority_log(const uint64_t log_id, const int64_t log_ts) = 0; virtual int set_archive_restore_state(const int16_t archive_restore_state) = 0; virtual uint64_t get_max_confirmed_log_id() const = 0; virtual bool is_archive_restoring() const = 0; @@ -700,6 +701,7 @@ public: virtual int process_check_rebuild_req( const common::ObAddr& server, const uint64_t start_log_id, const int64_t cluster_id) override; virtual void get_max_majority_log(uint64_t& log_id, int64_t& log_ts) const override; + virtual void try_update_max_majority_log(const uint64_t log_id, const int64_t log_ts) override; virtual uint64_t get_max_confirmed_log_id() const override; public: diff --git a/src/storage/ob_i_partition_group.h b/src/storage/ob_i_partition_group.h index 7cbeaaeff..761cc705f 100644 --- a/src/storage/ob_i_partition_group.h +++ b/src/storage/ob_i_partition_group.h @@ -523,6 +523,7 @@ public: virtual int reset_for_replay() = 0; virtual int inc_pending_batch_commit_count(memtable::ObMemtableCtx& mt_ctx, const int64_t log_ts) = 0; virtual int inc_pending_elr_count(memtable::ObMemtableCtx& mt_ctx, const int64_t log_ts) = 0; + virtual int update_max_majority_log(const uint64_t log_id, const int64_t log_ts) = 0; TO_STRING_KV(K_(ref_cnt)); protected: diff --git a/src/storage/ob_partition_group.cpp b/src/storage/ob_partition_group.cpp index d2f66c558..9d2e0ad48 100644 --- a/src/storage/ob_partition_group.cpp +++ b/src/storage/ob_partition_group.cpp @@ -3451,8 +3451,7 @@ int ObPartitionGroup::get_freeze_cut_(ObMemtable& frozen_memtable, const bool is // max_log_ts as overflow(the requirement from the storage layer). while // the data may already synced and we have no chance to mark the data // except traversing all data in the memtable. So we choose to mark the - // end_log_ts as the max_majority_log_ts as well. The detailed issue can - // be found in https://work.aone.alibaba-inc.com/issue/33865988 + // end_log_ts as the max_majority_log_ts as well. // // NB: we never maintain the max_mjority_log_ts for follower, so we just // use the variable for the corner case of leader transfer. @@ -3940,11 +3939,8 @@ int ObPartitionGroup::freeze(const bool emergency, const bool force, int64_t& fr ObPartitionGroupLockGuard guard(lock_, 0, PGLOCKSTORAGE); if (with_data_()) { - // https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/replica-overview - // FULL(F)/ READONLY(R)replica have sstable and memtable, need to be frozen ret = freeze_log_and_data_v2_(emergency, force, freeze_snapshot); } else { - // LOGONLY(L)replica or empty PG ret = freeze_log_(force); } @@ -6124,5 +6120,19 @@ int ObPartitionGroup::inc_pending_elr_count(memtable::ObMemtableCtx& mt_ctx, con return ret; } +int ObPartitionGroup::update_max_majority_log(const uint64_t log_id, const int64_t log_ts) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + STORAGE_LOG(WARN, "Partition object not initialized", K(ret), K(is_inited_)); + } else { + pls_->try_update_max_majority_log(log_id, log_ts); + } + + return ret; +} + } // namespace storage } // end of namespace oceanbase diff --git a/src/storage/ob_partition_group.h b/src/storage/ob_partition_group.h index bfa720c71..51166a825 100644 --- a/src/storage/ob_partition_group.h +++ b/src/storage/ob_partition_group.h @@ -434,6 +434,7 @@ public: virtual int register_txs_change_leader(const common::ObAddr& server, ObTsWindows& changing_leader_windows) override; virtual int check_physical_split(bool& finished) override; + virtual int update_max_majority_log(const uint64_t log_id, const int64_t log_ts) override; TO_STRING_KV(K_(pkey), K_(replay_status), K_(partition_state)); private: diff --git a/src/storage/transaction/ob_trans_part_ctx.cpp b/src/storage/transaction/ob_trans_part_ctx.cpp index 64e6ed935..9aee24938 100644 --- a/src/storage/transaction/ob_trans_part_ctx.cpp +++ b/src/storage/transaction/ob_trans_part_ctx.cpp @@ -4779,11 +4779,38 @@ int ObPartTransCtx::replay_start_working_log(const int64_t timestamp, const uint TRANS_STAT_ABORT_TRANS_INC(tenant_id_); } + if (OB_SUCC(ret)) { + if (OB_FAIL(update_max_majority_log(log_id, timestamp))) { + TRANS_LOG(ERROR, "update max majority log failed", K(ret), K(*this)); + } + } + REC_TRANS_TRACE_EXT(tlog_, replay_start_working_log, OB_ID(ret), ret, OB_ID(uref), get_uref()); return ret; } +int ObPartTransCtx::update_max_majority_log(const uint64_t log_id, const int64_t log_ts) +{ + int ret = OB_SUCCESS; + storage::ObIPartitionGroupGuard pg_guard; + + if (OB_NOT_NULL(pg_)) { + if (OB_FAIL(pg_->update_max_majority_log(log_id, log_ts))) { + TRANS_LOG(WARN, "update max majority log error", K(*this)); + } + } else if (OB_FAIL(partition_service_->get_partition(self_, pg_guard))) { + TRANS_LOG(WARN, "get partition error", KR(ret), "context", *this); + } else if (NULL == pg_guard.get_partition_group()) { + TRANS_LOG(ERROR, "partition is null, unexpected error", KP(pg_guard.get_partition_group()), "context", *this); + ret = OB_ERR_UNEXPECTED; + } else if (OB_FAIL(pg_guard.get_partition_group()->update_max_majority_log(log_id, log_ts))) { + TRANS_LOG(WARN, "update max majority log error", K(*this)); + } + + return ret; +} + // The txn that have been prepared but not yet committed bool ObPartTransCtx::is_prepared() const { diff --git a/src/storage/transaction/ob_trans_part_ctx.h b/src/storage/transaction/ob_trans_part_ctx.h index 0266a8d27..170e96609 100644 --- a/src/storage/transaction/ob_trans_part_ctx.h +++ b/src/storage/transaction/ob_trans_part_ctx.h @@ -390,6 +390,7 @@ public: virtual int64_t get_part_trans_action() const override; int rollback_stmt(const int64_t from_sql_no, const int64_t to_sql_no); bool need_update_schema_version(const uint64_t log_id, const int64_t log_ts); + int update_max_majority_log(const uint64_t log_id, const int64_t log_ts); public: INHERIT_TO_STRING_KV("ObDistTransCtx", ObDistTransCtx, K_(snapshot_version), K_(local_trans_version), diff --git a/unittest/clog/mock_ob_partition_log_service.h b/unittest/clog/mock_ob_partition_log_service.h index 41e6383d9..72a00a05b 100644 --- a/unittest/clog/mock_ob_partition_log_service.h +++ b/unittest/clog/mock_ob_partition_log_service.h @@ -189,6 +189,11 @@ public: UNUSED(idc); return OB_SUCCESS; } + virtual void try_update_max_majority_log(const uint64_t log_id, const int64_t log_ts) + { + UNUSED(log_id); + UNUSED(log_ts); + } virtual int fetch_register_server_resp_v2(const common::ObAddr& sender, const bool is_assign_parent_succeed, const share::ObCascadMemberList& candidate_list, const int32_t msg_type) { diff --git a/unittest/storage/mockcontainer/mock_ob_partition.h b/unittest/storage/mockcontainer/mock_ob_partition.h index b63549a50..af24dc57a 100644 --- a/unittest/storage/mockcontainer/mock_ob_partition.h +++ b/unittest/storage/mockcontainer/mock_ob_partition.h @@ -249,6 +249,7 @@ public: MOCK_METHOD1(get_checkpoint, int(int64_t& checkpoint)); MOCK_METHOD1(update_last_checkpoint, int(const int64_t checkpoint)); MOCK_METHOD1(set_replay_checkpoint, int(const int64_t checkpoint)); + MOCK_METHOD2(update_max_majority_log, int(const uint64_t log_id, const int64_t log_ts)); // MOCK_METHOD1(get_replay_checkpoint, int(int64_t &checkpoint)); int get_replay_checkpoint(int64_t& checkpoint) {