From 10706ab7d4bdff9ced17fc869b2e24bffcbbaaf0 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 7 Feb 2024 09:53:02 +0000 Subject: [PATCH] [MDS] mds table flush by order --- mittest/simple_server/test_mds_recover.cpp | 4 +- src/storage/CMakeLists.txt | 2 + .../mds_for_each_map_flush_operation.cpp | 43 +++ .../mds_for_each_map_flush_operation.h | 41 +++ .../multi_data_source/mds_table_base.h | 2 +- .../multi_data_source/mds_table_handle.h | 2 +- .../multi_data_source/mds_table_handle.ipp | 4 +- .../multi_data_source/mds_table_impl.h | 6 +- .../multi_data_source/mds_table_impl.ipp | 55 ++-- .../multi_data_source/mds_table_mgr.cpp | 163 +++++----- src/storage/multi_data_source/mds_table_mgr.h | 29 +- .../mds_table_order_flusher.cpp | 31 ++ .../mds_table_order_flusher.h | 135 ++++++++ .../mds_table_order_flusher.ipp | 168 ++++++++++ .../runtime_utility/common_define.h | 2 + src/storage/tablet/ob_tablet.cpp | 4 +- src/storage/tablet/ob_tablet.h | 2 +- unittest/storage/CMakeLists.txt | 1 + .../multi_data_source/test_mds_table.cpp | 20 +- .../test_mds_table_flush.cpp | 37 +-- .../test_mds_table_flusher.cpp | 302 ++++++++++++++++++ 21 files changed, 878 insertions(+), 175 deletions(-) create mode 100644 src/storage/multi_data_source/mds_for_each_map_flush_operation.cpp create mode 100644 src/storage/multi_data_source/mds_for_each_map_flush_operation.h create mode 100644 src/storage/multi_data_source/mds_table_order_flusher.cpp create mode 100644 src/storage/multi_data_source/mds_table_order_flusher.h create mode 100644 src/storage/multi_data_source/mds_table_order_flusher.ipp create mode 100644 unittest/storage/multi_data_source/test_mds_table_flusher.cpp diff --git a/mittest/simple_server/test_mds_recover.cpp b/mittest/simple_server/test_mds_recover.cpp index e671d6b0db..f55ae4b5f4 100644 --- a/mittest/simple_server/test_mds_recover.cpp +++ b/mittest/simple_server/test_mds_recover.cpp @@ -172,8 +172,10 @@ void do_flush_mds_table(share::ObLSID ls_id, ObTabletID tablet_id) ObTabletPointer *tablet_pointer = pointer_handle.get_resource_ptr(); // 4. 做flush动作 mds::MdsTableHandle handle; + share::SCN max_decided_scn; + ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_max_decided_scn(max_decided_scn)); ASSERT_EQ(OB_SUCCESS, tablet_pointer->get_mds_table(tablet_id, handle)); - ASSERT_EQ(OB_SUCCESS, handle.flush(share::SCN::max_scn())); + ASSERT_EQ(OB_SUCCESS, handle.flush(share::SCN::max_scn(), max_decided_scn)); ASSERT_EQ(true, handle.p_mds_table_base_->flushing_scn_.is_valid()); // 5. 等flush完成 share::SCN rec_scn = share::SCN::min_scn(); diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 1222bd6552..1cddcf4c75 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -648,8 +648,10 @@ ob_set_subtarget(ob_storage memtable_mvcc ob_set_subtarget(ob_storage multi_data_source multi_data_source/buffer_ctx.cpp multi_data_source/mds_ctx.cpp + multi_data_source/mds_for_each_map_flush_operation.cpp multi_data_source/mds_node.cpp multi_data_source/mds_table_mgr.cpp + multi_data_source/mds_table_order_flusher.cpp multi_data_source/mds_table_base.cpp multi_data_source/mds_writer.cpp multi_data_source/mds_table_handler.cpp diff --git a/src/storage/multi_data_source/mds_for_each_map_flush_operation.cpp b/src/storage/multi_data_source/mds_for_each_map_flush_operation.cpp new file mode 100644 index 0000000000..1d47e9f3e1 --- /dev/null +++ b/src/storage/multi_data_source/mds_for_each_map_flush_operation.cpp @@ -0,0 +1,43 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "mds_for_each_map_flush_operation.h" +#include "mds_table_base.h" + +namespace oceanbase +{ +namespace storage +{ +namespace mds +{ + +bool FlushOp::operator()(const ObTabletID &, MdsTableBase *&mds_table) +{ + int ret = OB_SUCCESS; + if (mds_table->is_switched_to_empty_shell()) { + MDS_LOG_RET(INFO, ret, "skip empty shell tablet mds_table flush", + KPC(mds_table), K(scan_mds_table_cnt_), K_(max_consequent_callbacked_scn)); + } else if (OB_FAIL(mds_table->flush(do_flush_limit_scn_, max_consequent_callbacked_scn_))) { + MDS_LOG_RET(WARN, ret, "flush mds table failed", + KR(ret), KPC(mds_table), K_(scan_mds_table_cnt), K_(max_consequent_callbacked_scn)); + if (OB_SIZE_OVERFLOW == ret) { + is_dag_full_ = true; + } + } else { + ++scan_mds_table_cnt_; + } + return !is_dag_full_;// true means iterating the next mds table +} + +} +} +} \ No newline at end of file diff --git a/src/storage/multi_data_source/mds_for_each_map_flush_operation.h b/src/storage/multi_data_source/mds_for_each_map_flush_operation.h new file mode 100644 index 0000000000..dd549c272b --- /dev/null +++ b/src/storage/multi_data_source/mds_for_each_map_flush_operation.h @@ -0,0 +1,41 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef STORAGE_MULTI_DATA_SOURCE_MDS_FOR_EACH_MAP_FLUSH_OPERATION_H +#define STORAGE_MULTI_DATA_SOURCE_MDS_FOR_EACH_MAP_FLUSH_OPERATION_H +#include "share/scn.h" +#include "common/ob_tablet_id.h" +namespace oceanbase +{ +namespace storage +{ +namespace mds +{ +class MdsTableBase; +struct FlushOp { + FlushOp(share::SCN do_flush_limit_scn, int64_t &scan_mds_table_cnt, share::SCN max_consequent_callbacked_scn) + : do_flush_limit_scn_(do_flush_limit_scn), + scan_mds_table_cnt_(scan_mds_table_cnt), + max_consequent_callbacked_scn_(max_consequent_callbacked_scn), + is_dag_full_(false) {} + bool operator()(const ObTabletID &, MdsTableBase *&mds_table); + bool dag_full() const { return is_dag_full_; } + share::SCN do_flush_limit_scn_; + int64_t &scan_mds_table_cnt_; + share::SCN max_consequent_callbacked_scn_; + bool is_dag_full_; +}; + +} +} +} +#endif \ No newline at end of file diff --git a/src/storage/multi_data_source/mds_table_base.h b/src/storage/multi_data_source/mds_table_base.h index 416a6e173e..985a0b7c58 100644 --- a/src/storage/multi_data_source/mds_table_base.h +++ b/src/storage/multi_data_source/mds_table_base.h @@ -165,7 +165,7 @@ public: int64_t get_node_cnt() const; virtual share::SCN get_rec_scn(); virtual int operate(const ObFunction &operation) = 0; - virtual int flush(share::SCN need_advanced_rec_scn_lower_limit) = 0; + virtual int flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn) = 0; virtual ObTabletID get_tablet_id() const; virtual bool is_flushing() const; virtual int fill_virtual_info(ObIArray &mds_node_info_array) const = 0; diff --git a/src/storage/multi_data_source/mds_table_handle.h b/src/storage/multi_data_source/mds_table_handle.h index e03e40fa04..7573494c70 100644 --- a/src/storage/multi_data_source/mds_table_handle.h +++ b/src/storage/multi_data_source/mds_table_handle.h @@ -102,7 +102,7 @@ public: int for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump(DUMP_OP &&for_each_op, const int64_t mds_construct_sequence, const bool for_flush) const; - int flush(share::SCN need_advanced_rec_scn_lower_limit); + int flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn); int is_flushing(bool &is_flushing) const; void on_flush(const share::SCN &flush_scn, const int flush_ret); int try_recycle(const share::SCN &recycle_scn);// release nodes diff --git a/src/storage/multi_data_source/mds_table_handle.ipp b/src/storage/multi_data_source/mds_table_handle.ipp index 7e400e619e..b21dd8de7b 100644 --- a/src/storage/multi_data_source/mds_table_handle.ipp +++ b/src/storage/multi_data_source/mds_table_handle.ipp @@ -653,13 +653,13 @@ int MdsTableHandle::for_each_unit_from_small_key_to_big_from_old_node_to_new_to_ return ret; } -inline int MdsTableHandle::flush(share::SCN need_advanced_rec_scn_lower_limit) +inline int MdsTableHandle::flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn) { int ret = OB_SUCCESS; // return ret;// FIXME: for lixia test, will block CLOG recycle #ifndef TEST_MDS_TRANSACTION CHECK_MDS_TABLE_INIT(); - ret = p_mds_table_base_->flush(need_advanced_rec_scn_lower_limit); + ret = p_mds_table_base_->flush(need_advanced_rec_scn_lower_limit, max_decided_scn); #endif return ret; } diff --git a/src/storage/multi_data_source/mds_table_impl.h b/src/storage/multi_data_source/mds_table_impl.h index 614ac0459d..c250eb22ae 100644 --- a/src/storage/multi_data_source/mds_table_impl.h +++ b/src/storage/multi_data_source/mds_table_impl.h @@ -140,10 +140,8 @@ public: const int64_t mds_construct_sequence, const bool for_flush) const override; virtual int operate(const ObFunction &operation) override; - int calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN need_advanced_rec_scn_lower_limit, - share::SCN &flush_scn, - int64_t &need_dumped_nodes_cnt); - virtual int flush(share::SCN need_advanced_rec_scn_lower_limit) override; + int calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN &flush_scn, int64_t &need_dumped_nodes_cnt); + virtual int flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn) override; void on_flush_(const share::SCN &flush_scn, const int flush_ret); virtual void on_flush(const share::SCN &flush_scn, const int flush_ret) override; virtual int try_recycle(const share::SCN recycle_scn) override; diff --git a/src/storage/multi_data_source/mds_table_impl.ipp b/src/storage/multi_data_source/mds_table_impl.ipp index 4d6244b2e3..6f6451ced0 100644 --- a/src/storage/multi_data_source/mds_table_impl.ipp +++ b/src/storage/multi_data_source/mds_table_impl.ipp @@ -921,60 +921,39 @@ struct CountUnDumpdedNodesBelowDoFlushScn// To filter unnecessary flush operatio const share::SCN &do_flush_scn_; }; template -int MdsTableImpl::calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN need_advanced_rec_scn_lower_limit, - share::SCN &do_flush_scn, - int64_t &need_dumped_nodes_cnt) +int MdsTableImpl::calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN &do_flush_scn, int64_t &need_dumped_nodes_cnt) { - #define PRINT_WRAPPER KR(ret), K(*this), K(need_advanced_rec_scn_lower_limit), K(do_flush_scn), \ - K(need_dumped_nodes_cnt) + #define PRINT_WRAPPER KR(ret), K(*this), K(do_flush_scn), K(need_dumped_nodes_cnt) MDS_TG(100_ms); int ret = OB_SUCCESS; - share::SCN calculated_flush_scn; - share::SCN ls_max_consequent_callbacked_scn = share::SCN::max_scn(); -#ifndef UNITTEST_DEBUG - if (MDS_FAIL(get_ls_max_consequent_callbacked_scn_(ls_max_consequent_callbacked_scn))) { - MDS_LOG_FLUSH(WARN, "fail to get ls_max_consequent_callbacked_scn"); - } else if (ls_max_consequent_callbacked_scn.is_max()) { - ret = OB_ERR_UNEXPECTED; - MDS_LOG_FLUSH(ERROR, "invalid ls max consequent callbacked scn"); - } else { - do_flush_scn = ls_max_consequent_callbacked_scn; - } -#else - do_flush_scn = need_advanced_rec_scn_lower_limit; -#endif - if (OB_SUCC(ret)) { - RecalculateFlushScnCauseOnlySuppportDumpCommittedNodeOP op1(do_flush_scn);// recalculate flush scn - CountUnDumpdedNodesBelowDoFlushScn op2(need_dumped_nodes_cnt, do_flush_scn);// count nodes need dump - if (MDS_FAIL(for_each_scan_row(FowEachRowAction::CALCUALTE_FLUSH_SCN, op1))) { - MDS_LOG_FLUSH(WARN, "for each to calculate flush scn failed"); - } else if (MDS_FAIL(for_each_scan_row(FowEachRowAction::COUNT_NODES_BEFLOW_FLUSH_SCN, op2))) { - MDS_LOG_FLUSH(WARN, "for each to count undumped nodes failed"); - } - } + RecalculateFlushScnCauseOnlySuppportDumpCommittedNodeOP op1(do_flush_scn);// recalculate flush scn + CountUnDumpdedNodesBelowDoFlushScn op2(need_dumped_nodes_cnt, do_flush_scn);// count nodes need dump + if (MDS_FAIL(for_each_scan_row(FowEachRowAction::CALCUALTE_FLUSH_SCN, op1))) { + MDS_LOG_FLUSH(WARN, "for each to calculate flush scn failed"); + } else if (MDS_FAIL(for_each_scan_row(FowEachRowAction::COUNT_NODES_BEFLOW_FLUSH_SCN, op2))) { + MDS_LOG_FLUSH(WARN, "for each to count undumped nodes failed"); + } return ret; #undef PRINT_WRAPPER } template -int MdsTableImpl::flush(share::SCN need_advanced_rec_scn_lower_limit) +int MdsTableImpl::flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn) {// if rec_scn below this limit, need generate dag and try hard to advance it - #define PRINT_WRAPPER KR(ret), K(*this), K(need_advanced_rec_scn_lower_limit), K(do_flush_scn), K(undump_node_cnt) + #define PRINT_WRAPPER KR(ret), K(*this), K(need_advanced_rec_scn_lower_limit), K(do_flush_scn), K(max_decided_scn), K(undump_node_cnt) MDS_TG(100_ms); int ret = OB_SUCCESS; - share::SCN do_flush_scn;// this scn is defined for calculation + share::SCN do_flush_scn = max_decided_scn;// this scn is defined for calculation int64_t undump_node_cnt = 0; MdsWLockGuard lg(lock_); - if (!need_advanced_rec_scn_lower_limit.is_valid()) { + if (!need_advanced_rec_scn_lower_limit.is_valid() || !max_decided_scn.is_valid() || max_decided_scn.is_max()) { ret = OB_INVALID_ARGUMENT; MDS_LOG_FLUSH(WARN, "invalid recycle scn"); } else if (get_rec_scn().is_max()) { MDS_LOG_FLUSH(TRACE, "no need do flush cause rec_scn is MAX already"); } else if (need_advanced_rec_scn_lower_limit < get_rec_scn()) {// no need dump this mds table to advance rec_scn MDS_LOG_FLUSH(TRACE, "no need do flush need_advanced_rec_scn_lower_limit less than rec_scn"); - } else if (MDS_FAIL(calculate_flush_scn_and_need_dumped_nodes_cnt_(need_advanced_rec_scn_lower_limit, - do_flush_scn, - undump_node_cnt))) { + } else if (MDS_FAIL(calculate_flush_scn_and_need_dumped_nodes_cnt_(do_flush_scn, undump_node_cnt))) { MDS_LOG_FLUSH(WARN, "fail to call calculate_flush_scn_and_need_dumped_nodes_cnt_"); } else if (undump_node_cnt == 0) {// no need do flush actually // mds_ckpt_scn on tablet won't be advanced, @@ -985,10 +964,12 @@ int MdsTableImpl::flush(share::SCN need_advanced_rec_scn_lower_lim } else { #ifndef UNITTEST_DEBUG if (MDS_FAIL(merge(construct_sequence_, do_flush_scn))) { - if (OB_EAGAIN == ret || OB_SIZE_OVERFLOW == ret) { + if (OB_EAGAIN == ret) { if (REACH_TIME_INTERVAL(100_ms)) { - MDS_LOG_FLUSH(WARN, "failed to commit merge mds table dag cause already exist or queue already full"); + MDS_LOG_FLUSH(WARN, "failed to commit merge mds table dag cause already exist"); ret = OB_SUCCESS; + } else if (OB_SIZE_OVERFLOW == ret) {// throw out + MDS_LOG_FLUSH(WARN, "failed to commit merge mds table dag cause queue already full"); } } else { MDS_LOG_FLUSH(WARN, "failed to commit merge mds table dag"); diff --git a/src/storage/multi_data_source/mds_table_mgr.cpp b/src/storage/multi_data_source/mds_table_mgr.cpp index daa2ea57a5..1c2f16f7e4 100644 --- a/src/storage/multi_data_source/mds_table_mgr.cpp +++ b/src/storage/multi_data_source/mds_table_mgr.cpp @@ -13,6 +13,7 @@ #include "lib/ob_errno.h" #include "mds_ctx.h" #include "ob_tablet_id.h" +#include "storage/multi_data_source/mds_table_order_flusher.h" #define USING_LOG_PREFIX MDS #include "mds_table_mgr.h" @@ -135,62 +136,21 @@ void ObMdsTableMgr::unregister_from_removed_mds_table_recorder(MdsTableBase *p_m } } -int ObMdsTableMgr::first_scan_to_get_min_rec_scn_(share::SCN &min_rec_scn, ObIArray &min_rec_scn_ids) -{ - MDS_TG(10_s); - int ret = OB_SUCCESS; - int64_t scan_cnt = 0; - auto get_min_rec_scn_op = - [&min_rec_scn, &scan_cnt, &min_rec_scn_ids](const common::ObTabletID &, MdsTableBase *&mds_table) { - if (!mds_table->is_switched_to_empty_shell()) { - share::SCN rec_scn = mds_table->get_rec_scn(); - if (rec_scn == min_rec_scn) { - if (min_rec_scn_ids.count() < 128) { - (void) min_rec_scn_ids.push_back(mds_table->get_tablet_id()); - } - } else if (rec_scn < min_rec_scn) { - min_rec_scn = rec_scn; - min_rec_scn_ids.reset(); - (void) min_rec_scn_ids.push_back(mds_table->get_tablet_id()); - } - ++scan_cnt; +template +struct OrderOp { + OrderOp(Flusher &flusher) : flusher_(flusher) {} + int operator()(MdsTableBase &mds_table) { + if (!mds_table.is_switched_to_empty_shell()) { + share::SCN rec_scn = mds_table.get_rec_scn(); + flusher_.record_mds_table({mds_table.get_tablet_id(), rec_scn}); } - return true;// true means iterating the next mds table - }; - if (OB_FAIL(mds_table_map_.for_each(get_min_rec_scn_op))) { - MDS_LOG(WARN, "fail to do map for_each", KR(ret), K(*this), K(scan_cnt), K(min_rec_scn), K(min_rec_scn_ids)); + return OB_SUCCESS; } - return ret; -} - -int ObMdsTableMgr::second_scan_to_do_flush_(share::SCN do_flush_limit_scn) -{ - MDS_TG(10_s); - int ret = OB_SUCCESS; - int64_t scan_mds_table_cnt = 0; - auto flush_op = [do_flush_limit_scn, &scan_mds_table_cnt](const common::ObTabletID &tablet_id, - MdsTableBase *&mds_table) { - int tmp_ret = OB_SUCCESS; - if (mds_table->is_switched_to_empty_shell()) { - MDS_LOG_RET(INFO, ret, "skip empty shell tablet mds_table flush", K(tablet_id), K(scan_mds_table_cnt)); - } else if (OB_TMP_FAIL(mds_table->flush(do_flush_limit_scn))) { - MDS_LOG_RET(WARN, ret, "flush mds table failed", KR(tmp_ret), K(tablet_id), K(scan_mds_table_cnt)); - } else { - ++scan_mds_table_cnt; - } - return true;// true means iterating the next mds table - }; - if (OB_FAIL(mds_table_map_.for_each(flush_op))) { - MDS_LOG(WARN, "fail to do map for_each", KR(ret), K(*this), K(scan_mds_table_cnt), K(do_flush_limit_scn)); - } else { - MDS_LOG(INFO, "success to do second scan to do flush", KR(ret), K(*this), K(scan_mds_table_cnt), K(do_flush_limit_scn)); - } - return ret; -} - + Flusher &flusher_; +}; int ObMdsTableMgr::flush(SCN recycle_scn, bool need_freeze) { - #define PRINT_WRAPPER KR(ret), K(ls_->get_ls_id()), K(recycle_scn), K(need_freeze), K(min_rec_scn),\ + #define PRINT_WRAPPER KR(ret), K(ls_->get_ls_id()), K(recycle_scn), K(need_freeze), K(order_flusher_for_some),\ K(max_consequent_callbacked_scn), K(*this) MDS_TG(10_s); int ret = OB_SUCCESS; @@ -198,26 +158,25 @@ int ObMdsTableMgr::flush(SCN recycle_scn, bool need_freeze) freeze_guard.init(this); int64_t flushed_table_cnt = 0; MdsTableHandle *flushing_mds_table = nullptr; - share::SCN min_rec_scn = share::SCN::max_scn(); share::SCN max_consequent_callbacked_scn; - ObSEArray min_rec_scn_tablet_ids; + FlusherForSome order_flusher_for_some; if (IS_NOT_INIT) { ret = OB_NOT_INIT; MDS_LOG_FREEZE(ERROR, "mds table mgr not inited"); } else if (!freeze_guard.can_freeze()) { MDS_LOG_FREEZE(INFO, "mds table mgr is doing flush, skip flush once"); - } else if (MDS_FAIL(first_scan_to_get_min_rec_scn_(min_rec_scn, min_rec_scn_tablet_ids))) { - MDS_LOG_FREEZE(WARN, "do first_scan_to_get_min_rec_scn_ failed"); - } else if (min_rec_scn == share::SCN::max_scn()) {// no mds table + } else if (MDS_FAIL(for_each_in_t3m_mds_table(OrderOp(order_flusher_for_some)))) { + MDS_LOG_FREEZE(WARN, "do first scan failed"); + } else if (order_flusher_for_some.empty()) {// no mds table MDS_LOG_FREEZE(INFO, "no valid mds table there, no need do flush"); - } else if (MDS_FAIL(ls_->get_freezer()->get_max_consequent_callbacked_scn(max_consequent_callbacked_scn))) { + } else if (MDS_FAIL(ls_->get_max_decided_scn(max_consequent_callbacked_scn))) { MDS_LOG_FREEZE(WARN, "fail to get max_consequent_callbacked_scn", KR(ret), K(*this)); } else if (!max_consequent_callbacked_scn.is_valid() || max_consequent_callbacked_scn.is_max() || max_consequent_callbacked_scn.is_min()) { ret = OB_ERR_UNEXPECTED; MDS_LOG_FREEZE(WARN, "invalid max_consequent_callbacked_scn", KR(ret), K(*this)); } else { - if (need_freeze) { + if (need_freeze) {// need advance freezing scn if (recycle_scn.is_max() || !recycle_scn.is_valid()) { recycle_scn = max_consequent_callbacked_scn; } @@ -230,12 +189,8 @@ int ObMdsTableMgr::flush(SCN recycle_scn, bool need_freeze) freezing_scn_ = max_consequent_callbacked_scn; MDS_LOG_FREEZE(INFO, "freezing_scn decline to max_consequent_callbacked_scn"); } - if (min_rec_scn <= freezing_scn_) { - if (MDS_FAIL(second_scan_to_do_flush_(freezing_scn_))) { - MDS_LOG_FREEZE(WARN, "fail to do flush"); - } else { - MDS_LOG_FREEZE(INFO, "success to do flush"); - } + if (order_flusher_for_some.min_key().rec_scn_ <= freezing_scn_) { + order_flush_(order_flusher_for_some, freezing_scn_, max_consequent_callbacked_scn); } else { MDS_LOG_FREEZE(INFO, "no need do flush cause min_rec_scn is larger than freezing scn"); } @@ -244,6 +199,41 @@ int ObMdsTableMgr::flush(SCN recycle_scn, bool need_freeze) #undef PRINT_WRAPPER } +void ObMdsTableMgr::order_flush_(FlusherForSome &order_flusher_for_some, + share::SCN freezing_scn, + share::SCN max_consequent_callbacked_scn) +{ + #define PRINT_WRAPPER KR(ret), K(ls_->get_ls_id()), K(freezing_scn), K(order_flusher_for_some),\ + K(third_sacn_mds_table_cnt), K(max_consequent_callbacked_scn), K(order_flusher_for_all.count()),\ + K(*this) + MDS_TG(10_s); + int ret = OB_SUCCESS; + int64_t third_sacn_mds_table_cnt = 0; + FlusherForAll order_flusher_for_all; + FlushOp flush_op(freezing_scn, third_sacn_mds_table_cnt, max_consequent_callbacked_scn); + if (!order_flusher_for_some.full() || order_flusher_for_some.max_key().rec_scn_ > freezing_scn_) { + // than means all mds_tables needed be flushed is included in order_flusher_for_some + order_flusher_for_some.flush_by_order(mds_table_map_, freezing_scn_, max_consequent_callbacked_scn); + MDS_LOG_FREEZE(INFO, "flush all mds_tables(little number) by total order"); + } else { + order_flusher_for_all.reserve_memory(mds_table_map_.count()); + if (MDS_FAIL(for_each_in_t3m_mds_table(OrderOp(order_flusher_for_all)))) {// second scan + MDS_LOG_FREEZE(WARN, "do scan failed"); + } + order_flusher_for_all.flush_by_order(mds_table_map_, freezing_scn_, max_consequent_callbacked_scn); + if (order_flusher_for_all.incomplete()) {// need do third scan + if (MDS_FAIL(mds_table_map_.for_each(flush_op))) {// third scan + MDS_LOG_FREEZE(WARN, "do scan failed"); + } else { + MDS_LOG_FREEZE(INFO, "flush some mds_tables by order, and others out of order"); + } + } else { + MDS_LOG_FREEZE(INFO, "flush all mds_tables(large number) by total order"); + } + } + #undef PRINT_WRAPPER +} + SCN ObMdsTableMgr::get_freezing_scn() const { return freezing_scn_.atomic_get(); } SCN ObMdsTableMgr::get_rec_scn() @@ -252,35 +242,29 @@ SCN ObMdsTableMgr::get_rec_scn() return get_rec_scn(tablet_id); } -share::SCN ObMdsTableMgr::get_rec_scn(ObTabletID &tablet_id) +SCN ObMdsTableMgr::get_rec_scn(ObTabletID &tablet_id) { + #define PRINT_WRAPPER KR(ret), K(min_rec_scn), K(order_flusher_for_one), K(*this) MDS_TG(1_s); int ret = OB_SUCCESS; SCN min_rec_scn = share::SCN::max_scn(); - ObSEArray min_rec_scn_tablet_ids; - + FlusherForOne order_flusher_for_one; if (IS_NOT_INIT) { ret = OB_NOT_INIT; MDS_LOG(ERROR, "regsiter mds table failed", KR(ret)); - } else if (MDS_FAIL(first_scan_to_get_min_rec_scn_(min_rec_scn, min_rec_scn_tablet_ids))) { - min_rec_scn = SCN::min_scn(); - MDS_LOG(WARN, "fail to scan get min_rec_scn", KR(ret), K(min_rec_scn), K(min_rec_scn_tablet_ids), K(*this)); + } else if (MDS_FAIL(for_each_in_t3m_mds_table(OrderOp(order_flusher_for_one)))) { + min_rec_scn.set_min(); + MDS_LOG_FREEZE(WARN, "do first scan failed"); + } else if (order_flusher_for_one.empty()) {// no mds table + MDS_LOG_FREEZE(INFO, "no valid mds table there, return MAX SCN"); } else { - if (!min_rec_scn_tablet_ids.empty()) { - tablet_id = min_rec_scn_tablet_ids.at(0); - } - MDS_LOG(INFO, "get rec_scn from MdsTableMgr", KR(ret), K(min_rec_scn), K(min_rec_scn_tablet_ids), K(*this)); + FlushKey key = order_flusher_for_one.min_key(); + min_rec_scn = key.rec_scn_; + tablet_id = key.tablet_id_; + MDS_LOG_FREEZE(INFO, "get rec_scn from MdsTableMgr"); } return min_rec_scn; -} - -DEF_TO_STRING(ObMdsTableMgr) -{ - int64_t pos = 0; - J_OBJ_START(); - J_KV(KP(this), K_(is_inited), K_(freezing_scn), KPC_(ls)); - J_OBJ_END(); - return pos; + #undef PRINT_WRAPPER } int MdsTableFreezeGuard::init(ObMdsTableMgr *mds_mgr) @@ -341,6 +325,15 @@ int MdsTableMgrHandle::set_mds_table_mgr(ObMdsTableMgr *mds_table_mgr) return ret; } +DEF_TO_STRING(ObMdsTableMgr) +{ + int64_t pos = 0; + J_OBJ_START(); + J_KV(KP(this), K_(is_inited), K_(freezing_scn), KPC_(ls)); + J_OBJ_END(); + return pos; +} + } // namespace mds } // namespace storage -} // namespace oceanbase +} // namespace oceanbase \ No newline at end of file diff --git a/src/storage/multi_data_source/mds_table_mgr.h b/src/storage/multi_data_source/mds_table_mgr.h index 6bcd48ac3a..6931a96376 100644 --- a/src/storage/multi_data_source/mds_table_mgr.h +++ b/src/storage/multi_data_source/mds_table_mgr.h @@ -20,6 +20,7 @@ #include "storage/checkpoint/ob_common_checkpoint.h" #include "storage/multi_data_source/runtime_utility/list_helper.h" #include "lib/hash/ob_linear_hash_map.h" +#include "mds_table_order_flusher.h" namespace oceanbase { namespace storage { @@ -51,7 +52,6 @@ private: class ObMdsTableMgr final : public checkpoint::ObCommonCheckpoint { - using MdsTableMap = common::ObLinearHashMap; friend MdsTableFreezeGuard; public: ObMdsTableMgr() @@ -63,6 +63,7 @@ public: mds_table_map_(), removed_mds_table_recorder_() {} ~ObMdsTableMgr() { destroy(); } + DECLARE_TO_STRING; int init(ObLS *ls); int reset(); @@ -84,19 +85,25 @@ public: removed_mds_table_recorder_.for_each(op_wrapper); return ret; } - template // if op return FAIL, break for-each - int for_each_in_t3m_mds_table(OP &&op) { - auto op_wrapper = [&op](const common::ObTabletID &k, MdsTableBase* &v) -> bool { + template + struct OpWrapper { + template + OpWrapper(T &&op) : op_(op) {} + bool operator()(const common::ObTabletID &k, MdsTableBase* &v) { bool keep_iterating = true;// means keep iterating next mds_table int ret = OB_SUCCESS; - if (OB_FAIL(op(*v))) { + if (OB_FAIL(op_(*v))) { keep_iterating = false; } return keep_iterating; - }; - return mds_table_map_.for_each(op_wrapper); + } + OP op_; + }; + template // if op return FAIL, break for-each + int for_each_in_t3m_mds_table(OP &&op) { + OpWrapper wrapper(std::forward(op)); + return mds_table_map_.for_each(wrapper); } - DECLARE_TO_STRING; public: // derived from ObCommonCheckpoint share::SCN get_freezing_scn() const; @@ -112,9 +119,9 @@ public: // getter and setter int64_t get_ref() { return ATOMIC_LOAD(&ref_cnt_); } private: - int first_scan_to_get_min_rec_scn_(share::SCN &min_rec_scn, ObIArray &min_rec_scn_ids); - int second_scan_to_do_flush_(share::SCN min_rec_scn); - + void order_flush_(FlusherForSome &order_flusher_for_some, + share::SCN freezing_scn, + share::SCN max_consequent_callbacked_scn); private: bool is_inited_; bool is_freezing_; diff --git a/src/storage/multi_data_source/mds_table_order_flusher.cpp b/src/storage/multi_data_source/mds_table_order_flusher.cpp new file mode 100644 index 0000000000..930ea5dbb9 --- /dev/null +++ b/src/storage/multi_data_source/mds_table_order_flusher.cpp @@ -0,0 +1,31 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "mds_table_order_flusher.h" + +namespace oceanbase +{ +namespace storage +{ +namespace mds +{ + +// define in cpp file is for rewrire this function in unittest +void *MdsFlusherModulePageAllocator::alloc(const int64_t size, const ObMemAttr &attr) { + void *ret = nullptr; + ret = (NULL == allocator_) ? share::mtl_malloc(size, attr) : allocator_->alloc(size, attr); + return ret; +} + +} +} +} \ No newline at end of file diff --git a/src/storage/multi_data_source/mds_table_order_flusher.h b/src/storage/multi_data_source/mds_table_order_flusher.h new file mode 100644 index 0000000000..3f20a391ea --- /dev/null +++ b/src/storage/multi_data_source/mds_table_order_flusher.h @@ -0,0 +1,135 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H +#define SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H + +#include "ob_tablet_id.h" +#include "share/scn.h" +#include "runtime_utility/common_define.h" +#include "deps/oblib/src/lib/allocator/page_arena.h" +#include "lib/ob_errno.h" +#include "lib/utility/ob_print_utils.h" +#include "meta_programming/ob_type_traits.h" +#include "lib/hash/ob_linear_hash_map.h" +#include "deps/oblib/src/lib/container/ob_array_iterator.h" +#include "mds_for_each_map_flush_operation.h" +#include +#include + +namespace oceanbase +{ +namespace storage +{ +namespace mds +{ + +struct MdsFlusherModulePageAllocator : public ModulePageAllocator { + // just forward to parent + MdsFlusherModulePageAllocator(const lib::ObLabel &label = "MdsFlusherArray", + int64_t tenant_id = OB_SERVER_TENANT_ID, + int64_t ctx_id = 0) + : ModulePageAllocator(ObMemAttr(tenant_id, label, ctx_id)) {} + MdsFlusherModulePageAllocator(const lib::ObMemAttr &attr) : ModulePageAllocator("MdsFlusherArray") {} + explicit MdsFlusherModulePageAllocator(ObIAllocator &allocator, + const lib::ObLabel &label = "MdsFlusherArray") + : ModulePageAllocator(allocator, label) {} + virtual ~MdsFlusherModulePageAllocator() {} + // just change ob_malloc to ob_tenant_malloc + void *alloc(const int64_t size) { return ModulePageAllocator::alloc(size); } + void *alloc(const int64_t size, const ObMemAttr &attr); + void free(void *p) { + (NULL == allocator_) ? share::mtl_free(p) : allocator_->free(p); p = NULL; + } +}; + +struct FlushKey// 16 bytes +{ + FlushKey() + : tablet_id_(), + rec_scn_() {} + FlushKey(common::ObTabletID tablet_id, share::SCN rec_scn) + : tablet_id_(tablet_id), rec_scn_(rec_scn) {} + bool operator<(const FlushKey &rhs) const { return rec_scn_ < rhs.rec_scn_; } + bool operator==(const FlushKey &rhs) const { return rec_scn_ == rhs.rec_scn_; } + bool is_valid() const { return tablet_id_.is_valid() && rec_scn_.is_valid(); } + TO_STRING_KV(K_(tablet_id), K_(rec_scn)); + common::ObTabletID tablet_id_; + share::SCN rec_scn_; +}; + +template +struct MdsTableHighPriorityFlusher { + MdsTableHighPriorityFlusher() : size_(0) {} + bool try_record_mds_table(FlushKey new_key, FlushKey &eliminated_key); + void flush_by_order(MdsTableMap &map, share::SCN limit, share::SCN max_decided_scn); + int64_t to_string(char *buf, const int64_t len) const; + int64_t size_; + FlushKey high_priority_mds_tables_[STACK_QUEUE_SIZE]; +}; + +template +struct MdsTableOrderFlusher; + +// 1. if memory is enough, flush mds table by strict ASC rec_scn order. +// 2. if memory is not enough, flush at least STACK_QUEUE_SIZE smallest mds table by strict ASC rec_scn order. +template +struct MdsTableOrderFlusher +{ + MdsTableOrderFlusher() + : array_err_(OB_SUCCESS), high_priority_flusher_(), extra_mds_tables_() {} + void reserve_memory(int64_t mds_table_total_size_likely); + void record_mds_table(FlushKey key); + void flush_by_order(MdsTableMap &map, share::SCN limit, share::SCN max_decided_scn); + int64_t count() const { return high_priority_flusher_.size_ + extra_mds_tables_.count(); } + bool incomplete() const { return array_err_ != OB_SUCCESS; } + TO_STRING_KV(K_(array_err), K_(high_priority_flusher)); + int array_err_; + MdsTableHighPriorityFlusher high_priority_flusher_; + ObArray extra_mds_tables_; +}; + +// just record stack number items, and won't failed +template +struct MdsTableOrderFlusher +{ + MdsTableOrderFlusher() + : high_priority_flusher_() {} + void record_mds_table(FlushKey key); + void flush_by_order(MdsTableMap &map, share::SCN limit, share::SCN max_decided_scn); + bool empty() const { return high_priority_flusher_.size_ == 0; } + bool full() const { return high_priority_flusher_.size_ == STACK_QUEUE_SIZE; } + FlushKey min_key() const { return high_priority_flusher_.high_priority_mds_tables_[0]; } + FlushKey max_key() const { return high_priority_flusher_.high_priority_mds_tables_[high_priority_flusher_.size_ == 0 ? + 0 : + high_priority_flusher_.size_ - 1]; } + TO_STRING_KV(K_(high_priority_flusher)); + MdsTableHighPriorityFlusher high_priority_flusher_; +}; + +static constexpr int64_t FLUSH_FOR_ONE_SIZE = 1; +static constexpr int64_t FLUSH_FOR_SOME_SIZE = 32; +static constexpr int64_t FLUSH_FOR_ALL_SIZE = 128; +using FlusherForOne = MdsTableOrderFlusher; +using FlusherForSome = MdsTableOrderFlusher; +using FlusherForAll = MdsTableOrderFlusher; + +} +} +} + +#ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H_IPP +#define SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H_IPP +#include "mds_table_order_flusher.ipp" +#endif + +#endif \ No newline at end of file diff --git a/src/storage/multi_data_source/mds_table_order_flusher.ipp b/src/storage/multi_data_source/mds_table_order_flusher.ipp new file mode 100644 index 0000000000..6362304e92 --- /dev/null +++ b/src/storage/multi_data_source/mds_table_order_flusher.ipp @@ -0,0 +1,168 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_IPP +#define SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_IPP + +#include "lib/ob_errno.h" +#ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H_IPP +#define SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H_IPP +#include "mds_table_order_flusher.h" +#endif + +namespace oceanbase +{ +namespace storage +{ +namespace mds +{ + +// this is optimized for get_rec_scn() operation +template <> +inline bool MdsTableHighPriorityFlusher::try_record_mds_table(FlushKey new_key, FlushKey &eliminated_key) { + if (OB_UNLIKELY(size_ == 0)) { + size_ = 1; + high_priority_mds_tables_[0] = new_key; + } else { + if (new_key < high_priority_mds_tables_[0]) { + high_priority_mds_tables_[0] = new_key; + } + } + return true; +} + +template +inline bool MdsTableHighPriorityFlusher::try_record_mds_table(FlushKey new_key, FlushKey &eliminated_key) { + bool need_insert = false; + if (size_ < STACK_QUEUE_SIZE) { + need_insert = true; + } else if (new_key < high_priority_mds_tables_[STACK_QUEUE_SIZE - 1]) { + need_insert = true; + } + if (need_insert) { + FlushKey *begin = &high_priority_mds_tables_[0]; + FlushKey *end = &high_priority_mds_tables_[size_ >= STACK_QUEUE_SIZE ? STACK_QUEUE_SIZE : size_]; + FlushKey *iter = std::upper_bound(begin, end, new_key); + if (end == &high_priority_mds_tables_[STACK_QUEUE_SIZE]) { + eliminated_key = high_priority_mds_tables_[STACK_QUEUE_SIZE - 1]; + MDS_ASSERT(iter < end); + memmove(iter + 1, iter, (end - iter - 1) * sizeof(FlushKey)); + } else {// maybe end == iter + memmove(iter + 1, iter, (end - iter) * sizeof(FlushKey)); + ++size_; + } + *iter = new_key; + } + return need_insert; +} + +template +inline void MdsTableHighPriorityFlusher::flush_by_order(MdsTableMap &map, share::SCN limit, share::SCN max_decided_scn) { + int ret = OB_SUCCESS; + int64_t flush_count = 0; + FlushOp op(limit, flush_count, max_decided_scn); + for (int64_t i = 0; i < size_ && !op.dag_full(); ++i) {// ignore ERROR + if (OB_FAIL(map.operate(high_priority_mds_tables_[i].tablet_id_, op))) { + MDS_LOG(WARN, "fail to operate mds table", K(i), K(high_priority_mds_tables_[i]), KR(ret), K(limit), K(max_decided_scn)); + } + } +} + +template +int64_t MdsTableHighPriorityFlusher::to_string(char *buf, const int64_t len) const { + int64_t pos = 0; + constexpr int64_t MAX_PRINT_NUMBER = 32L; + databuff_printf(buf, len, pos, "size:%ld, ordered_tablets:{", size_); + int64_t print_numnber = std::min(size_, MAX_PRINT_NUMBER); + for (int64_t i = 0; i < print_numnber; ++i) { + databuff_printf(buf, len, pos, "%s", to_cstring(high_priority_mds_tables_[i])); + if (i != print_numnber) { + databuff_printf(buf, len, pos, ", "); + } else { + if (print_numnber < size_) { + databuff_printf(buf, len, pos, "%ld more...}", size_ - print_numnber); + } else { + databuff_printf(buf, len, pos, "}"); + } + } + } + return pos; +} + +template +void MdsTableOrderFlusher::reserve_memory(int64_t mds_table_total_size_likely) {// it'ok if failed + int ret = OB_SUCCESS; + constexpr int64_t max_tablet_number = 1_MB;//sizeof(100w FlushKey) = 16MB + int64_t reserve_size = std::min(mds_table_total_size_likely * 2, max_tablet_number); + if (OB_FAIL(extra_mds_tables_.reserve(reserve_size))) { + MDS_LOG(WARN, "fail to reserve memory", KR(ret)); + array_err_ = ret; + } +} + +template +void MdsTableOrderFlusher::record_mds_table(FlushKey key) {// it's ok if failed + int ret = OB_SUCCESS; + FlushKey eliminated_key; + if (!high_priority_flusher_.try_record_mds_table(key, eliminated_key)) { + eliminated_key = key; + } + if (eliminated_key.is_valid() && !array_err_) { + if (OB_FAIL(extra_mds_tables_.push_back(eliminated_key))) { + MDS_LOG(WARN, "fail to push_back", KR(ret), K(eliminated_key)); + array_err_ = ret; + } + } +} + +template +void MdsTableOrderFlusher::record_mds_table(FlushKey key) {// won't be failed + FlushKey _; + high_priority_flusher_.try_record_mds_table(key, _); +} + +template +void MdsTableOrderFlusher::flush_by_order(MdsTableMap &map, + share::SCN limit, + share::SCN max_decided_scn) { + int ret = OB_SUCCESS; + int64_t flush_count = 0; + FlushOp op(limit, flush_count, max_decided_scn); + high_priority_flusher_.flush_by_order(map, limit, max_decided_scn); + if (!array_err_) {// if array has error, just order flush STACK_QUEUE_SIZE mds tables + try { + std::sort(extra_mds_tables_.begin(), extra_mds_tables_.end()); + } catch (std::exception e) { + MDS_LOG(WARN, "std::sort failed", K(e.what())); + array_err_ = OB_ERR_UNEXPECTED; + } + if (!array_err_) { + for (int64_t idx = 0; idx < extra_mds_tables_.count() && !op.dag_full(); ++idx) { + if (OB_FAIL(map.operate(extra_mds_tables_[idx].tablet_id_, op))) { + MDS_LOG(WARN, "fail to operate mds table", K(idx), K(extra_mds_tables_[idx]), KR(ret), K(limit), K(max_decided_scn)); + } + } + } + } +} + +template +void MdsTableOrderFlusher::flush_by_order(MdsTableMap &map, + share::SCN limit, + share::SCN max_decided_scn) { + high_priority_flusher_.flush_by_order(map, limit, max_decided_scn); +} + +} +} +} +#endif \ No newline at end of file diff --git a/src/storage/multi_data_source/runtime_utility/common_define.h b/src/storage/multi_data_source/runtime_utility/common_define.h index c2f928729d..8b0f84610f 100644 --- a/src/storage/multi_data_source/runtime_utility/common_define.h +++ b/src/storage/multi_data_source/runtime_utility/common_define.h @@ -42,6 +42,8 @@ namespace storage { namespace mds { +class MdsTableBase; +using MdsTableMap = common::ObLinearHashMap; enum class NodePosition { MDS_TABLE, diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index c2b90831f7..898d820aab 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -5495,7 +5495,7 @@ int ObTablet::get_mds_table_rec_log_scn(SCN &rec_scn) return ret; } -int ObTablet::mds_table_flush(const share::SCN &recycle_scn) +int ObTablet::mds_table_flush(const share::SCN &decided_scn) { int ret = OB_SUCCESS; mds::MdsTableHandle mds_table; @@ -5507,7 +5507,7 @@ int ObTablet::mds_table_flush(const share::SCN &recycle_scn) LOG_WARN("inner tablet does not have mds table", K(ret)); } else if (OB_FAIL(inner_get_mds_table(mds_table))) { LOG_WARN("failed to get mds table", K(ret)); - } else if (OB_FAIL(mds_table.flush(recycle_scn))) { + } else if (OB_FAIL(mds_table.flush(decided_scn, decided_scn))) { LOG_WARN("failed to flush mds table", KR(ret), KPC(this)); } return ret; diff --git a/src/storage/tablet/ob_tablet.h b/src/storage/tablet/ob_tablet.h index 824255aa76..7ff3ffcf7b 100644 --- a/src/storage/tablet/ob_tablet.h +++ b/src/storage/tablet/ob_tablet.h @@ -176,7 +176,7 @@ public: inline common::ObRowStoreType get_last_major_latest_row_store_type() const { return table_store_cache_.last_major_latest_row_store_type_; } inline bool is_row_store() const { return table_store_cache_.is_row_store_; } int get_mds_table_rec_log_scn(share::SCN &rec_scn); - int mds_table_flush(const share::SCN &recycle_scn); + int mds_table_flush(const share::SCN &decided_scn); public: // first time create tablet diff --git a/unittest/storage/CMakeLists.txt b/unittest/storage/CMakeLists.txt index 6ae06a5ccc..975caa1ee4 100644 --- a/unittest/storage/CMakeLists.txt +++ b/unittest/storage/CMakeLists.txt @@ -53,6 +53,7 @@ storage_unittest(test_mds_node multi_data_source/test_mds_node.cpp) # storage_unittest(test_mds_unit multi_data_source/test_mds_unit.cpp) storage_unittest(test_mds_table multi_data_source/test_mds_table.cpp) storage_unittest(test_mds_table_flush multi_data_source/test_mds_table_flush.cpp) +storage_unittest(test_mds_table_flusher multi_data_source/test_mds_table_flusher.cpp) storage_unittest(test_mds_dump_kv multi_data_source/test_mds_dump_kv.cpp) #storage_unittest(test_multiple_merge) #storage_unittest(test_memtable_multi_version_row_iterator memtable/test_memtable_multi_version_row_iterator.cpp) diff --git a/unittest/storage/multi_data_source/test_mds_table.cpp b/unittest/storage/multi_data_source/test_mds_table.cpp index 0595cd0294..8219892b5a 100644 --- a/unittest/storage/multi_data_source/test_mds_table.cpp +++ b/unittest/storage/multi_data_source/test_mds_table.cpp @@ -376,7 +376,7 @@ void TestMdsTable::test_flush() { ctx2.on_redo(mock_scn(200)); int idx = 0; - ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(300)));// 1. 以300为版本号进行flush动作 + ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(300), mock_scn(500)));// 1. 以300为版本号进行flush动作 ASSERT_EQ(mock_scn(199), mds_table_.p_mds_table_base_->flushing_scn_);// 2. 实际上以199为版本号进行flush动作 ASSERT_EQ(OB_SUCCESS, mds_table_.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump( [&idx](const MdsDumpKV &kv) -> int {// 2. 转储时扫描mds table @@ -432,7 +432,7 @@ void TestMdsTable::test_multi_key_remove() { return OB_SUCCESS; }, is_committed); ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(200))); + ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(200), mock_scn(500))); ASSERT_EQ(OB_SUCCESS, mds_table_.try_recycle(mock_scn(200))); ret = mds_table_.get_latest(ExampleUserKey(2), [](const ExampleUserData1 &data){ return OB_SUCCESS; @@ -512,16 +512,16 @@ TEST_F(TestMdsTable, test_recycle) { int64_t valid_cnt = 0; ASSERT_EQ(OB_SUCCESS, mds_table_.get_node_cnt(valid_cnt)); ASSERT_EQ(1, valid_cnt);// 此时还有一个19001版本的已提交数据,因为rec_scn没有推上去 - ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(20000))); + ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(20000), mock_scn(40000))); mds_table_.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump([](const MdsDumpKV &){ return OB_SUCCESS; }, 0, true); - mds_table_.on_flush(mock_scn(20000), OB_SUCCESS); + mds_table_.on_flush(mock_scn(40000), OB_SUCCESS); share::SCN rec_scn; ASSERT_EQ(OB_SUCCESS, mds_table_.get_rec_scn(rec_scn)); MDS_LOG(INFO, "print rec scn", K(rec_scn)); ASSERT_EQ(share::SCN::max_scn(), rec_scn); - ASSERT_EQ(OB_SUCCESS, mds_table_.try_recycle(mock_scn(20000))); + ASSERT_EQ(OB_SUCCESS, mds_table_.try_recycle(mock_scn(40000))); ASSERT_EQ(OB_SUCCESS, mds_table_.get_node_cnt(valid_cnt)); ASSERT_EQ(0, valid_cnt);// 此时还有一个19001版本的已提交数据,因为rec_scn没有推上去 } @@ -541,17 +541,17 @@ TEST_F(TestMdsTable, test_recalculate_flush_scn_op) { ASSERT_EQ(OB_SUCCESS, mds_table.set(ExampleUserData1(3), ctx3)); ctx3.on_redo(mock_scn(9)); ctx3.on_commit(mock_scn(11), mock_scn(11)); - ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(4))); + ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(4), mock_scn(4))); ASSERT_EQ(mock_scn(4), mds_table.p_mds_table_base_->flushing_scn_); mds_table.on_flush(mock_scn(4), OB_SUCCESS); - ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(5)));// no need do flush, directly advance rec_scn + ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(5), mock_scn(5)));// no need do flush, directly advance rec_scn ASSERT_EQ(false, mds_table.p_mds_table_base_->flushing_scn_.is_valid()); - ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(6)));// no need do flush, directly advance rec_scn + ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(6), mock_scn(6)));// no need do flush, directly advance rec_scn ASSERT_EQ(false, mds_table.p_mds_table_base_->flushing_scn_.is_valid()); - ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(7))); + ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(7), mock_scn(7))); ASSERT_EQ(mock_scn(7), mds_table.p_mds_table_base_->flushing_scn_); mds_table.on_flush(mock_scn(7), OB_SUCCESS); - ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(8))); + ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(8), mock_scn(8))); ASSERT_EQ(false, mds_table.p_mds_table_base_->flushing_scn_.is_valid()); ASSERT_EQ(mock_scn(9), mds_table.p_mds_table_base_->rec_scn_); } diff --git a/unittest/storage/multi_data_source/test_mds_table_flush.cpp b/unittest/storage/multi_data_source/test_mds_table_flush.cpp index 4d96967dd2..8e35300690 100644 --- a/unittest/storage/multi_data_source/test_mds_table_flush.cpp +++ b/unittest/storage/multi_data_source/test_mds_table_flush.cpp @@ -9,12 +9,13 @@ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ - -#include "lib/utility/utility.h" -#include "storage/multi_data_source/compile_utility/mds_dummy_key.h" +#define UNITTEST_DEBUG +static bool MDS_FLUSHER_ALLOW_ALLOC = true; #include #define private public #define protected public +#include "lib/utility/utility.h" +#include "storage/multi_data_source/compile_utility/mds_dummy_key.h" #include "share/ob_ls_id.h" #include "storage/multi_data_source/mds_writer.h" #include @@ -37,12 +38,15 @@ #include #include "storage/multi_data_source/runtime_utility/mds_lock.h" #include "storage/tablet/ob_tablet_meta.h" +#include "storage/multi_data_source/mds_table_mgr.h" +#include "storage/ls/ob_ls.h" +#include "storage/multi_data_source/mds_table_handle.h" +#include "storage/multi_data_source/mds_table_order_flusher.h" namespace oceanbase { namespace storage { share::SCN MOCK_MAX_CONSEQUENT_CALLBACKED_SCN; -share::SCN MOCK_FLUSHING_SCN; namespace mds { @@ -67,7 +71,6 @@ int MdsTableBase::get_ls_max_consequent_callbacked_scn_(share::SCN &max_conseque int MdsTableBase::merge(const int64_t construct_sequence, const share::SCN &flushing_scn) { - MOCK_FLUSHING_SCN = flushing_scn; return OB_SUCCESS; } @@ -144,7 +147,6 @@ int construct_tested_mds_table(MdsTableHandle &handle) { } TEST_F(TestMdsTableFlush, normal_flush) { - MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(125);// 只转储一个node MdsTableHandle handle; ASSERT_EQ(OB_SUCCESS, construct_tested_mds_table(handle)); share::SCN rec_scn; @@ -152,7 +154,7 @@ TEST_F(TestMdsTableFlush, normal_flush) { ASSERT_EQ(mock_scn(50), rec_scn);// 没转储的时候是最小的node的redo scn值 // 第一次转储 - ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000)));// 因为max_decided_scn较小,所以会用125做flush + ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(125)));// 因为max_decided_scn较小,所以会用125做flush bool is_flusing = false; ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing));// 在flush流程中 ASSERT_EQ(true, is_flusing); @@ -163,14 +165,13 @@ TEST_F(TestMdsTableFlush, normal_flush) { return OB_SUCCESS; }, 0, true)); ASSERT_EQ(1, scan_cnt); - handle.on_flush(MOCK_FLUSHING_SCN, OB_SUCCESS); + handle.on_flush(mock_scn(125), OB_SUCCESS); ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn)); OCCAM_LOG(INFO, "print rec scn", K(rec_scn)); ASSERT_EQ(mock_scn(200), rec_scn); // 第二次转储 - MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(140);// 对这个MdsTable没有影响 - ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000))); + ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(140))); ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing)); ASSERT_EQ(false, is_flusing);// 没转储 ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn)); @@ -178,8 +179,7 @@ TEST_F(TestMdsTableFlush, normal_flush) { ASSERT_EQ(mock_scn(200), rec_scn);// 没变化 // 第三次转储 - MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(275);// 多转一个node - ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000))); + ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(275))); ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing));// 在flush流程中 ASSERT_EQ(true, is_flusing); ASSERT_EQ(mock_scn(249), handle.p_mds_table_base_->flushing_scn_); @@ -189,14 +189,13 @@ TEST_F(TestMdsTableFlush, normal_flush) { return OB_SUCCESS; }, 0, true)); ASSERT_EQ(1, scan_cnt); - handle.on_flush(MOCK_FLUSHING_SCN, OB_SUCCESS); + handle.on_flush(mock_scn(249), OB_SUCCESS); ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn)); OCCAM_LOG(INFO, "print rec scn", K(rec_scn)); ASSERT_EQ(mock_scn(250), rec_scn); // 第四次转储 - MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(550);// 都转下去 - ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000))); + ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(550))); ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing));// 在flush流程中 ASSERT_EQ(true, is_flusing); ASSERT_EQ(mock_scn(499), handle.p_mds_table_base_->flushing_scn_); @@ -206,14 +205,13 @@ TEST_F(TestMdsTableFlush, normal_flush) { return OB_SUCCESS; }, 0, true)); ASSERT_EQ(2, scan_cnt); - handle.on_flush(MOCK_FLUSHING_SCN, OB_SUCCESS); + handle.on_flush(mock_scn(499), OB_SUCCESS); ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn)); OCCAM_LOG(INFO, "print rec scn", K(rec_scn)); ASSERT_EQ(mock_scn(500), rec_scn); // 第五次转储 - MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(600);// 对这个MdsTable没有影响 - ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000))); + ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(600))); ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing)); ASSERT_EQ(false, is_flusing);// 没转储 ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn)); @@ -221,8 +219,7 @@ TEST_F(TestMdsTableFlush, normal_flush) { ASSERT_EQ(mock_scn(500), rec_scn);// 没变化 // 第六次转储 - MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(590);// 直接被过滤掉了 - ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000))); + ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(590))); ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing)); ASSERT_EQ(false, is_flusing);// 没转储 ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn)); diff --git a/unittest/storage/multi_data_source/test_mds_table_flusher.cpp b/unittest/storage/multi_data_source/test_mds_table_flusher.cpp new file mode 100644 index 0000000000..f74e33f224 --- /dev/null +++ b/unittest/storage/multi_data_source/test_mds_table_flusher.cpp @@ -0,0 +1,302 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#include "lib/future/ob_future.h" +#include +#include +#include +#include +#include +#include +#define UNITTEST_DEBUG +#define private public +#define protected public +#include "storage/multi_data_source/runtime_utility/common_define.h" +#include "storage/multi_data_source/mds_table_base.h" +#include "storage/multi_data_source/mds_table_mgr.h" +#include "storage/ls/ob_ls.h" +#include "storage/multi_data_source/mds_table_handle.h" +#include "storage/multi_data_source/mds_table_order_flusher.h" +using namespace std; + +oceanbase::common::ObPromise PROMISE1, PROMISE2; +vector V_ActualDoFlushKey; + +static std::atomic NEED_ALLOC_FAIL(false); +static std::atomic NEED_ALLOC_FAIL_AFTER_RESERVE(false); + +namespace oceanbase { +namespace logservice +{ +int ObLogHandler::get_max_decided_scn(share::SCN &scn) +{ + scn = unittest::mock_scn(500); + return OB_SUCCESS; +} +} +namespace storage +{ +namespace mds +{ +template <> +int MdsTableImpl::flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn) { + V_ActualDoFlushKey.push_back({tablet_id_, rec_scn_}); + return OB_SUCCESS; +} +void *MdsAllocator::alloc(const int64_t size) +{ + void *ptr = ob_malloc(size, "MDS"); + ATOMIC_INC(&alloc_times_); + MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt())); + return ptr; +} +void MdsAllocator::free(void *ptr) { + ATOMIC_INC(&free_times_); + MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt())); + ob_free(ptr); +} +void *MdsFlusherModulePageAllocator::alloc(const int64_t size, const ObMemAttr &attr) { + void *ret = nullptr; + if (!NEED_ALLOC_FAIL) { + ret = (NULL == allocator_) ? share::mtl_malloc(size, attr) : allocator_->alloc(size, attr); + } else { + MDS_LOG(DEBUG, "mock no memory", K(lbt())); + } + return ret; +} + +// template <> +// void MdsTableOrderFlusher::reserve_memory(int64_t mds_table_total_size_likely) {// it'ok if failed +// int ret = OB_SUCCESS; +// abort(); +// constexpr int64_t max_tablet_number = 100 * 10000;//sizeof(100w FlushKey) = 16MB +// int64_t reserve_size = std::min(mds_table_total_size_likely * 2, max_tablet_number); +// if (OB_FAIL(extra_mds_tables_.reserve(reserve_size))) { +// MDS_LOG(WARN, "fail to reserve memory", KR(ret)); +// array_err_ = ret; +// } +// if (NEED_ALLOC_FAIL_AFTER_RESERVE) { +// NEED_ALLOC_FAIL = true; +// OCCAM_LOG(INFO, "set PEOMISE1 and wait PROMISE2"); +// PROMISE1.set(); +// ObFuture future = PROMISE2.get_future(); +// future.wait(); +// } +// } +} +} +namespace unittest { + +using namespace common; +using namespace std; +using namespace storage; +using namespace mds; +using namespace transaction; + +static constexpr int64_t TEST_ALL_SIZE = FLUSH_FOR_ALL_SIZE * 10; + +class TestMdsTableFlush: public ::testing::Test +{ +public: + TestMdsTableFlush() {} + virtual ~TestMdsTableFlush() {} + virtual void SetUp() { V_ActualDoFlushKey.clear(); NEED_ALLOC_FAIL = false; NEED_ALLOC_FAIL_AFTER_RESERVE = false; } + virtual void TearDown() { } +private: + // disallow copy + DISALLOW_COPY_AND_ASSIGN(TestMdsTableFlush); +}; + +TEST_F(TestMdsTableFlush, flusher_for_some_order) { + FlusherForSome some; + for (int i = FLUSH_FOR_SOME_SIZE; i > 0; --i) { + some.record_mds_table({ObTabletID(i), mock_scn(i)}); + } + for (int i = 0; i < FLUSH_FOR_SOME_SIZE; ++i) { + ASSERT_EQ(some.high_priority_flusher_.high_priority_mds_tables_[i].rec_scn_, mock_scn(i + 1)); + } + some.record_mds_table({ObTabletID(999), mock_scn(999)});// 没影响,被扔了 + for (int i = 0; i < FLUSH_FOR_SOME_SIZE; ++i) { + ASSERT_EQ(some.high_priority_flusher_.high_priority_mds_tables_[i].rec_scn_, mock_scn(i + 1)); + } + some.record_mds_table({ObTabletID(0), mock_scn(0)});// 变成第一个,其余的往后挤 + for (int i = 0; i < FLUSH_FOR_SOME_SIZE; ++i) { + ASSERT_EQ(some.high_priority_flusher_.high_priority_mds_tables_[i].rec_scn_, mock_scn(i)); + } + some.record_mds_table({ObTabletID(999), mock_scn(FLUSH_FOR_SOME_SIZE - 2)});// 最后一个有两份 + for (int i = 0; i < FLUSH_FOR_SOME_SIZE - 1; ++i) { + ASSERT_EQ(some.high_priority_flusher_.high_priority_mds_tables_[i].rec_scn_, mock_scn(i)); + } + ASSERT_EQ(some.high_priority_flusher_.high_priority_mds_tables_[FLUSH_FOR_SOME_SIZE - 1].rec_scn_, mock_scn(FLUSH_FOR_SOME_SIZE - 2)); +} + +TEST_F(TestMdsTableFlush, flusher_for_all_order_with_enough_memory) { + std::vector v_key; + for (int i = 0; i < TEST_ALL_SIZE; ++i) { + v_key.push_back({ObTabletID(100 + i), mock_scn(100 + i)}); + } + std::random_shuffle(v_key.begin(), v_key.end()); + ObLS ls; + ObMdsTableMgr mgr; + vector v; + ASSERT_EQ(mgr.init(&ls), OB_SUCCESS); + // 加入mgr的顺序是乱序的 + for (int i = 0; i < v_key.size(); ++i) { + MdsTableHandle mds_table; + ASSERT_EQ(OB_SUCCESS, mds_table.init(MdsAllocator::get_instance(), + v_key[i].tablet_id_, + share::ObLSID(1), + (ObTabletPointer*)0x111, + &mgr)); + MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_; + p_mds_table->rec_scn_ = v_key[i].rec_scn_; + v.push_back(mds_table); + } + ASSERT_EQ(OB_SUCCESS, mgr.flush(share::SCN::max_scn())); + ASSERT_EQ(TEST_ALL_SIZE, V_ActualDoFlushKey.size()); + for (int i = 0; i < V_ActualDoFlushKey.size(); ++i) { + if (V_ActualDoFlushKey[i].rec_scn_ != mock_scn(100 + i)) { + MDS_LOG(INFO, "DEBUG", K(V_ActualDoFlushKey[i].rec_scn_), K(i)); + } + ASSERT_EQ(V_ActualDoFlushKey[i].rec_scn_, mock_scn(100 + i)); + ASSERT_EQ(V_ActualDoFlushKey[i].tablet_id_, ObTabletID(100 + i)); + } +} + +TEST_F(TestMdsTableFlush, flusher_for_all_order_with_limitted_memory_reserve_fail) { + NEED_ALLOC_FAIL = true; + std::vector v_key; + for (int i = 0; i < TEST_ALL_SIZE; ++i) { + v_key.push_back({ObTabletID(100 + i), mock_scn(100 + i)}); + } + std::random_shuffle(v_key.begin(), v_key.end()); + ObLS ls; + ObMdsTableMgr mgr; + vector v; + ASSERT_EQ(mgr.init(&ls), OB_SUCCESS); + // 加入mgr的顺序是乱序的 + for (int i = 0; i < v_key.size(); ++i) { + MdsTableHandle mds_table; + ASSERT_EQ(OB_SUCCESS, mds_table.init(MdsAllocator::get_instance(), + v_key[i].tablet_id_, + share::ObLSID(1), + (ObTabletPointer*)0x111, + &mgr)); + MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_; + p_mds_table->rec_scn_ = v_key[i].rec_scn_; + v.push_back(mds_table); + } + ASSERT_EQ(OB_SUCCESS, mgr.flush(share::SCN::max_scn())); + ASSERT_EQ(TEST_ALL_SIZE + FLUSH_FOR_ALL_SIZE, V_ActualDoFlushKey.size());// 只保证最前面的TEST_ALL_SIZE的tablet是有序的,并且rec scn最小 + for (int i = 0; i < FLUSH_FOR_ALL_SIZE; ++i) { + if (V_ActualDoFlushKey[i].rec_scn_ != mock_scn(100 + i)) { + MDS_LOG(INFO, "DEBUG", K(V_ActualDoFlushKey[i].rec_scn_), K(i)); + } + ASSERT_EQ(V_ActualDoFlushKey[i].rec_scn_, mock_scn(100 + i)); + ASSERT_EQ(V_ActualDoFlushKey[i].tablet_id_, ObTabletID(100 + i)); + } +} + +TEST_F(TestMdsTableFlush, flusher_for_one) { + FlusherForOne one; + for (int i = FLUSH_FOR_SOME_SIZE; i > 0; --i) { + one.record_mds_table({ObTabletID(i), mock_scn(i)}); + } + ASSERT_EQ(one.min_key().rec_scn_, mock_scn(1)); +} + +// // release版本的逻辑重写不生效 +// TEST_F(TestMdsTableFlush, flusher_for_all_order_with_limitted_memory_reserve_success_but_push_back_fail) { +// NEED_ALLOC_FAIL_AFTER_RESERVE = true;// 只支持reserve的时候分配一次内存 +// const int64_t BIG_TEST_SIZE = 5 * TEST_ALL_SIZE; + +// std::vector v_key; +// for (int i = 0; i < BIG_TEST_SIZE; ++i) { +// v_key.push_back({ObTabletID(100 + i), mock_scn(100 + i)}); +// } +// std::random_shuffle(v_key.begin(), v_key.end()); +// ObLS ls; +// ObMdsTableMgr mgr; +// vector v; +// ASSERT_EQ(mgr.init(&ls), OB_SUCCESS); +// // 首先加入第一部分 +// for (int i = 0; i < TEST_ALL_SIZE; ++i) { +// MdsTableHandle mds_table; +// ASSERT_EQ(OB_SUCCESS, mds_table.init(MdsAllocator::get_instance(), +// v_key[i].tablet_id_, +// share::ObLSID(1), +// (ObTabletPointer*)0x111, +// &mgr)); +// MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_; +// p_mds_table->rec_scn_ = v_key[i].rec_scn_; +// v.push_back(mds_table); +// } + +// ASSERT_EQ(OB_SUCCESS, PROMISE1.init()); +// ASSERT_EQ(OB_SUCCESS, PROMISE2.init()); + +// // 开启一个新的线程做flush,模拟并发增加mds table的情况(以至于reverse的内存不够用了,过程中发生额外的内存分配,但失败的情况) +// std::thread t1([&](){ +// ASSERT_EQ(OB_SUCCESS, mgr.flush(share::SCN::max_scn())); +// }); + +// { +// OCCAM_LOG(INFO, "wait PEOMISE1"); +// ObFuture future = PROMISE1.get_future(); +// future.wait(); +// } + +// { +// for (int i = TEST_ALL_SIZE; i < v_key.size(); ++i) {// 继续再注册99 * TEST_ALL_SIZE的新mds table +// MdsTableHandle mds_table; +// ASSERT_EQ(OB_SUCCESS, mds_table.init(MdsAllocator::get_instance(), +// v_key[i].tablet_id_, +// share::ObLSID(1), +// (ObTabletPointer*)0x111, +// &mgr)); +// MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_; +// p_mds_table->rec_scn_ = v_key[i].rec_scn_; +// v.push_back(mds_table); +// } +// PROMISE2.set(); +// } + +// t1.join(); + +// ASSERT_EQ(BIG_TEST_SIZE + FLUSH_FOR_ALL_SIZE, V_ActualDoFlushKey.size());// 所有的mds table都经历了转储 +// for (int i = 0; i < FLUSH_FOR_ALL_SIZE; ++i) {// 但只保证栈上记录的tablet是有序的,并且rec scn最小 +// ASSERT_EQ(V_ActualDoFlushKey[i].rec_scn_, mock_scn(100 + i)); +// ASSERT_EQ(V_ActualDoFlushKey[i].tablet_id_, ObTabletID(100 + i)); +// } + +// } + +} +} + +int main(int argc, char **argv) +{ + system("rm -rf test_mds_table_flusher.log"); + oceanbase::common::ObLogger &logger = oceanbase::common::ObLogger::get_logger(); + logger.set_file_name("test_mds_table_flusher.log", false); + logger.set_log_level(OB_LOG_LEVEL_DEBUG); + testing::InitGoogleTest(&argc, argv); + int ret = RUN_ALL_TESTS(); + int64_t alloc_times = oceanbase::storage::mds::MdsAllocator::get_alloc_times(); + int64_t free_times = oceanbase::storage::mds::MdsAllocator::get_free_times(); + if (alloc_times != free_times) { + MDS_LOG(ERROR, "memory may leak", K(free_times), K(alloc_times)); + ret = -1; + } else { + MDS_LOG(INFO, "all memory released", K(free_times), K(alloc_times)); + } + return ret; +}