[MDS] mds table flush by order
This commit is contained in:
@ -172,8 +172,10 @@ void do_flush_mds_table(share::ObLSID ls_id, ObTabletID tablet_id)
|
||||
ObTabletPointer *tablet_pointer = pointer_handle.get_resource_ptr();
|
||||
// 4. 做flush动作
|
||||
mds::MdsTableHandle handle;
|
||||
share::SCN max_decided_scn;
|
||||
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_max_decided_scn(max_decided_scn));
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_pointer->get_mds_table(tablet_id, handle));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(share::SCN::max_scn()));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(share::SCN::max_scn(), max_decided_scn));
|
||||
ASSERT_EQ(true, handle.p_mds_table_base_->flushing_scn_.is_valid());
|
||||
// 5. 等flush完成
|
||||
share::SCN rec_scn = share::SCN::min_scn();
|
||||
|
@ -648,8 +648,10 @@ ob_set_subtarget(ob_storage memtable_mvcc
|
||||
ob_set_subtarget(ob_storage multi_data_source
|
||||
multi_data_source/buffer_ctx.cpp
|
||||
multi_data_source/mds_ctx.cpp
|
||||
multi_data_source/mds_for_each_map_flush_operation.cpp
|
||||
multi_data_source/mds_node.cpp
|
||||
multi_data_source/mds_table_mgr.cpp
|
||||
multi_data_source/mds_table_order_flusher.cpp
|
||||
multi_data_source/mds_table_base.cpp
|
||||
multi_data_source/mds_writer.cpp
|
||||
multi_data_source/mds_table_handler.cpp
|
||||
|
@ -0,0 +1,43 @@
|
||||
/**
|
||||
* Copyright (c) 2023 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include "mds_for_each_map_flush_operation.h"
|
||||
#include "mds_table_base.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
|
||||
bool FlushOp::operator()(const ObTabletID &, MdsTableBase *&mds_table)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (mds_table->is_switched_to_empty_shell()) {
|
||||
MDS_LOG_RET(INFO, ret, "skip empty shell tablet mds_table flush",
|
||||
KPC(mds_table), K(scan_mds_table_cnt_), K_(max_consequent_callbacked_scn));
|
||||
} else if (OB_FAIL(mds_table->flush(do_flush_limit_scn_, max_consequent_callbacked_scn_))) {
|
||||
MDS_LOG_RET(WARN, ret, "flush mds table failed",
|
||||
KR(ret), KPC(mds_table), K_(scan_mds_table_cnt), K_(max_consequent_callbacked_scn));
|
||||
if (OB_SIZE_OVERFLOW == ret) {
|
||||
is_dag_full_ = true;
|
||||
}
|
||||
} else {
|
||||
++scan_mds_table_cnt_;
|
||||
}
|
||||
return !is_dag_full_;// true means iterating the next mds table
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
/**
|
||||
* Copyright (c) 2023 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef STORAGE_MULTI_DATA_SOURCE_MDS_FOR_EACH_MAP_FLUSH_OPERATION_H
|
||||
#define STORAGE_MULTI_DATA_SOURCE_MDS_FOR_EACH_MAP_FLUSH_OPERATION_H
|
||||
#include "share/scn.h"
|
||||
#include "common/ob_tablet_id.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
class MdsTableBase;
|
||||
struct FlushOp {
|
||||
FlushOp(share::SCN do_flush_limit_scn, int64_t &scan_mds_table_cnt, share::SCN max_consequent_callbacked_scn)
|
||||
: do_flush_limit_scn_(do_flush_limit_scn),
|
||||
scan_mds_table_cnt_(scan_mds_table_cnt),
|
||||
max_consequent_callbacked_scn_(max_consequent_callbacked_scn),
|
||||
is_dag_full_(false) {}
|
||||
bool operator()(const ObTabletID &, MdsTableBase *&mds_table);
|
||||
bool dag_full() const { return is_dag_full_; }
|
||||
share::SCN do_flush_limit_scn_;
|
||||
int64_t &scan_mds_table_cnt_;
|
||||
share::SCN max_consequent_callbacked_scn_;
|
||||
bool is_dag_full_;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
@ -165,7 +165,7 @@ public:
|
||||
int64_t get_node_cnt() const;
|
||||
virtual share::SCN get_rec_scn();
|
||||
virtual int operate(const ObFunction<int(MdsTableBase &)> &operation) = 0;
|
||||
virtual int flush(share::SCN need_advanced_rec_scn_lower_limit) = 0;
|
||||
virtual int flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn) = 0;
|
||||
virtual ObTabletID get_tablet_id() const;
|
||||
virtual bool is_flushing() const;
|
||||
virtual int fill_virtual_info(ObIArray<MdsNodeInfoForVirtualTable> &mds_node_info_array) const = 0;
|
||||
|
@ -102,7 +102,7 @@ public:
|
||||
int for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump(DUMP_OP &&for_each_op,
|
||||
const int64_t mds_construct_sequence,
|
||||
const bool for_flush) const;
|
||||
int flush(share::SCN need_advanced_rec_scn_lower_limit);
|
||||
int flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn);
|
||||
int is_flushing(bool &is_flushing) const;
|
||||
void on_flush(const share::SCN &flush_scn, const int flush_ret);
|
||||
int try_recycle(const share::SCN &recycle_scn);// release nodes
|
||||
|
@ -653,13 +653,13 @@ int MdsTableHandle::for_each_unit_from_small_key_to_big_from_old_node_to_new_to_
|
||||
return ret;
|
||||
}
|
||||
|
||||
inline int MdsTableHandle::flush(share::SCN need_advanced_rec_scn_lower_limit)
|
||||
inline int MdsTableHandle::flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// return ret;// FIXME: for lixia test, will block CLOG recycle
|
||||
#ifndef TEST_MDS_TRANSACTION
|
||||
CHECK_MDS_TABLE_INIT();
|
||||
ret = p_mds_table_base_->flush(need_advanced_rec_scn_lower_limit);
|
||||
ret = p_mds_table_base_->flush(need_advanced_rec_scn_lower_limit, max_decided_scn);
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
@ -140,10 +140,8 @@ public:
|
||||
const int64_t mds_construct_sequence,
|
||||
const bool for_flush) const override;
|
||||
virtual int operate(const ObFunction<int(MdsTableBase &)> &operation) override;
|
||||
int calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN need_advanced_rec_scn_lower_limit,
|
||||
share::SCN &flush_scn,
|
||||
int64_t &need_dumped_nodes_cnt);
|
||||
virtual int flush(share::SCN need_advanced_rec_scn_lower_limit) override;
|
||||
int calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN &flush_scn, int64_t &need_dumped_nodes_cnt);
|
||||
virtual int flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn) override;
|
||||
void on_flush_(const share::SCN &flush_scn, const int flush_ret);
|
||||
virtual void on_flush(const share::SCN &flush_scn, const int flush_ret) override;
|
||||
virtual int try_recycle(const share::SCN recycle_scn) override;
|
||||
|
@ -921,60 +921,39 @@ struct CountUnDumpdedNodesBelowDoFlushScn// To filter unnecessary flush operatio
|
||||
const share::SCN &do_flush_scn_;
|
||||
};
|
||||
template <typename MdsTableType>
|
||||
int MdsTableImpl<MdsTableType>::calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN need_advanced_rec_scn_lower_limit,
|
||||
share::SCN &do_flush_scn,
|
||||
int64_t &need_dumped_nodes_cnt)
|
||||
int MdsTableImpl<MdsTableType>::calculate_flush_scn_and_need_dumped_nodes_cnt_(share::SCN &do_flush_scn, int64_t &need_dumped_nodes_cnt)
|
||||
{
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(need_advanced_rec_scn_lower_limit), K(do_flush_scn), \
|
||||
K(need_dumped_nodes_cnt)
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(do_flush_scn), K(need_dumped_nodes_cnt)
|
||||
MDS_TG(100_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
share::SCN calculated_flush_scn;
|
||||
share::SCN ls_max_consequent_callbacked_scn = share::SCN::max_scn();
|
||||
#ifndef UNITTEST_DEBUG
|
||||
if (MDS_FAIL(get_ls_max_consequent_callbacked_scn_(ls_max_consequent_callbacked_scn))) {
|
||||
MDS_LOG_FLUSH(WARN, "fail to get ls_max_consequent_callbacked_scn");
|
||||
} else if (ls_max_consequent_callbacked_scn.is_max()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_FLUSH(ERROR, "invalid ls max consequent callbacked scn");
|
||||
} else {
|
||||
do_flush_scn = ls_max_consequent_callbacked_scn;
|
||||
}
|
||||
#else
|
||||
do_flush_scn = need_advanced_rec_scn_lower_limit;
|
||||
#endif
|
||||
if (OB_SUCC(ret)) {
|
||||
RecalculateFlushScnCauseOnlySuppportDumpCommittedNodeOP op1(do_flush_scn);// recalculate flush scn
|
||||
CountUnDumpdedNodesBelowDoFlushScn op2(need_dumped_nodes_cnt, do_flush_scn);// count nodes need dump
|
||||
if (MDS_FAIL(for_each_scan_row(FowEachRowAction::CALCUALTE_FLUSH_SCN, op1))) {
|
||||
MDS_LOG_FLUSH(WARN, "for each to calculate flush scn failed");
|
||||
} else if (MDS_FAIL(for_each_scan_row(FowEachRowAction::COUNT_NODES_BEFLOW_FLUSH_SCN, op2))) {
|
||||
MDS_LOG_FLUSH(WARN, "for each to count undumped nodes failed");
|
||||
}
|
||||
}
|
||||
RecalculateFlushScnCauseOnlySuppportDumpCommittedNodeOP op1(do_flush_scn);// recalculate flush scn
|
||||
CountUnDumpdedNodesBelowDoFlushScn op2(need_dumped_nodes_cnt, do_flush_scn);// count nodes need dump
|
||||
if (MDS_FAIL(for_each_scan_row(FowEachRowAction::CALCUALTE_FLUSH_SCN, op1))) {
|
||||
MDS_LOG_FLUSH(WARN, "for each to calculate flush scn failed");
|
||||
} else if (MDS_FAIL(for_each_scan_row(FowEachRowAction::COUNT_NODES_BEFLOW_FLUSH_SCN, op2))) {
|
||||
MDS_LOG_FLUSH(WARN, "for each to count undumped nodes failed");
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename MdsTableType>
|
||||
int MdsTableImpl<MdsTableType>::flush(share::SCN need_advanced_rec_scn_lower_limit)
|
||||
int MdsTableImpl<MdsTableType>::flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn)
|
||||
{// if rec_scn below this limit, need generate dag and try hard to advance it
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(need_advanced_rec_scn_lower_limit), K(do_flush_scn), K(undump_node_cnt)
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(need_advanced_rec_scn_lower_limit), K(do_flush_scn), K(max_decided_scn), K(undump_node_cnt)
|
||||
MDS_TG(100_ms);
|
||||
int ret = OB_SUCCESS;
|
||||
share::SCN do_flush_scn;// this scn is defined for calculation
|
||||
share::SCN do_flush_scn = max_decided_scn;// this scn is defined for calculation
|
||||
int64_t undump_node_cnt = 0;
|
||||
MdsWLockGuard lg(lock_);
|
||||
if (!need_advanced_rec_scn_lower_limit.is_valid()) {
|
||||
if (!need_advanced_rec_scn_lower_limit.is_valid() || !max_decided_scn.is_valid() || max_decided_scn.is_max()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
MDS_LOG_FLUSH(WARN, "invalid recycle scn");
|
||||
} else if (get_rec_scn().is_max()) {
|
||||
MDS_LOG_FLUSH(TRACE, "no need do flush cause rec_scn is MAX already");
|
||||
} else if (need_advanced_rec_scn_lower_limit < get_rec_scn()) {// no need dump this mds table to advance rec_scn
|
||||
MDS_LOG_FLUSH(TRACE, "no need do flush need_advanced_rec_scn_lower_limit less than rec_scn");
|
||||
} else if (MDS_FAIL(calculate_flush_scn_and_need_dumped_nodes_cnt_(need_advanced_rec_scn_lower_limit,
|
||||
do_flush_scn,
|
||||
undump_node_cnt))) {
|
||||
} else if (MDS_FAIL(calculate_flush_scn_and_need_dumped_nodes_cnt_(do_flush_scn, undump_node_cnt))) {
|
||||
MDS_LOG_FLUSH(WARN, "fail to call calculate_flush_scn_and_need_dumped_nodes_cnt_");
|
||||
} else if (undump_node_cnt == 0) {// no need do flush actually
|
||||
// mds_ckpt_scn on tablet won't be advanced,
|
||||
@ -985,10 +964,12 @@ int MdsTableImpl<MdsTableType>::flush(share::SCN need_advanced_rec_scn_lower_lim
|
||||
} else {
|
||||
#ifndef UNITTEST_DEBUG
|
||||
if (MDS_FAIL(merge(construct_sequence_, do_flush_scn))) {
|
||||
if (OB_EAGAIN == ret || OB_SIZE_OVERFLOW == ret) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
if (REACH_TIME_INTERVAL(100_ms)) {
|
||||
MDS_LOG_FLUSH(WARN, "failed to commit merge mds table dag cause already exist or queue already full");
|
||||
MDS_LOG_FLUSH(WARN, "failed to commit merge mds table dag cause already exist");
|
||||
ret = OB_SUCCESS;
|
||||
} else if (OB_SIZE_OVERFLOW == ret) {// throw out
|
||||
MDS_LOG_FLUSH(WARN, "failed to commit merge mds table dag cause queue already full");
|
||||
}
|
||||
} else {
|
||||
MDS_LOG_FLUSH(WARN, "failed to commit merge mds table dag");
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include "lib/ob_errno.h"
|
||||
#include "mds_ctx.h"
|
||||
#include "ob_tablet_id.h"
|
||||
#include "storage/multi_data_source/mds_table_order_flusher.h"
|
||||
#define USING_LOG_PREFIX MDS
|
||||
|
||||
#include "mds_table_mgr.h"
|
||||
@ -135,62 +136,21 @@ void ObMdsTableMgr::unregister_from_removed_mds_table_recorder(MdsTableBase *p_m
|
||||
}
|
||||
}
|
||||
|
||||
int ObMdsTableMgr::first_scan_to_get_min_rec_scn_(share::SCN &min_rec_scn, ObIArray<ObTabletID> &min_rec_scn_ids)
|
||||
{
|
||||
MDS_TG(10_s);
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t scan_cnt = 0;
|
||||
auto get_min_rec_scn_op =
|
||||
[&min_rec_scn, &scan_cnt, &min_rec_scn_ids](const common::ObTabletID &, MdsTableBase *&mds_table) {
|
||||
if (!mds_table->is_switched_to_empty_shell()) {
|
||||
share::SCN rec_scn = mds_table->get_rec_scn();
|
||||
if (rec_scn == min_rec_scn) {
|
||||
if (min_rec_scn_ids.count() < 128) {
|
||||
(void) min_rec_scn_ids.push_back(mds_table->get_tablet_id());
|
||||
}
|
||||
} else if (rec_scn < min_rec_scn) {
|
||||
min_rec_scn = rec_scn;
|
||||
min_rec_scn_ids.reset();
|
||||
(void) min_rec_scn_ids.push_back(mds_table->get_tablet_id());
|
||||
}
|
||||
++scan_cnt;
|
||||
template <typename Flusher>
|
||||
struct OrderOp {
|
||||
OrderOp(Flusher &flusher) : flusher_(flusher) {}
|
||||
int operator()(MdsTableBase &mds_table) {
|
||||
if (!mds_table.is_switched_to_empty_shell()) {
|
||||
share::SCN rec_scn = mds_table.get_rec_scn();
|
||||
flusher_.record_mds_table({mds_table.get_tablet_id(), rec_scn});
|
||||
}
|
||||
return true;// true means iterating the next mds table
|
||||
};
|
||||
if (OB_FAIL(mds_table_map_.for_each(get_min_rec_scn_op))) {
|
||||
MDS_LOG(WARN, "fail to do map for_each", KR(ret), K(*this), K(scan_cnt), K(min_rec_scn), K(min_rec_scn_ids));
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMdsTableMgr::second_scan_to_do_flush_(share::SCN do_flush_limit_scn)
|
||||
{
|
||||
MDS_TG(10_s);
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t scan_mds_table_cnt = 0;
|
||||
auto flush_op = [do_flush_limit_scn, &scan_mds_table_cnt](const common::ObTabletID &tablet_id,
|
||||
MdsTableBase *&mds_table) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (mds_table->is_switched_to_empty_shell()) {
|
||||
MDS_LOG_RET(INFO, ret, "skip empty shell tablet mds_table flush", K(tablet_id), K(scan_mds_table_cnt));
|
||||
} else if (OB_TMP_FAIL(mds_table->flush(do_flush_limit_scn))) {
|
||||
MDS_LOG_RET(WARN, ret, "flush mds table failed", KR(tmp_ret), K(tablet_id), K(scan_mds_table_cnt));
|
||||
} else {
|
||||
++scan_mds_table_cnt;
|
||||
}
|
||||
return true;// true means iterating the next mds table
|
||||
};
|
||||
if (OB_FAIL(mds_table_map_.for_each(flush_op))) {
|
||||
MDS_LOG(WARN, "fail to do map for_each", KR(ret), K(*this), K(scan_mds_table_cnt), K(do_flush_limit_scn));
|
||||
} else {
|
||||
MDS_LOG(INFO, "success to do second scan to do flush", KR(ret), K(*this), K(scan_mds_table_cnt), K(do_flush_limit_scn));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
Flusher &flusher_;
|
||||
};
|
||||
int ObMdsTableMgr::flush(SCN recycle_scn, bool need_freeze)
|
||||
{
|
||||
#define PRINT_WRAPPER KR(ret), K(ls_->get_ls_id()), K(recycle_scn), K(need_freeze), K(min_rec_scn),\
|
||||
#define PRINT_WRAPPER KR(ret), K(ls_->get_ls_id()), K(recycle_scn), K(need_freeze), K(order_flusher_for_some),\
|
||||
K(max_consequent_callbacked_scn), K(*this)
|
||||
MDS_TG(10_s);
|
||||
int ret = OB_SUCCESS;
|
||||
@ -198,26 +158,25 @@ int ObMdsTableMgr::flush(SCN recycle_scn, bool need_freeze)
|
||||
freeze_guard.init(this);
|
||||
int64_t flushed_table_cnt = 0;
|
||||
MdsTableHandle *flushing_mds_table = nullptr;
|
||||
share::SCN min_rec_scn = share::SCN::max_scn();
|
||||
share::SCN max_consequent_callbacked_scn;
|
||||
ObSEArray<ObTabletID, 10> min_rec_scn_tablet_ids;
|
||||
FlusherForSome order_flusher_for_some;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
MDS_LOG_FREEZE(ERROR, "mds table mgr not inited");
|
||||
} else if (!freeze_guard.can_freeze()) {
|
||||
MDS_LOG_FREEZE(INFO, "mds table mgr is doing flush, skip flush once");
|
||||
} else if (MDS_FAIL(first_scan_to_get_min_rec_scn_(min_rec_scn, min_rec_scn_tablet_ids))) {
|
||||
MDS_LOG_FREEZE(WARN, "do first_scan_to_get_min_rec_scn_ failed");
|
||||
} else if (min_rec_scn == share::SCN::max_scn()) {// no mds table
|
||||
} else if (MDS_FAIL(for_each_in_t3m_mds_table(OrderOp<FlusherForSome>(order_flusher_for_some)))) {
|
||||
MDS_LOG_FREEZE(WARN, "do first scan failed");
|
||||
} else if (order_flusher_for_some.empty()) {// no mds table
|
||||
MDS_LOG_FREEZE(INFO, "no valid mds table there, no need do flush");
|
||||
} else if (MDS_FAIL(ls_->get_freezer()->get_max_consequent_callbacked_scn(max_consequent_callbacked_scn))) {
|
||||
} else if (MDS_FAIL(ls_->get_max_decided_scn(max_consequent_callbacked_scn))) {
|
||||
MDS_LOG_FREEZE(WARN, "fail to get max_consequent_callbacked_scn", KR(ret), K(*this));
|
||||
} else if (!max_consequent_callbacked_scn.is_valid() || max_consequent_callbacked_scn.is_max() || max_consequent_callbacked_scn.is_min()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
MDS_LOG_FREEZE(WARN, "invalid max_consequent_callbacked_scn", KR(ret), K(*this));
|
||||
} else {
|
||||
if (need_freeze) {
|
||||
if (need_freeze) {// need advance freezing scn
|
||||
if (recycle_scn.is_max() || !recycle_scn.is_valid()) {
|
||||
recycle_scn = max_consequent_callbacked_scn;
|
||||
}
|
||||
@ -230,12 +189,8 @@ int ObMdsTableMgr::flush(SCN recycle_scn, bool need_freeze)
|
||||
freezing_scn_ = max_consequent_callbacked_scn;
|
||||
MDS_LOG_FREEZE(INFO, "freezing_scn decline to max_consequent_callbacked_scn");
|
||||
}
|
||||
if (min_rec_scn <= freezing_scn_) {
|
||||
if (MDS_FAIL(second_scan_to_do_flush_(freezing_scn_))) {
|
||||
MDS_LOG_FREEZE(WARN, "fail to do flush");
|
||||
} else {
|
||||
MDS_LOG_FREEZE(INFO, "success to do flush");
|
||||
}
|
||||
if (order_flusher_for_some.min_key().rec_scn_ <= freezing_scn_) {
|
||||
order_flush_(order_flusher_for_some, freezing_scn_, max_consequent_callbacked_scn);
|
||||
} else {
|
||||
MDS_LOG_FREEZE(INFO, "no need do flush cause min_rec_scn is larger than freezing scn");
|
||||
}
|
||||
@ -244,6 +199,41 @@ int ObMdsTableMgr::flush(SCN recycle_scn, bool need_freeze)
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
void ObMdsTableMgr::order_flush_(FlusherForSome &order_flusher_for_some,
|
||||
share::SCN freezing_scn,
|
||||
share::SCN max_consequent_callbacked_scn)
|
||||
{
|
||||
#define PRINT_WRAPPER KR(ret), K(ls_->get_ls_id()), K(freezing_scn), K(order_flusher_for_some),\
|
||||
K(third_sacn_mds_table_cnt), K(max_consequent_callbacked_scn), K(order_flusher_for_all.count()),\
|
||||
K(*this)
|
||||
MDS_TG(10_s);
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t third_sacn_mds_table_cnt = 0;
|
||||
FlusherForAll order_flusher_for_all;
|
||||
FlushOp flush_op(freezing_scn, third_sacn_mds_table_cnt, max_consequent_callbacked_scn);
|
||||
if (!order_flusher_for_some.full() || order_flusher_for_some.max_key().rec_scn_ > freezing_scn_) {
|
||||
// than means all mds_tables needed be flushed is included in order_flusher_for_some
|
||||
order_flusher_for_some.flush_by_order(mds_table_map_, freezing_scn_, max_consequent_callbacked_scn);
|
||||
MDS_LOG_FREEZE(INFO, "flush all mds_tables(little number) by total order");
|
||||
} else {
|
||||
order_flusher_for_all.reserve_memory(mds_table_map_.count());
|
||||
if (MDS_FAIL(for_each_in_t3m_mds_table(OrderOp<FlusherForAll>(order_flusher_for_all)))) {// second scan
|
||||
MDS_LOG_FREEZE(WARN, "do scan failed");
|
||||
}
|
||||
order_flusher_for_all.flush_by_order(mds_table_map_, freezing_scn_, max_consequent_callbacked_scn);
|
||||
if (order_flusher_for_all.incomplete()) {// need do third scan
|
||||
if (MDS_FAIL(mds_table_map_.for_each(flush_op))) {// third scan
|
||||
MDS_LOG_FREEZE(WARN, "do scan failed");
|
||||
} else {
|
||||
MDS_LOG_FREEZE(INFO, "flush some mds_tables by order, and others out of order");
|
||||
}
|
||||
} else {
|
||||
MDS_LOG_FREEZE(INFO, "flush all mds_tables(large number) by total order");
|
||||
}
|
||||
}
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
SCN ObMdsTableMgr::get_freezing_scn() const { return freezing_scn_.atomic_get(); }
|
||||
|
||||
SCN ObMdsTableMgr::get_rec_scn()
|
||||
@ -252,35 +242,29 @@ SCN ObMdsTableMgr::get_rec_scn()
|
||||
return get_rec_scn(tablet_id);
|
||||
}
|
||||
|
||||
share::SCN ObMdsTableMgr::get_rec_scn(ObTabletID &tablet_id)
|
||||
SCN ObMdsTableMgr::get_rec_scn(ObTabletID &tablet_id)
|
||||
{
|
||||
#define PRINT_WRAPPER KR(ret), K(min_rec_scn), K(order_flusher_for_one), K(*this)
|
||||
MDS_TG(1_s);
|
||||
int ret = OB_SUCCESS;
|
||||
SCN min_rec_scn = share::SCN::max_scn();
|
||||
ObSEArray<ObTabletID, 10> min_rec_scn_tablet_ids;
|
||||
|
||||
FlusherForOne order_flusher_for_one;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
MDS_LOG(ERROR, "regsiter mds table failed", KR(ret));
|
||||
} else if (MDS_FAIL(first_scan_to_get_min_rec_scn_(min_rec_scn, min_rec_scn_tablet_ids))) {
|
||||
min_rec_scn = SCN::min_scn();
|
||||
MDS_LOG(WARN, "fail to scan get min_rec_scn", KR(ret), K(min_rec_scn), K(min_rec_scn_tablet_ids), K(*this));
|
||||
} else if (MDS_FAIL(for_each_in_t3m_mds_table(OrderOp<FlusherForOne>(order_flusher_for_one)))) {
|
||||
min_rec_scn.set_min();
|
||||
MDS_LOG_FREEZE(WARN, "do first scan failed");
|
||||
} else if (order_flusher_for_one.empty()) {// no mds table
|
||||
MDS_LOG_FREEZE(INFO, "no valid mds table there, return MAX SCN");
|
||||
} else {
|
||||
if (!min_rec_scn_tablet_ids.empty()) {
|
||||
tablet_id = min_rec_scn_tablet_ids.at(0);
|
||||
}
|
||||
MDS_LOG(INFO, "get rec_scn from MdsTableMgr", KR(ret), K(min_rec_scn), K(min_rec_scn_tablet_ids), K(*this));
|
||||
FlushKey key = order_flusher_for_one.min_key();
|
||||
min_rec_scn = key.rec_scn_;
|
||||
tablet_id = key.tablet_id_;
|
||||
MDS_LOG_FREEZE(INFO, "get rec_scn from MdsTableMgr");
|
||||
}
|
||||
return min_rec_scn;
|
||||
}
|
||||
|
||||
DEF_TO_STRING(ObMdsTableMgr)
|
||||
{
|
||||
int64_t pos = 0;
|
||||
J_OBJ_START();
|
||||
J_KV(KP(this), K_(is_inited), K_(freezing_scn), KPC_(ls));
|
||||
J_OBJ_END();
|
||||
return pos;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
int MdsTableFreezeGuard::init(ObMdsTableMgr *mds_mgr)
|
||||
@ -341,6 +325,15 @@ int MdsTableMgrHandle::set_mds_table_mgr(ObMdsTableMgr *mds_table_mgr)
|
||||
return ret;
|
||||
}
|
||||
|
||||
DEF_TO_STRING(ObMdsTableMgr)
|
||||
{
|
||||
int64_t pos = 0;
|
||||
J_OBJ_START();
|
||||
J_KV(KP(this), K_(is_inited), K_(freezing_scn), KPC_(ls));
|
||||
J_OBJ_END();
|
||||
return pos;
|
||||
}
|
||||
|
||||
} // namespace mds
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
@ -20,6 +20,7 @@
|
||||
#include "storage/checkpoint/ob_common_checkpoint.h"
|
||||
#include "storage/multi_data_source/runtime_utility/list_helper.h"
|
||||
#include "lib/hash/ob_linear_hash_map.h"
|
||||
#include "mds_table_order_flusher.h"
|
||||
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
@ -51,7 +52,6 @@ private:
|
||||
|
||||
class ObMdsTableMgr final : public checkpoint::ObCommonCheckpoint
|
||||
{
|
||||
using MdsTableMap = common::ObLinearHashMap<common::ObTabletID, MdsTableBase*>;
|
||||
friend MdsTableFreezeGuard;
|
||||
public:
|
||||
ObMdsTableMgr()
|
||||
@ -63,6 +63,7 @@ public:
|
||||
mds_table_map_(),
|
||||
removed_mds_table_recorder_() {}
|
||||
~ObMdsTableMgr() { destroy(); }
|
||||
DECLARE_TO_STRING;
|
||||
|
||||
int init(ObLS *ls);
|
||||
int reset();
|
||||
@ -84,19 +85,25 @@ public:
|
||||
removed_mds_table_recorder_.for_each(op_wrapper);
|
||||
return ret;
|
||||
}
|
||||
template <typename OP, ENABLE_IF_LIKE_FUNCTION(OP, int(MdsTableBase &))>// if op return FAIL, break for-each
|
||||
int for_each_in_t3m_mds_table(OP &&op) {
|
||||
auto op_wrapper = [&op](const common::ObTabletID &k, MdsTableBase* &v) -> bool {
|
||||
template <typename OP>
|
||||
struct OpWrapper {
|
||||
template <typename T>
|
||||
OpWrapper(T &&op) : op_(op) {}
|
||||
bool operator()(const common::ObTabletID &k, MdsTableBase* &v) {
|
||||
bool keep_iterating = true;// means keep iterating next mds_table
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(op(*v))) {
|
||||
if (OB_FAIL(op_(*v))) {
|
||||
keep_iterating = false;
|
||||
}
|
||||
return keep_iterating;
|
||||
};
|
||||
return mds_table_map_.for_each(op_wrapper);
|
||||
}
|
||||
OP op_;
|
||||
};
|
||||
template <typename OP, ENABLE_IF_LIKE_FUNCTION(OP, int(MdsTableBase &))>// if op return FAIL, break for-each
|
||||
int for_each_in_t3m_mds_table(OP &&op) {
|
||||
OpWrapper<OP> wrapper(std::forward<OP>(op));
|
||||
return mds_table_map_.for_each(wrapper);
|
||||
}
|
||||
DECLARE_TO_STRING;
|
||||
|
||||
public: // derived from ObCommonCheckpoint
|
||||
share::SCN get_freezing_scn() const;
|
||||
@ -112,9 +119,9 @@ public: // getter and setter
|
||||
int64_t get_ref() { return ATOMIC_LOAD(&ref_cnt_); }
|
||||
|
||||
private:
|
||||
int first_scan_to_get_min_rec_scn_(share::SCN &min_rec_scn, ObIArray<ObTabletID> &min_rec_scn_ids);
|
||||
int second_scan_to_do_flush_(share::SCN min_rec_scn);
|
||||
|
||||
void order_flush_(FlusherForSome &order_flusher_for_some,
|
||||
share::SCN freezing_scn,
|
||||
share::SCN max_consequent_callbacked_scn);
|
||||
private:
|
||||
bool is_inited_;
|
||||
bool is_freezing_;
|
||||
|
31
src/storage/multi_data_source/mds_table_order_flusher.cpp
Normal file
31
src/storage/multi_data_source/mds_table_order_flusher.cpp
Normal file
@ -0,0 +1,31 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include "mds_table_order_flusher.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
|
||||
// define in cpp file is for rewrire this function in unittest
|
||||
void *MdsFlusherModulePageAllocator::alloc(const int64_t size, const ObMemAttr &attr) {
|
||||
void *ret = nullptr;
|
||||
ret = (NULL == allocator_) ? share::mtl_malloc(size, attr) : allocator_->alloc(size, attr);
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
135
src/storage/multi_data_source/mds_table_order_flusher.h
Normal file
135
src/storage/multi_data_source/mds_table_order_flusher.h
Normal file
@ -0,0 +1,135 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H
|
||||
#define SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H
|
||||
|
||||
#include "ob_tablet_id.h"
|
||||
#include "share/scn.h"
|
||||
#include "runtime_utility/common_define.h"
|
||||
#include "deps/oblib/src/lib/allocator/page_arena.h"
|
||||
#include "lib/ob_errno.h"
|
||||
#include "lib/utility/ob_print_utils.h"
|
||||
#include "meta_programming/ob_type_traits.h"
|
||||
#include "lib/hash/ob_linear_hash_map.h"
|
||||
#include "deps/oblib/src/lib/container/ob_array_iterator.h"
|
||||
#include "mds_for_each_map_flush_operation.h"
|
||||
#include <algorithm>
|
||||
#include <exception>
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
|
||||
struct MdsFlusherModulePageAllocator : public ModulePageAllocator {
|
||||
// just forward to parent
|
||||
MdsFlusherModulePageAllocator(const lib::ObLabel &label = "MdsFlusherArray",
|
||||
int64_t tenant_id = OB_SERVER_TENANT_ID,
|
||||
int64_t ctx_id = 0)
|
||||
: ModulePageAllocator(ObMemAttr(tenant_id, label, ctx_id)) {}
|
||||
MdsFlusherModulePageAllocator(const lib::ObMemAttr &attr) : ModulePageAllocator("MdsFlusherArray") {}
|
||||
explicit MdsFlusherModulePageAllocator(ObIAllocator &allocator,
|
||||
const lib::ObLabel &label = "MdsFlusherArray")
|
||||
: ModulePageAllocator(allocator, label) {}
|
||||
virtual ~MdsFlusherModulePageAllocator() {}
|
||||
// just change ob_malloc to ob_tenant_malloc
|
||||
void *alloc(const int64_t size) { return ModulePageAllocator::alloc(size); }
|
||||
void *alloc(const int64_t size, const ObMemAttr &attr);
|
||||
void free(void *p) {
|
||||
(NULL == allocator_) ? share::mtl_free(p) : allocator_->free(p); p = NULL;
|
||||
}
|
||||
};
|
||||
|
||||
struct FlushKey// 16 bytes
|
||||
{
|
||||
FlushKey()
|
||||
: tablet_id_(),
|
||||
rec_scn_() {}
|
||||
FlushKey(common::ObTabletID tablet_id, share::SCN rec_scn)
|
||||
: tablet_id_(tablet_id), rec_scn_(rec_scn) {}
|
||||
bool operator<(const FlushKey &rhs) const { return rec_scn_ < rhs.rec_scn_; }
|
||||
bool operator==(const FlushKey &rhs) const { return rec_scn_ == rhs.rec_scn_; }
|
||||
bool is_valid() const { return tablet_id_.is_valid() && rec_scn_.is_valid(); }
|
||||
TO_STRING_KV(K_(tablet_id), K_(rec_scn));
|
||||
common::ObTabletID tablet_id_;
|
||||
share::SCN rec_scn_;
|
||||
};
|
||||
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
struct MdsTableHighPriorityFlusher {
|
||||
MdsTableHighPriorityFlusher() : size_(0) {}
|
||||
bool try_record_mds_table(FlushKey new_key, FlushKey &eliminated_key);
|
||||
void flush_by_order(MdsTableMap &map, share::SCN limit, share::SCN max_decided_scn);
|
||||
int64_t to_string(char *buf, const int64_t len) const;
|
||||
int64_t size_;
|
||||
FlushKey high_priority_mds_tables_[STACK_QUEUE_SIZE];
|
||||
};
|
||||
|
||||
template <int64_t STACK_QUEUE_SIZE, bool ORDER_ALL>
|
||||
struct MdsTableOrderFlusher;
|
||||
|
||||
// 1. if memory is enough, flush mds table by strict ASC rec_scn order.
|
||||
// 2. if memory is not enough, flush at least STACK_QUEUE_SIZE smallest mds table by strict ASC rec_scn order.
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
struct MdsTableOrderFlusher<STACK_QUEUE_SIZE, true>
|
||||
{
|
||||
MdsTableOrderFlusher()
|
||||
: array_err_(OB_SUCCESS), high_priority_flusher_(), extra_mds_tables_() {}
|
||||
void reserve_memory(int64_t mds_table_total_size_likely);
|
||||
void record_mds_table(FlushKey key);
|
||||
void flush_by_order(MdsTableMap &map, share::SCN limit, share::SCN max_decided_scn);
|
||||
int64_t count() const { return high_priority_flusher_.size_ + extra_mds_tables_.count(); }
|
||||
bool incomplete() const { return array_err_ != OB_SUCCESS; }
|
||||
TO_STRING_KV(K_(array_err), K_(high_priority_flusher));
|
||||
int array_err_;
|
||||
MdsTableHighPriorityFlusher<STACK_QUEUE_SIZE> high_priority_flusher_;
|
||||
ObArray<FlushKey, MdsFlusherModulePageAllocator> extra_mds_tables_;
|
||||
};
|
||||
|
||||
// just record stack number items, and won't failed
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
struct MdsTableOrderFlusher<STACK_QUEUE_SIZE, false>
|
||||
{
|
||||
MdsTableOrderFlusher()
|
||||
: high_priority_flusher_() {}
|
||||
void record_mds_table(FlushKey key);
|
||||
void flush_by_order(MdsTableMap &map, share::SCN limit, share::SCN max_decided_scn);
|
||||
bool empty() const { return high_priority_flusher_.size_ == 0; }
|
||||
bool full() const { return high_priority_flusher_.size_ == STACK_QUEUE_SIZE; }
|
||||
FlushKey min_key() const { return high_priority_flusher_.high_priority_mds_tables_[0]; }
|
||||
FlushKey max_key() const { return high_priority_flusher_.high_priority_mds_tables_[high_priority_flusher_.size_ == 0 ?
|
||||
0 :
|
||||
high_priority_flusher_.size_ - 1]; }
|
||||
TO_STRING_KV(K_(high_priority_flusher));
|
||||
MdsTableHighPriorityFlusher<STACK_QUEUE_SIZE> high_priority_flusher_;
|
||||
};
|
||||
|
||||
static constexpr int64_t FLUSH_FOR_ONE_SIZE = 1;
|
||||
static constexpr int64_t FLUSH_FOR_SOME_SIZE = 32;
|
||||
static constexpr int64_t FLUSH_FOR_ALL_SIZE = 128;
|
||||
using FlusherForOne = MdsTableOrderFlusher<FLUSH_FOR_ONE_SIZE, false>;
|
||||
using FlusherForSome = MdsTableOrderFlusher<FLUSH_FOR_SOME_SIZE, false>;
|
||||
using FlusherForAll = MdsTableOrderFlusher<FLUSH_FOR_ALL_SIZE, true>;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H_IPP
|
||||
#define SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H_IPP
|
||||
#include "mds_table_order_flusher.ipp"
|
||||
#endif
|
||||
|
||||
#endif
|
168
src/storage/multi_data_source/mds_table_order_flusher.ipp
Normal file
168
src/storage/multi_data_source/mds_table_order_flusher.ipp
Normal file
@ -0,0 +1,168 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_IPP
|
||||
#define SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_IPP
|
||||
|
||||
#include "lib/ob_errno.h"
|
||||
#ifndef SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H_IPP
|
||||
#define SHARE_STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_HIGH_PRIORITY_FLUSHER_H_IPP
|
||||
#include "mds_table_order_flusher.h"
|
||||
#endif
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
|
||||
// this is optimized for get_rec_scn() operation
|
||||
template <>
|
||||
inline bool MdsTableHighPriorityFlusher<FLUSH_FOR_ONE_SIZE>::try_record_mds_table(FlushKey new_key, FlushKey &eliminated_key) {
|
||||
if (OB_UNLIKELY(size_ == 0)) {
|
||||
size_ = 1;
|
||||
high_priority_mds_tables_[0] = new_key;
|
||||
} else {
|
||||
if (new_key < high_priority_mds_tables_[0]) {
|
||||
high_priority_mds_tables_[0] = new_key;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
inline bool MdsTableHighPriorityFlusher<STACK_QUEUE_SIZE>::try_record_mds_table(FlushKey new_key, FlushKey &eliminated_key) {
|
||||
bool need_insert = false;
|
||||
if (size_ < STACK_QUEUE_SIZE) {
|
||||
need_insert = true;
|
||||
} else if (new_key < high_priority_mds_tables_[STACK_QUEUE_SIZE - 1]) {
|
||||
need_insert = true;
|
||||
}
|
||||
if (need_insert) {
|
||||
FlushKey *begin = &high_priority_mds_tables_[0];
|
||||
FlushKey *end = &high_priority_mds_tables_[size_ >= STACK_QUEUE_SIZE ? STACK_QUEUE_SIZE : size_];
|
||||
FlushKey *iter = std::upper_bound(begin, end, new_key);
|
||||
if (end == &high_priority_mds_tables_[STACK_QUEUE_SIZE]) {
|
||||
eliminated_key = high_priority_mds_tables_[STACK_QUEUE_SIZE - 1];
|
||||
MDS_ASSERT(iter < end);
|
||||
memmove(iter + 1, iter, (end - iter - 1) * sizeof(FlushKey));
|
||||
} else {// maybe end == iter
|
||||
memmove(iter + 1, iter, (end - iter) * sizeof(FlushKey));
|
||||
++size_;
|
||||
}
|
||||
*iter = new_key;
|
||||
}
|
||||
return need_insert;
|
||||
}
|
||||
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
inline void MdsTableHighPriorityFlusher<STACK_QUEUE_SIZE>::flush_by_order(MdsTableMap &map, share::SCN limit, share::SCN max_decided_scn) {
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t flush_count = 0;
|
||||
FlushOp op(limit, flush_count, max_decided_scn);
|
||||
for (int64_t i = 0; i < size_ && !op.dag_full(); ++i) {// ignore ERROR
|
||||
if (OB_FAIL(map.operate(high_priority_mds_tables_[i].tablet_id_, op))) {
|
||||
MDS_LOG(WARN, "fail to operate mds table", K(i), K(high_priority_mds_tables_[i]), KR(ret), K(limit), K(max_decided_scn));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
int64_t MdsTableHighPriorityFlusher<STACK_QUEUE_SIZE>::to_string(char *buf, const int64_t len) const {
|
||||
int64_t pos = 0;
|
||||
constexpr int64_t MAX_PRINT_NUMBER = 32L;
|
||||
databuff_printf(buf, len, pos, "size:%ld, ordered_tablets:{", size_);
|
||||
int64_t print_numnber = std::min(size_, MAX_PRINT_NUMBER);
|
||||
for (int64_t i = 0; i < print_numnber; ++i) {
|
||||
databuff_printf(buf, len, pos, "%s", to_cstring(high_priority_mds_tables_[i]));
|
||||
if (i != print_numnber) {
|
||||
databuff_printf(buf, len, pos, ", ");
|
||||
} else {
|
||||
if (print_numnber < size_) {
|
||||
databuff_printf(buf, len, pos, "%ld more...}", size_ - print_numnber);
|
||||
} else {
|
||||
databuff_printf(buf, len, pos, "}");
|
||||
}
|
||||
}
|
||||
}
|
||||
return pos;
|
||||
}
|
||||
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
void MdsTableOrderFlusher<STACK_QUEUE_SIZE, true>::reserve_memory(int64_t mds_table_total_size_likely) {// it'ok if failed
|
||||
int ret = OB_SUCCESS;
|
||||
constexpr int64_t max_tablet_number = 1_MB;//sizeof(100w FlushKey) = 16MB
|
||||
int64_t reserve_size = std::min(mds_table_total_size_likely * 2, max_tablet_number);
|
||||
if (OB_FAIL(extra_mds_tables_.reserve(reserve_size))) {
|
||||
MDS_LOG(WARN, "fail to reserve memory", KR(ret));
|
||||
array_err_ = ret;
|
||||
}
|
||||
}
|
||||
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
void MdsTableOrderFlusher<STACK_QUEUE_SIZE, true>::record_mds_table(FlushKey key) {// it's ok if failed
|
||||
int ret = OB_SUCCESS;
|
||||
FlushKey eliminated_key;
|
||||
if (!high_priority_flusher_.try_record_mds_table(key, eliminated_key)) {
|
||||
eliminated_key = key;
|
||||
}
|
||||
if (eliminated_key.is_valid() && !array_err_) {
|
||||
if (OB_FAIL(extra_mds_tables_.push_back(eliminated_key))) {
|
||||
MDS_LOG(WARN, "fail to push_back", KR(ret), K(eliminated_key));
|
||||
array_err_ = ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
void MdsTableOrderFlusher<STACK_QUEUE_SIZE, false>::record_mds_table(FlushKey key) {// won't be failed
|
||||
FlushKey _;
|
||||
high_priority_flusher_.try_record_mds_table(key, _);
|
||||
}
|
||||
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
void MdsTableOrderFlusher<STACK_QUEUE_SIZE, true>::flush_by_order(MdsTableMap &map,
|
||||
share::SCN limit,
|
||||
share::SCN max_decided_scn) {
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t flush_count = 0;
|
||||
FlushOp op(limit, flush_count, max_decided_scn);
|
||||
high_priority_flusher_.flush_by_order(map, limit, max_decided_scn);
|
||||
if (!array_err_) {// if array has error, just order flush STACK_QUEUE_SIZE mds tables
|
||||
try {
|
||||
std::sort(extra_mds_tables_.begin(), extra_mds_tables_.end());
|
||||
} catch (std::exception e) {
|
||||
MDS_LOG(WARN, "std::sort failed", K(e.what()));
|
||||
array_err_ = OB_ERR_UNEXPECTED;
|
||||
}
|
||||
if (!array_err_) {
|
||||
for (int64_t idx = 0; idx < extra_mds_tables_.count() && !op.dag_full(); ++idx) {
|
||||
if (OB_FAIL(map.operate(extra_mds_tables_[idx].tablet_id_, op))) {
|
||||
MDS_LOG(WARN, "fail to operate mds table", K(idx), K(extra_mds_tables_[idx]), KR(ret), K(limit), K(max_decided_scn));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <int64_t STACK_QUEUE_SIZE>
|
||||
void MdsTableOrderFlusher<STACK_QUEUE_SIZE, false>::flush_by_order(MdsTableMap &map,
|
||||
share::SCN limit,
|
||||
share::SCN max_decided_scn) {
|
||||
high_priority_flusher_.flush_by_order(map, limit, max_decided_scn);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
@ -42,6 +42,8 @@ namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
class MdsTableBase;
|
||||
using MdsTableMap = common::ObLinearHashMap<common::ObTabletID, MdsTableBase*>;
|
||||
|
||||
enum class NodePosition {
|
||||
MDS_TABLE,
|
||||
|
@ -5495,7 +5495,7 @@ int ObTablet::get_mds_table_rec_log_scn(SCN &rec_scn)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTablet::mds_table_flush(const share::SCN &recycle_scn)
|
||||
int ObTablet::mds_table_flush(const share::SCN &decided_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
mds::MdsTableHandle mds_table;
|
||||
@ -5507,7 +5507,7 @@ int ObTablet::mds_table_flush(const share::SCN &recycle_scn)
|
||||
LOG_WARN("inner tablet does not have mds table", K(ret));
|
||||
} else if (OB_FAIL(inner_get_mds_table(mds_table))) {
|
||||
LOG_WARN("failed to get mds table", K(ret));
|
||||
} else if (OB_FAIL(mds_table.flush(recycle_scn))) {
|
||||
} else if (OB_FAIL(mds_table.flush(decided_scn, decided_scn))) {
|
||||
LOG_WARN("failed to flush mds table", KR(ret), KPC(this));
|
||||
}
|
||||
return ret;
|
||||
|
@ -176,7 +176,7 @@ public:
|
||||
inline common::ObRowStoreType get_last_major_latest_row_store_type() const { return table_store_cache_.last_major_latest_row_store_type_; }
|
||||
inline bool is_row_store() const { return table_store_cache_.is_row_store_; }
|
||||
int get_mds_table_rec_log_scn(share::SCN &rec_scn);
|
||||
int mds_table_flush(const share::SCN &recycle_scn);
|
||||
int mds_table_flush(const share::SCN &decided_scn);
|
||||
|
||||
public:
|
||||
// first time create tablet
|
||||
|
@ -53,6 +53,7 @@ storage_unittest(test_mds_node multi_data_source/test_mds_node.cpp)
|
||||
# storage_unittest(test_mds_unit multi_data_source/test_mds_unit.cpp)
|
||||
storage_unittest(test_mds_table multi_data_source/test_mds_table.cpp)
|
||||
storage_unittest(test_mds_table_flush multi_data_source/test_mds_table_flush.cpp)
|
||||
storage_unittest(test_mds_table_flusher multi_data_source/test_mds_table_flusher.cpp)
|
||||
storage_unittest(test_mds_dump_kv multi_data_source/test_mds_dump_kv.cpp)
|
||||
#storage_unittest(test_multiple_merge)
|
||||
#storage_unittest(test_memtable_multi_version_row_iterator memtable/test_memtable_multi_version_row_iterator.cpp)
|
||||
|
@ -376,7 +376,7 @@ void TestMdsTable::test_flush() {
|
||||
ctx2.on_redo(mock_scn(200));
|
||||
|
||||
int idx = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(300)));// 1. 以300为版本号进行flush动作
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(300), mock_scn(500)));// 1. 以300为版本号进行flush动作
|
||||
ASSERT_EQ(mock_scn(199), mds_table_.p_mds_table_base_->flushing_scn_);// 2. 实际上以199为版本号进行flush动作
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump(
|
||||
[&idx](const MdsDumpKV &kv) -> int {// 2. 转储时扫描mds table
|
||||
@ -432,7 +432,7 @@ void TestMdsTable::test_multi_key_remove() {
|
||||
return OB_SUCCESS;
|
||||
}, is_committed);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(200)));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(200), mock_scn(500)));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.try_recycle(mock_scn(200)));
|
||||
ret = mds_table_.get_latest<ExampleUserKey, ExampleUserData1>(ExampleUserKey(2), [](const ExampleUserData1 &data){
|
||||
return OB_SUCCESS;
|
||||
@ -512,16 +512,16 @@ TEST_F(TestMdsTable, test_recycle) {
|
||||
int64_t valid_cnt = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.get_node_cnt(valid_cnt));
|
||||
ASSERT_EQ(1, valid_cnt);// 此时还有一个19001版本的已提交数据,因为rec_scn没有推上去
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(20000)));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.flush(mock_scn(20000), mock_scn(40000)));
|
||||
mds_table_.for_each_unit_from_small_key_to_big_from_old_node_to_new_to_dump([](const MdsDumpKV &){
|
||||
return OB_SUCCESS;
|
||||
}, 0, true);
|
||||
mds_table_.on_flush(mock_scn(20000), OB_SUCCESS);
|
||||
mds_table_.on_flush(mock_scn(40000), OB_SUCCESS);
|
||||
share::SCN rec_scn;
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.get_rec_scn(rec_scn));
|
||||
MDS_LOG(INFO, "print rec scn", K(rec_scn));
|
||||
ASSERT_EQ(share::SCN::max_scn(), rec_scn);
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.try_recycle(mock_scn(20000)));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.try_recycle(mock_scn(40000)));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table_.get_node_cnt(valid_cnt));
|
||||
ASSERT_EQ(0, valid_cnt);// 此时还有一个19001版本的已提交数据,因为rec_scn没有推上去
|
||||
}
|
||||
@ -541,17 +541,17 @@ TEST_F(TestMdsTable, test_recalculate_flush_scn_op) {
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.set(ExampleUserData1(3), ctx3));
|
||||
ctx3.on_redo(mock_scn(9));
|
||||
ctx3.on_commit(mock_scn(11), mock_scn(11));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(4)));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(4), mock_scn(4)));
|
||||
ASSERT_EQ(mock_scn(4), mds_table.p_mds_table_base_->flushing_scn_);
|
||||
mds_table.on_flush(mock_scn(4), OB_SUCCESS);
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(5)));// no need do flush, directly advance rec_scn
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(5), mock_scn(5)));// no need do flush, directly advance rec_scn
|
||||
ASSERT_EQ(false, mds_table.p_mds_table_base_->flushing_scn_.is_valid());
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(6)));// no need do flush, directly advance rec_scn
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(6), mock_scn(6)));// no need do flush, directly advance rec_scn
|
||||
ASSERT_EQ(false, mds_table.p_mds_table_base_->flushing_scn_.is_valid());
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(7)));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(7), mock_scn(7)));
|
||||
ASSERT_EQ(mock_scn(7), mds_table.p_mds_table_base_->flushing_scn_);
|
||||
mds_table.on_flush(mock_scn(7), OB_SUCCESS);
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(8)));
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.flush(mock_scn(8), mock_scn(8)));
|
||||
ASSERT_EQ(false, mds_table.p_mds_table_base_->flushing_scn_.is_valid());
|
||||
ASSERT_EQ(mock_scn(9), mds_table.p_mds_table_base_->rec_scn_);
|
||||
}
|
||||
|
@ -9,12 +9,13 @@
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include "lib/utility/utility.h"
|
||||
#include "storage/multi_data_source/compile_utility/mds_dummy_key.h"
|
||||
#define UNITTEST_DEBUG
|
||||
static bool MDS_FLUSHER_ALLOW_ALLOC = true;
|
||||
#include <gtest/gtest.h>
|
||||
#define private public
|
||||
#define protected public
|
||||
#include "lib/utility/utility.h"
|
||||
#include "storage/multi_data_source/compile_utility/mds_dummy_key.h"
|
||||
#include "share/ob_ls_id.h"
|
||||
#include "storage/multi_data_source/mds_writer.h"
|
||||
#include <thread>
|
||||
@ -37,12 +38,15 @@
|
||||
#include <numeric>
|
||||
#include "storage/multi_data_source/runtime_utility/mds_lock.h"
|
||||
#include "storage/tablet/ob_tablet_meta.h"
|
||||
#include "storage/multi_data_source/mds_table_mgr.h"
|
||||
#include "storage/ls/ob_ls.h"
|
||||
#include "storage/multi_data_source/mds_table_handle.h"
|
||||
#include "storage/multi_data_source/mds_table_order_flusher.h"
|
||||
namespace oceanbase {
|
||||
namespace storage
|
||||
{
|
||||
|
||||
share::SCN MOCK_MAX_CONSEQUENT_CALLBACKED_SCN;
|
||||
share::SCN MOCK_FLUSHING_SCN;
|
||||
|
||||
namespace mds
|
||||
{
|
||||
@ -67,7 +71,6 @@ int MdsTableBase::get_ls_max_consequent_callbacked_scn_(share::SCN &max_conseque
|
||||
|
||||
int MdsTableBase::merge(const int64_t construct_sequence, const share::SCN &flushing_scn)
|
||||
{
|
||||
MOCK_FLUSHING_SCN = flushing_scn;
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
|
||||
@ -144,7 +147,6 @@ int construct_tested_mds_table(MdsTableHandle &handle) {
|
||||
}
|
||||
|
||||
TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(125);// 只转储一个node
|
||||
MdsTableHandle handle;
|
||||
ASSERT_EQ(OB_SUCCESS, construct_tested_mds_table(handle));
|
||||
share::SCN rec_scn;
|
||||
@ -152,7 +154,7 @@ TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
ASSERT_EQ(mock_scn(50), rec_scn);// 没转储的时候是最小的node的redo scn值
|
||||
|
||||
// 第一次转储
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000)));// 因为max_decided_scn较小,所以会用125做flush
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(125)));// 因为max_decided_scn较小,所以会用125做flush
|
||||
bool is_flusing = false;
|
||||
ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing));// 在flush流程中
|
||||
ASSERT_EQ(true, is_flusing);
|
||||
@ -163,14 +165,13 @@ TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
return OB_SUCCESS;
|
||||
}, 0, true));
|
||||
ASSERT_EQ(1, scan_cnt);
|
||||
handle.on_flush(MOCK_FLUSHING_SCN, OB_SUCCESS);
|
||||
handle.on_flush(mock_scn(125), OB_SUCCESS);
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
OCCAM_LOG(INFO, "print rec scn", K(rec_scn));
|
||||
ASSERT_EQ(mock_scn(200), rec_scn);
|
||||
|
||||
// 第二次转储
|
||||
MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(140);// 对这个MdsTable没有影响
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000)));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(140)));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing));
|
||||
ASSERT_EQ(false, is_flusing);// 没转储
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
@ -178,8 +179,7 @@ TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
ASSERT_EQ(mock_scn(200), rec_scn);// 没变化
|
||||
|
||||
// 第三次转储
|
||||
MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(275);// 多转一个node
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000)));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(275)));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing));// 在flush流程中
|
||||
ASSERT_EQ(true, is_flusing);
|
||||
ASSERT_EQ(mock_scn(249), handle.p_mds_table_base_->flushing_scn_);
|
||||
@ -189,14 +189,13 @@ TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
return OB_SUCCESS;
|
||||
}, 0, true));
|
||||
ASSERT_EQ(1, scan_cnt);
|
||||
handle.on_flush(MOCK_FLUSHING_SCN, OB_SUCCESS);
|
||||
handle.on_flush(mock_scn(249), OB_SUCCESS);
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
OCCAM_LOG(INFO, "print rec scn", K(rec_scn));
|
||||
ASSERT_EQ(mock_scn(250), rec_scn);
|
||||
|
||||
// 第四次转储
|
||||
MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(550);// 都转下去
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000)));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(550)));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing));// 在flush流程中
|
||||
ASSERT_EQ(true, is_flusing);
|
||||
ASSERT_EQ(mock_scn(499), handle.p_mds_table_base_->flushing_scn_);
|
||||
@ -206,14 +205,13 @@ TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
return OB_SUCCESS;
|
||||
}, 0, true));
|
||||
ASSERT_EQ(2, scan_cnt);
|
||||
handle.on_flush(MOCK_FLUSHING_SCN, OB_SUCCESS);
|
||||
handle.on_flush(mock_scn(499), OB_SUCCESS);
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
OCCAM_LOG(INFO, "print rec scn", K(rec_scn));
|
||||
ASSERT_EQ(mock_scn(500), rec_scn);
|
||||
|
||||
// 第五次转储
|
||||
MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(600);// 对这个MdsTable没有影响
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000)));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(600)));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing));
|
||||
ASSERT_EQ(false, is_flusing);// 没转储
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
@ -221,8 +219,7 @@ TEST_F(TestMdsTableFlush, normal_flush) {
|
||||
ASSERT_EQ(mock_scn(500), rec_scn);// 没变化
|
||||
|
||||
// 第六次转储
|
||||
MOCK_MAX_CONSEQUENT_CALLBACKED_SCN = mock_scn(590);// 直接被过滤掉了
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000)));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.flush(mock_scn(1000), mock_scn(590)));
|
||||
ASSERT_EQ(OB_SUCCESS, handle.is_flushing(is_flusing));
|
||||
ASSERT_EQ(false, is_flusing);// 没转储
|
||||
ASSERT_EQ(OB_SUCCESS, handle.get_rec_scn(rec_scn));
|
||||
|
302
unittest/storage/multi_data_source/test_mds_table_flusher.cpp
Normal file
302
unittest/storage/multi_data_source/test_mds_table_flusher.cpp
Normal file
@ -0,0 +1,302 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#include "lib/future/ob_future.h"
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include <gtest/gtest.h>
|
||||
#define UNITTEST_DEBUG
|
||||
#define private public
|
||||
#define protected public
|
||||
#include "storage/multi_data_source/runtime_utility/common_define.h"
|
||||
#include "storage/multi_data_source/mds_table_base.h"
|
||||
#include "storage/multi_data_source/mds_table_mgr.h"
|
||||
#include "storage/ls/ob_ls.h"
|
||||
#include "storage/multi_data_source/mds_table_handle.h"
|
||||
#include "storage/multi_data_source/mds_table_order_flusher.h"
|
||||
using namespace std;
|
||||
|
||||
oceanbase::common::ObPromise<void> PROMISE1, PROMISE2;
|
||||
vector<oceanbase::storage::mds::FlushKey> V_ActualDoFlushKey;
|
||||
|
||||
static std::atomic<bool> NEED_ALLOC_FAIL(false);
|
||||
static std::atomic<bool> NEED_ALLOC_FAIL_AFTER_RESERVE(false);
|
||||
|
||||
namespace oceanbase {
|
||||
namespace logservice
|
||||
{
|
||||
int ObLogHandler::get_max_decided_scn(share::SCN &scn)
|
||||
{
|
||||
scn = unittest::mock_scn(500);
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
template <>
|
||||
int MdsTableImpl<UnitTestMdsTable>::flush(share::SCN need_advanced_rec_scn_lower_limit, share::SCN max_decided_scn) {
|
||||
V_ActualDoFlushKey.push_back({tablet_id_, rec_scn_});
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
void *MdsAllocator::alloc(const int64_t size)
|
||||
{
|
||||
void *ptr = ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
ob_free(ptr);
|
||||
}
|
||||
void *MdsFlusherModulePageAllocator::alloc(const int64_t size, const ObMemAttr &attr) {
|
||||
void *ret = nullptr;
|
||||
if (!NEED_ALLOC_FAIL) {
|
||||
ret = (NULL == allocator_) ? share::mtl_malloc(size, attr) : allocator_->alloc(size, attr);
|
||||
} else {
|
||||
MDS_LOG(DEBUG, "mock no memory", K(lbt()));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// template <>
|
||||
// void MdsTableOrderFlusher<FLUSH_FOR_ALL_SIZE, true>::reserve_memory(int64_t mds_table_total_size_likely) {// it'ok if failed
|
||||
// int ret = OB_SUCCESS;
|
||||
// abort();
|
||||
// constexpr int64_t max_tablet_number = 100 * 10000;//sizeof(100w FlushKey) = 16MB
|
||||
// int64_t reserve_size = std::min(mds_table_total_size_likely * 2, max_tablet_number);
|
||||
// if (OB_FAIL(extra_mds_tables_.reserve(reserve_size))) {
|
||||
// MDS_LOG(WARN, "fail to reserve memory", KR(ret));
|
||||
// array_err_ = ret;
|
||||
// }
|
||||
// if (NEED_ALLOC_FAIL_AFTER_RESERVE) {
|
||||
// NEED_ALLOC_FAIL = true;
|
||||
// OCCAM_LOG(INFO, "set PEOMISE1 and wait PROMISE2");
|
||||
// PROMISE1.set();
|
||||
// ObFuture<void> future = PROMISE2.get_future();
|
||||
// future.wait();
|
||||
// }
|
||||
// }
|
||||
}
|
||||
}
|
||||
namespace unittest {
|
||||
|
||||
using namespace common;
|
||||
using namespace std;
|
||||
using namespace storage;
|
||||
using namespace mds;
|
||||
using namespace transaction;
|
||||
|
||||
static constexpr int64_t TEST_ALL_SIZE = FLUSH_FOR_ALL_SIZE * 10;
|
||||
|
||||
class TestMdsTableFlush: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsTableFlush() {}
|
||||
virtual ~TestMdsTableFlush() {}
|
||||
virtual void SetUp() { V_ActualDoFlushKey.clear(); NEED_ALLOC_FAIL = false; NEED_ALLOC_FAIL_AFTER_RESERVE = false; }
|
||||
virtual void TearDown() { }
|
||||
private:
|
||||
// disallow copy
|
||||
DISALLOW_COPY_AND_ASSIGN(TestMdsTableFlush);
|
||||
};
|
||||
|
||||
TEST_F(TestMdsTableFlush, flusher_for_some_order) {
|
||||
FlusherForSome some;
|
||||
for (int i = FLUSH_FOR_SOME_SIZE; i > 0; --i) {
|
||||
some.record_mds_table({ObTabletID(i), mock_scn(i)});
|
||||
}
|
||||
for (int i = 0; i < FLUSH_FOR_SOME_SIZE; ++i) {
|
||||
ASSERT_EQ(some.high_priority_flusher_.high_priority_mds_tables_[i].rec_scn_, mock_scn(i + 1));
|
||||
}
|
||||
some.record_mds_table({ObTabletID(999), mock_scn(999)});// 没影响,被扔了
|
||||
for (int i = 0; i < FLUSH_FOR_SOME_SIZE; ++i) {
|
||||
ASSERT_EQ(some.high_priority_flusher_.high_priority_mds_tables_[i].rec_scn_, mock_scn(i + 1));
|
||||
}
|
||||
some.record_mds_table({ObTabletID(0), mock_scn(0)});// 变成第一个,其余的往后挤
|
||||
for (int i = 0; i < FLUSH_FOR_SOME_SIZE; ++i) {
|
||||
ASSERT_EQ(some.high_priority_flusher_.high_priority_mds_tables_[i].rec_scn_, mock_scn(i));
|
||||
}
|
||||
some.record_mds_table({ObTabletID(999), mock_scn(FLUSH_FOR_SOME_SIZE - 2)});// 最后一个有两份
|
||||
for (int i = 0; i < FLUSH_FOR_SOME_SIZE - 1; ++i) {
|
||||
ASSERT_EQ(some.high_priority_flusher_.high_priority_mds_tables_[i].rec_scn_, mock_scn(i));
|
||||
}
|
||||
ASSERT_EQ(some.high_priority_flusher_.high_priority_mds_tables_[FLUSH_FOR_SOME_SIZE - 1].rec_scn_, mock_scn(FLUSH_FOR_SOME_SIZE - 2));
|
||||
}
|
||||
|
||||
TEST_F(TestMdsTableFlush, flusher_for_all_order_with_enough_memory) {
|
||||
std::vector<FlushKey> v_key;
|
||||
for (int i = 0; i < TEST_ALL_SIZE; ++i) {
|
||||
v_key.push_back({ObTabletID(100 + i), mock_scn(100 + i)});
|
||||
}
|
||||
std::random_shuffle(v_key.begin(), v_key.end());
|
||||
ObLS ls;
|
||||
ObMdsTableMgr mgr;
|
||||
vector<MdsTableHandle> v;
|
||||
ASSERT_EQ(mgr.init(&ls), OB_SUCCESS);
|
||||
// 加入mgr的顺序是乱序的
|
||||
for (int i = 0; i < v_key.size(); ++i) {
|
||||
MdsTableHandle mds_table;
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(),
|
||||
v_key[i].tablet_id_,
|
||||
share::ObLSID(1),
|
||||
(ObTabletPointer*)0x111,
|
||||
&mgr));
|
||||
MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_;
|
||||
p_mds_table->rec_scn_ = v_key[i].rec_scn_;
|
||||
v.push_back(mds_table);
|
||||
}
|
||||
ASSERT_EQ(OB_SUCCESS, mgr.flush(share::SCN::max_scn()));
|
||||
ASSERT_EQ(TEST_ALL_SIZE, V_ActualDoFlushKey.size());
|
||||
for (int i = 0; i < V_ActualDoFlushKey.size(); ++i) {
|
||||
if (V_ActualDoFlushKey[i].rec_scn_ != mock_scn(100 + i)) {
|
||||
MDS_LOG(INFO, "DEBUG", K(V_ActualDoFlushKey[i].rec_scn_), K(i));
|
||||
}
|
||||
ASSERT_EQ(V_ActualDoFlushKey[i].rec_scn_, mock_scn(100 + i));
|
||||
ASSERT_EQ(V_ActualDoFlushKey[i].tablet_id_, ObTabletID(100 + i));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestMdsTableFlush, flusher_for_all_order_with_limitted_memory_reserve_fail) {
|
||||
NEED_ALLOC_FAIL = true;
|
||||
std::vector<FlushKey> v_key;
|
||||
for (int i = 0; i < TEST_ALL_SIZE; ++i) {
|
||||
v_key.push_back({ObTabletID(100 + i), mock_scn(100 + i)});
|
||||
}
|
||||
std::random_shuffle(v_key.begin(), v_key.end());
|
||||
ObLS ls;
|
||||
ObMdsTableMgr mgr;
|
||||
vector<MdsTableHandle> v;
|
||||
ASSERT_EQ(mgr.init(&ls), OB_SUCCESS);
|
||||
// 加入mgr的顺序是乱序的
|
||||
for (int i = 0; i < v_key.size(); ++i) {
|
||||
MdsTableHandle mds_table;
|
||||
ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(),
|
||||
v_key[i].tablet_id_,
|
||||
share::ObLSID(1),
|
||||
(ObTabletPointer*)0x111,
|
||||
&mgr));
|
||||
MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_;
|
||||
p_mds_table->rec_scn_ = v_key[i].rec_scn_;
|
||||
v.push_back(mds_table);
|
||||
}
|
||||
ASSERT_EQ(OB_SUCCESS, mgr.flush(share::SCN::max_scn()));
|
||||
ASSERT_EQ(TEST_ALL_SIZE + FLUSH_FOR_ALL_SIZE, V_ActualDoFlushKey.size());// 只保证最前面的TEST_ALL_SIZE的tablet是有序的,并且rec scn最小
|
||||
for (int i = 0; i < FLUSH_FOR_ALL_SIZE; ++i) {
|
||||
if (V_ActualDoFlushKey[i].rec_scn_ != mock_scn(100 + i)) {
|
||||
MDS_LOG(INFO, "DEBUG", K(V_ActualDoFlushKey[i].rec_scn_), K(i));
|
||||
}
|
||||
ASSERT_EQ(V_ActualDoFlushKey[i].rec_scn_, mock_scn(100 + i));
|
||||
ASSERT_EQ(V_ActualDoFlushKey[i].tablet_id_, ObTabletID(100 + i));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestMdsTableFlush, flusher_for_one) {
|
||||
FlusherForOne one;
|
||||
for (int i = FLUSH_FOR_SOME_SIZE; i > 0; --i) {
|
||||
one.record_mds_table({ObTabletID(i), mock_scn(i)});
|
||||
}
|
||||
ASSERT_EQ(one.min_key().rec_scn_, mock_scn(1));
|
||||
}
|
||||
|
||||
// // release版本的逻辑重写不生效
|
||||
// TEST_F(TestMdsTableFlush, flusher_for_all_order_with_limitted_memory_reserve_success_but_push_back_fail) {
|
||||
// NEED_ALLOC_FAIL_AFTER_RESERVE = true;// 只支持reserve的时候分配一次内存
|
||||
// const int64_t BIG_TEST_SIZE = 5 * TEST_ALL_SIZE;
|
||||
|
||||
// std::vector<FlushKey> v_key;
|
||||
// for (int i = 0; i < BIG_TEST_SIZE; ++i) {
|
||||
// v_key.push_back({ObTabletID(100 + i), mock_scn(100 + i)});
|
||||
// }
|
||||
// std::random_shuffle(v_key.begin(), v_key.end());
|
||||
// ObLS ls;
|
||||
// ObMdsTableMgr mgr;
|
||||
// vector<MdsTableHandle> v;
|
||||
// ASSERT_EQ(mgr.init(&ls), OB_SUCCESS);
|
||||
// // 首先加入第一部分
|
||||
// for (int i = 0; i < TEST_ALL_SIZE; ++i) {
|
||||
// MdsTableHandle mds_table;
|
||||
// ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(),
|
||||
// v_key[i].tablet_id_,
|
||||
// share::ObLSID(1),
|
||||
// (ObTabletPointer*)0x111,
|
||||
// &mgr));
|
||||
// MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_;
|
||||
// p_mds_table->rec_scn_ = v_key[i].rec_scn_;
|
||||
// v.push_back(mds_table);
|
||||
// }
|
||||
|
||||
// ASSERT_EQ(OB_SUCCESS, PROMISE1.init());
|
||||
// ASSERT_EQ(OB_SUCCESS, PROMISE2.init());
|
||||
|
||||
// // 开启一个新的线程做flush,模拟并发增加mds table的情况(以至于reverse的内存不够用了,过程中发生额外的内存分配,但失败的情况)
|
||||
// std::thread t1([&](){
|
||||
// ASSERT_EQ(OB_SUCCESS, mgr.flush(share::SCN::max_scn()));
|
||||
// });
|
||||
|
||||
// {
|
||||
// OCCAM_LOG(INFO, "wait PEOMISE1");
|
||||
// ObFuture<void> future = PROMISE1.get_future();
|
||||
// future.wait();
|
||||
// }
|
||||
|
||||
// {
|
||||
// for (int i = TEST_ALL_SIZE; i < v_key.size(); ++i) {// 继续再注册99 * TEST_ALL_SIZE的新mds table
|
||||
// MdsTableHandle mds_table;
|
||||
// ASSERT_EQ(OB_SUCCESS, mds_table.init<UnitTestMdsTable>(MdsAllocator::get_instance(),
|
||||
// v_key[i].tablet_id_,
|
||||
// share::ObLSID(1),
|
||||
// (ObTabletPointer*)0x111,
|
||||
// &mgr));
|
||||
// MdsTableBase *p_mds_table = mds_table.p_mds_table_base_.data_;
|
||||
// p_mds_table->rec_scn_ = v_key[i].rec_scn_;
|
||||
// v.push_back(mds_table);
|
||||
// }
|
||||
// PROMISE2.set();
|
||||
// }
|
||||
|
||||
// t1.join();
|
||||
|
||||
// ASSERT_EQ(BIG_TEST_SIZE + FLUSH_FOR_ALL_SIZE, V_ActualDoFlushKey.size());// 所有的mds table都经历了转储
|
||||
// for (int i = 0; i < FLUSH_FOR_ALL_SIZE; ++i) {// 但只保证栈上记录的tablet是有序的,并且rec scn最小
|
||||
// ASSERT_EQ(V_ActualDoFlushKey[i].rec_scn_, mock_scn(100 + i));
|
||||
// ASSERT_EQ(V_ActualDoFlushKey[i].tablet_id_, ObTabletID(100 + i));
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
system("rm -rf test_mds_table_flusher.log");
|
||||
oceanbase::common::ObLogger &logger = oceanbase::common::ObLogger::get_logger();
|
||||
logger.set_file_name("test_mds_table_flusher.log", false);
|
||||
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
int ret = RUN_ALL_TESTS();
|
||||
int64_t alloc_times = oceanbase::storage::mds::MdsAllocator::get_alloc_times();
|
||||
int64_t free_times = oceanbase::storage::mds::MdsAllocator::get_free_times();
|
||||
if (alloc_times != free_times) {
|
||||
MDS_LOG(ERROR, "memory may leak", K(free_times), K(alloc_times));
|
||||
ret = -1;
|
||||
} else {
|
||||
MDS_LOG(INFO, "all memory released", K(free_times), K(alloc_times));
|
||||
}
|
||||
return ret;
|
||||
}
|
Reference in New Issue
Block a user