[MDS] add filter functor param in iterator and fix iterate medium_info logic
This commit is contained in:
parent
c3f8961d6d
commit
396d455b32
@ -10,6 +10,7 @@
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include "lib/ob_errno.h"
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#define private public
|
||||
@ -725,6 +726,61 @@ TEST_F(TestMediumInfoReader, mds_table_dump_data_full_inclusion)
|
||||
ASSERT_EQ(OB_ITER_END, ret);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestMediumInfoReader, mds_table_cannot_read_uncommitted_node) {
|
||||
int ret = OB_SUCCESS;
|
||||
// create tablet
|
||||
const common::ObTabletID tablet_id(ObTimeUtility::fast_current_time() % 10000000000000);
|
||||
ObTabletHandle tablet_handle;
|
||||
ObTabletHandle new_tablet_handle;
|
||||
ret = create_tablet(tablet_id, tablet_handle);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
ObTablet *tablet = tablet_handle.get_obj();
|
||||
ASSERT_NE(nullptr, tablet);
|
||||
|
||||
// insert data into mds table
|
||||
mds::MdsCtx ctx(mds::MdsWriter(transaction::ObTransID(1)));
|
||||
compaction::ObMediumCompactionInfoKey key(1);
|
||||
compaction::ObMediumCompactionInfo info;
|
||||
ret = MediumInfoHelper::build_medium_compaction_info(allocator_, info, 1);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ret = mds_table_.set(key, info, ctx);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// iterate
|
||||
{
|
||||
common::ObArenaAllocator allocator;
|
||||
ObTabletMediumInfoReader reader(*tablet);
|
||||
ret = reader.init(allocator);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
key.reset();
|
||||
info.reset();
|
||||
ret = reader.get_next_medium_info(allocator, key, info);
|
||||
ASSERT_EQ(OB_ITER_END, ret);
|
||||
}
|
||||
|
||||
// commit
|
||||
{
|
||||
ctx.on_redo(mock_scn(1));
|
||||
ctx.before_prepare();
|
||||
ctx.on_prepare(mock_scn(1));
|
||||
ctx.on_commit(mock_scn(1), mock_scn(1));
|
||||
}
|
||||
|
||||
// iterate
|
||||
{
|
||||
common::ObArenaAllocator allocator;
|
||||
ObTabletMediumInfoReader reader(*tablet);
|
||||
ret = reader.init(allocator);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ret = reader.get_next_medium_info(allocator, key, info);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(1, key.medium_snapshot_);
|
||||
ASSERT_EQ(1, info.medium_snapshot_);
|
||||
}
|
||||
}
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
||||
|
||||
|
@ -128,66 +128,6 @@ private:
|
||||
ObLightSharedPtr<MdsTableBase> p_mds_table_base_;
|
||||
};
|
||||
|
||||
inline void construct_lock_guard(mds::MdsRLockGuard &guard, mds::MdsLock &lock) {
|
||||
guard.~MdsRLockGuard();
|
||||
new (&guard) mds::MdsRLockGuard(lock);
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
struct ObMdsKvRowScanIterator {// will lock unit when init, but won't lock row when get_next_kv_row()
|
||||
using KvRowIter = typename mds::MdsUnit<UnitKey, UnitValue>::iterator;
|
||||
using KvRow = mds::KvPair<UnitKey, mds::Row<UnitKey, UnitValue>>;
|
||||
ObMdsKvRowScanIterator();
|
||||
int init(mds::MdsTableHandle &mds_table_handle);
|
||||
int get_next_kv_row(KvRow *&p_kv_row);
|
||||
TO_STRING_KV(KP(this), K_(is_inited), K_(is_first_scan),\
|
||||
K(typeid(UnitKey).name()), K(typeid(UnitValue).name()))
|
||||
private:
|
||||
bool is_inited_;
|
||||
bool is_first_scan_;
|
||||
mds::MdsUnit<UnitKey, UnitValue> *p_mds_unit_;
|
||||
KvRowIter kv_row_iter_;
|
||||
mds::MdsRLockGuard unit_guard_;
|
||||
};
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
struct ObMdsNodeScanIterator {// will lock row when init, but won't lock node when get_next()
|
||||
using KvRow = mds::KvPair<UnitKey, mds::Row<UnitKey, UnitValue>>;
|
||||
using KvRowIter = typename mds::MdsUnit<UnitKey, UnitValue>::iterator;
|
||||
using NodeIter = typename KvRowIter::row_type::iterator;
|
||||
ObMdsNodeScanIterator();
|
||||
int init(KvRow *&p_kv_row);
|
||||
int get_next_kv_node(UnitKey &key, mds::UserMdsNode<UnitKey, UnitValue> *&p_node);
|
||||
bool is_valid() const;
|
||||
void reset();
|
||||
TO_STRING_KV(KP(this), K_(is_inited), K_(is_first_scan),\
|
||||
K(typeid(UnitKey).name()), K(typeid(UnitValue).name()))
|
||||
private:
|
||||
bool is_inited_;
|
||||
bool is_first_scan_;
|
||||
KvRow *p_mds_kv_row_;
|
||||
NodeIter node_iter_;
|
||||
mds::MdsRLockGuard row_guard_;
|
||||
};
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
struct ObMdsUnitRowNodeScanIterator {// will add mds table ref when init to make sure inner iter safe
|
||||
using KvRow = mds::KvPair<UnitKey, mds::Row<UnitKey, UnitValue>>;
|
||||
using KvRowIter = typename mds::MdsUnit<UnitKey, UnitValue>::iterator;
|
||||
using NodeIter = typename KvRowIter::row_type::iterator;
|
||||
ObMdsUnitRowNodeScanIterator();
|
||||
int init(mds::MdsTableHandle &mds_table_handle);
|
||||
int get_next(UnitKey &key, mds::UserMdsNode<UnitKey, UnitValue> *&p_node);
|
||||
TO_STRING_KV(KP(this), K_(is_inited), K_(is_first_scan), K_(mds_table_handle),\
|
||||
K(typeid(UnitKey).name()), K(typeid(UnitValue).name()))
|
||||
private:
|
||||
bool is_inited_;
|
||||
bool is_first_scan_;
|
||||
mds::MdsTableHandle mds_table_handle_;
|
||||
ObMdsKvRowScanIterator<UnitKey, UnitValue> row_scan_iter_;
|
||||
ObMdsNodeScanIterator<UnitKey, UnitValue> node_scan_iter_;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -865,171 +865,6 @@ inline int MdsTableHandle::forcely_reset_mds_table(const char (&reason)[N])
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
ObMdsKvRowScanIterator<UnitKey, UnitValue>::ObMdsKvRowScanIterator()
|
||||
: is_inited_(false),
|
||||
is_first_scan_(false),
|
||||
p_mds_unit_(nullptr) {}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsKvRowScanIterator<UnitKey, UnitValue>::init(mds::MdsTableHandle &mds_table_handle) {
|
||||
#define PRINT_WRAPPER KR(ret), K(mds_table_handle), K(typeid(UnitKey).name()),\
|
||||
K(typeid(UnitValue).name())
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
MDS_LOG_NONE(WARN, "ObMdsKvRowScanIterator init twice");
|
||||
} else if (!mds_table_handle.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
MDS_LOG_NONE(WARN, "try iterate invalid mds table");
|
||||
} else if (OB_FAIL(mds_table_handle.get_mds_unit(p_mds_unit_))) {
|
||||
MDS_LOG_NONE(WARN, "fail to find unit in this mds table");
|
||||
} else {
|
||||
construct_lock_guard(unit_guard_, p_mds_unit_->lock_);// lock unit to make sure get kv_row safe
|
||||
is_inited_ = true;
|
||||
is_first_scan_ = true;
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsKvRowScanIterator<UnitKey, UnitValue>::get_next_kv_row(KvRow *&p_kv_row) {
|
||||
#define PRINT_WRAPPER KR(ret), K(*this)
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
MDS_LOG_NONE(WARN, "ObMdsKvRowScanIterator not init");
|
||||
} else if (is_first_scan_) {
|
||||
is_first_scan_ = false;
|
||||
kv_row_iter_ = p_mds_unit_->begin();
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (kv_row_iter_ == p_mds_unit_->end()) {
|
||||
ret = OB_ITER_END;
|
||||
} else {
|
||||
p_kv_row = &(*(kv_row_iter_++));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
ObMdsNodeScanIterator<UnitKey, UnitValue>::ObMdsNodeScanIterator()
|
||||
: is_inited_(false),
|
||||
is_first_scan_(true),
|
||||
p_mds_kv_row_(nullptr) {}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsNodeScanIterator<UnitKey, UnitValue>::init(KvRow *&p_kv_row) {
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), KP(p_kv_row)
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
MDS_LOG_NONE(WARN, "ObMdsNodeScanIterator init twice");
|
||||
} else if (OB_ISNULL(p_kv_row)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
MDS_LOG_NONE(WARN, "p_kv_row is NULL");
|
||||
} else {
|
||||
p_mds_kv_row_ = p_kv_row;
|
||||
construct_lock_guard(row_guard_, p_mds_kv_row_->v_.lock_);// lock unit to make sure get kv_row safe
|
||||
is_inited_ = true;
|
||||
is_first_scan_ = true;
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsNodeScanIterator<UnitKey, UnitValue>::get_next_kv_node(UnitKey &key, mds::UserMdsNode<UnitKey, UnitValue> *&p_node) {
|
||||
#define PRINT_WRAPPER KR(ret), K(*this)
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
MDS_LOG_NONE(WARN, "ObMdsNodeScanIterator not init");
|
||||
} else if (is_first_scan_) {
|
||||
is_first_scan_ = false;
|
||||
node_iter_ = p_mds_kv_row_->v_.begin();
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (node_iter_ == p_mds_kv_row_->v_.end()) {
|
||||
ret = OB_ITER_END;
|
||||
} else {
|
||||
key = p_mds_kv_row_->k_;
|
||||
p_node = &(*node_iter_++);
|
||||
MDS_LOG_NONE(TRACE, "scan node", K(*p_node));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
bool ObMdsNodeScanIterator<UnitKey, UnitValue>::is_valid() const { return is_inited_; }
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
void ObMdsNodeScanIterator<UnitKey, UnitValue>::reset() {
|
||||
this->~ObMdsNodeScanIterator();
|
||||
new (this) ObMdsNodeScanIterator();
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
ObMdsUnitRowNodeScanIterator<UnitKey, UnitValue>::ObMdsUnitRowNodeScanIterator()
|
||||
: is_inited_(false),
|
||||
is_first_scan_(true) {}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsUnitRowNodeScanIterator<UnitKey, UnitValue>::init(mds::MdsTableHandle &mds_table_handle) {
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(mds_table_handle)
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
MDS_LOG_NONE(WARN, "ObMdsUnitRowNodeScanIterator init twice");
|
||||
} else if (!mds_table_handle.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
MDS_LOG_NONE(WARN, "mds_table_handle invalid");
|
||||
} else {
|
||||
mds_table_handle_ = mds_table_handle;
|
||||
is_inited_ = true;
|
||||
is_first_scan_ = true;
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsUnitRowNodeScanIterator<UnitKey, UnitValue>::get_next(UnitKey &key, mds::UserMdsNode<UnitKey, UnitValue> *&p_node) {
|
||||
#define PRINT_WRAPPER KR(ret), K(*this)
|
||||
int ret = OB_SUCCESS;
|
||||
bool node_meet_end = false;
|
||||
bool row_mmet_end = false;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
MDS_LOG_NONE(WARN, "ObMdsUnitRowNodeScanIterator not init");
|
||||
} else if (is_first_scan_) {
|
||||
is_first_scan_ = false;
|
||||
if (OB_FAIL(row_scan_iter_.init(mds_table_handle_))) {
|
||||
MDS_LOG_NONE(WARN, "fail to init row_scan_iter_");
|
||||
}
|
||||
}
|
||||
while (OB_SUCC(ret) &&
|
||||
(!node_scan_iter_.is_valid() || // first time to scan
|
||||
OB_ITER_END == (ret = node_scan_iter_.get_next_kv_node(key, p_node)))) {// every time scan row end
|
||||
node_scan_iter_.reset();
|
||||
KvRow *p_kv_row = nullptr;
|
||||
if (OB_FAIL(row_scan_iter_.get_next_kv_row(p_kv_row))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
MDS_LOG_NONE(WARN, "fail to get kv row");
|
||||
}
|
||||
} else if (OB_FAIL(node_scan_iter_.init(p_kv_row))) {
|
||||
MDS_LOG_NONE(WARN, "fail to init node_scan_iter_");
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
110
src/storage/multi_data_source/mds_table_iterator.h
Normal file
110
src/storage/multi_data_source/mds_table_iterator.h
Normal file
@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Copyright (c) 2023 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#ifndef STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_ITERATOR_H
|
||||
#define STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_ITERATOR_H
|
||||
|
||||
#include "lib/ob_errno.h"
|
||||
#include "mds_table_impl.h"
|
||||
#include "lib/guard/ob_light_shared_gaurd.h"
|
||||
#include "mds_table_handle.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
class ObLS;
|
||||
class ObTabletPointer;
|
||||
namespace mds
|
||||
{
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
using FilterFunction = ObFunction<int(mds::UserMdsNode<UnitKey, UnitValue>&, bool &)>;
|
||||
|
||||
inline void construct_lock_guard(mds::MdsRLockGuard &guard, mds::MdsLock &lock) {
|
||||
guard.~MdsRLockGuard();
|
||||
new (&guard) mds::MdsRLockGuard(lock);
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
struct ObMdsKvRowScanIterator {// will lock unit when init, but won't lock row when get_next_kv_row()
|
||||
using KvRowIter = typename mds::MdsUnit<UnitKey, UnitValue>::iterator;
|
||||
using KvRow = mds::KvPair<UnitKey, mds::Row<UnitKey, UnitValue>>;
|
||||
ObMdsKvRowScanIterator();
|
||||
int init(mds::MdsTableHandle &mds_table_handle);
|
||||
int get_next_kv_row(KvRow *&p_kv_row);
|
||||
TO_STRING_KV(KP(this), K_(is_inited), K_(is_first_scan),\
|
||||
K(typeid(UnitKey).name()), K(typeid(UnitValue).name()))
|
||||
private:
|
||||
bool is_inited_;
|
||||
bool is_first_scan_;
|
||||
mds::MdsUnit<UnitKey, UnitValue> *p_mds_unit_;
|
||||
KvRowIter kv_row_iter_;
|
||||
mds::MdsRLockGuard unit_guard_;
|
||||
};
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
struct ObMdsNodeScanIterator {// will lock row when init, but won't lock node when get_next()
|
||||
using KvRow = mds::KvPair<UnitKey, mds::Row<UnitKey, UnitValue>>;
|
||||
using KvRowIter = typename mds::MdsUnit<UnitKey, UnitValue>::iterator;
|
||||
using NodeIter = typename KvRowIter::row_type::iterator;
|
||||
ObMdsNodeScanIterator(FilterFunction<UnitKey, UnitValue> &filter_function);
|
||||
int init(KvRow *&p_kv_rowm);
|
||||
int get_next_kv_node(UnitKey &key, mds::UserMdsNode<UnitKey, UnitValue> *&p_node);
|
||||
bool is_valid() const;
|
||||
void reset();
|
||||
TO_STRING_KV(KP(this), K_(is_inited), K_(is_first_scan),\
|
||||
K(typeid(UnitKey).name()), K(typeid(UnitValue).name()))
|
||||
private:
|
||||
bool is_inited_;
|
||||
bool is_first_scan_;
|
||||
KvRow *p_mds_kv_row_;
|
||||
NodeIter node_iter_;
|
||||
FilterFunction<UnitKey, UnitValue> &filter_function_;
|
||||
mds::MdsRLockGuard row_guard_;
|
||||
};
|
||||
|
||||
/* about filter: init iterator with a filter makes get_next() only get specified nodes.
|
||||
* filter's input is mds node in specified unit, and a flag 'need_skip' to indicate wether this node should be seen when call get_next().
|
||||
* if filter's return value is not OB_SUCCESS, iterator will stop, every time call get_next() will return ITER_STOP.
|
||||
*/
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
struct ObMdsUnitRowNodeScanIterator {// will add mds table ref when init to make sure inner iter safe
|
||||
using KvRow = mds::KvPair<UnitKey, mds::Row<UnitKey, UnitValue>>;
|
||||
using KvRowIter = typename mds::MdsUnit<UnitKey, UnitValue>::iterator;
|
||||
using NodeIter = typename KvRowIter::row_type::iterator;
|
||||
struct DummyFilter {
|
||||
int operator()(mds::UserMdsNode<UnitKey, UnitValue> &, bool &need_skip) { need_skip = false; return OB_SUCCESS; }
|
||||
};
|
||||
ObMdsUnitRowNodeScanIterator();
|
||||
int init(mds::MdsTableHandle &mds_table_handle, const FilterFunction<UnitKey, UnitValue> &filter = DummyFilter());
|
||||
int get_next(UnitKey &key, mds::UserMdsNode<UnitKey, UnitValue> *&p_node);
|
||||
TO_STRING_KV(KP(this), K_(is_inited), K_(is_first_scan), K_(mds_table_handle),\
|
||||
K(typeid(UnitKey).name()), K(typeid(UnitValue).name()))
|
||||
private:
|
||||
bool is_inited_;
|
||||
bool is_first_scan_;
|
||||
mds::MdsTableHandle mds_table_handle_;
|
||||
FilterFunction<UnitKey, UnitValue> filter_function_;
|
||||
ObMdsKvRowScanIterator<UnitKey, UnitValue> row_scan_iter_;
|
||||
ObMdsNodeScanIterator<UnitKey, UnitValue> node_scan_iter_;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_ITERATOR_H_IPP
|
||||
#define STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_ITERATOR_H_IPP
|
||||
#include "mds_table_iterator.ipp"
|
||||
#endif
|
||||
|
||||
#endif
|
218
src/storage/multi_data_source/mds_table_iterator.ipp
Normal file
218
src/storage/multi_data_source/mds_table_iterator.ipp
Normal file
@ -0,0 +1,218 @@
|
||||
/**
|
||||
* Copyright (c) 2023 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#ifndef STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_ITERATOR_IPP
|
||||
#define STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_ITERATOR_IPP
|
||||
|
||||
#ifndef STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_ITERATOR_H_IPP
|
||||
#define STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_ITERATOR_H_IPP
|
||||
#include "mds_table_iterator.h"
|
||||
#endif
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
ObMdsKvRowScanIterator<UnitKey, UnitValue>::ObMdsKvRowScanIterator()
|
||||
: is_inited_(false),
|
||||
is_first_scan_(false),
|
||||
p_mds_unit_(nullptr),
|
||||
kv_row_iter_(),
|
||||
unit_guard_() {}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsKvRowScanIterator<UnitKey, UnitValue>::init(mds::MdsTableHandle &mds_table_handle) {
|
||||
#define PRINT_WRAPPER KR(ret), K(mds_table_handle), K(typeid(UnitKey).name()),\
|
||||
K(typeid(UnitValue).name())
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
MDS_LOG_NONE(WARN, "ObMdsKvRowScanIterator init twice");
|
||||
} else if (!mds_table_handle.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
MDS_LOG_NONE(WARN, "try iterate invalid mds table");
|
||||
} else if (OB_FAIL(mds_table_handle.get_mds_unit(p_mds_unit_))) {
|
||||
MDS_LOG_NONE(WARN, "fail to find unit in this mds table");
|
||||
} else {
|
||||
construct_lock_guard(unit_guard_, p_mds_unit_->lock_);// lock unit to make sure get kv_row safe
|
||||
is_inited_ = true;
|
||||
is_first_scan_ = true;
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsKvRowScanIterator<UnitKey, UnitValue>::get_next_kv_row(KvRow *&p_kv_row) {
|
||||
#define PRINT_WRAPPER KR(ret), K(*this)
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
MDS_LOG_NONE(WARN, "ObMdsKvRowScanIterator not init");
|
||||
} else if (is_first_scan_) {
|
||||
is_first_scan_ = false;
|
||||
kv_row_iter_ = p_mds_unit_->begin();
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (kv_row_iter_ == p_mds_unit_->end()) {
|
||||
ret = OB_ITER_END;
|
||||
} else {
|
||||
p_kv_row = &(*(kv_row_iter_++));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
ObMdsNodeScanIterator<UnitKey, UnitValue>::ObMdsNodeScanIterator(FilterFunction<UnitKey, UnitValue> &filter_function)
|
||||
: is_inited_(false),
|
||||
is_first_scan_(true),
|
||||
p_mds_kv_row_(nullptr),
|
||||
node_iter_(),
|
||||
filter_function_(filter_function),
|
||||
row_guard_() {}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsNodeScanIterator<UnitKey, UnitValue>::init(KvRow *&p_kv_row) {
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), KP(p_kv_row)
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
MDS_LOG_NONE(WARN, "ObMdsNodeScanIterator init twice");
|
||||
} else if (OB_ISNULL(p_kv_row)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
MDS_LOG_NONE(WARN, "p_kv_row is NULL");
|
||||
} else {
|
||||
p_mds_kv_row_ = p_kv_row;
|
||||
construct_lock_guard(row_guard_, p_mds_kv_row_->v_.lock_);// lock unit to make sure get kv_row safe
|
||||
is_inited_ = true;
|
||||
is_first_scan_ = true;
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsNodeScanIterator<UnitKey, UnitValue>::get_next_kv_node(UnitKey &key,
|
||||
mds::UserMdsNode<UnitKey, UnitValue> *&p_node) {
|
||||
#define PRINT_WRAPPER KR(ret), K(*this)
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
MDS_LOG_NONE(WARN, "ObMdsNodeScanIterator not init");
|
||||
} else if (is_first_scan_) {
|
||||
is_first_scan_ = false;
|
||||
node_iter_ = p_mds_kv_row_->v_.begin();
|
||||
}
|
||||
bool need_filter = false;
|
||||
while (OB_SUCC(ret)) {
|
||||
need_filter = false;
|
||||
if (node_iter_ == p_mds_kv_row_->v_.end()) {
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_FAIL(filter_function_(*node_iter_, need_filter))) {
|
||||
MDS_LOG_NONE(WARN, "scan node failed", K(*p_node));
|
||||
} else if (need_filter) {
|
||||
node_iter_++;
|
||||
} else {
|
||||
key = p_mds_kv_row_->k_;
|
||||
p_node = &(*node_iter_++);
|
||||
break;
|
||||
MDS_LOG_NONE(TRACE, "scan node", K(*p_node));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
bool ObMdsNodeScanIterator<UnitKey, UnitValue>::is_valid() const { return is_inited_; }
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
void ObMdsNodeScanIterator<UnitKey, UnitValue>::reset() {
|
||||
FilterFunction<UnitKey, UnitValue> &filter_function = filter_function_;
|
||||
this->~ObMdsNodeScanIterator();
|
||||
new (this) ObMdsNodeScanIterator(filter_function);
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
ObMdsUnitRowNodeScanIterator<UnitKey, UnitValue>::ObMdsUnitRowNodeScanIterator()
|
||||
: is_inited_(false),
|
||||
is_first_scan_(true),
|
||||
mds_table_handle_(),
|
||||
filter_function_(),
|
||||
row_scan_iter_(),
|
||||
node_scan_iter_(filter_function_) {}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsUnitRowNodeScanIterator<UnitKey, UnitValue>::init(mds::MdsTableHandle &mds_table_handle,
|
||||
const FilterFunction<UnitKey, UnitValue> &filter) {
|
||||
#define PRINT_WRAPPER KR(ret), K(*this), K(mds_table_handle)
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
MDS_LOG_NONE(WARN, "ObMdsUnitRowNodeScanIterator init twice");
|
||||
} else if (!mds_table_handle.is_valid() || !filter.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
MDS_LOG_NONE(WARN, "mds_table_handle invalid");
|
||||
} else if (OB_FAIL(filter_function_.assign(filter))) {
|
||||
MDS_LOG_NONE(WARN, "failed to init filter function");
|
||||
} else {
|
||||
mds_table_handle_ = mds_table_handle;
|
||||
is_inited_ = true;
|
||||
is_first_scan_ = true;
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
template <typename UnitKey, typename UnitValue>
|
||||
int ObMdsUnitRowNodeScanIterator<UnitKey, UnitValue>::get_next(UnitKey &key,
|
||||
mds::UserMdsNode<UnitKey, UnitValue> *&p_node) {
|
||||
#define PRINT_WRAPPER KR(ret), K(*this)
|
||||
int ret = OB_SUCCESS;
|
||||
bool node_meet_end = false;
|
||||
bool row_mmet_end = false;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
MDS_LOG_NONE(WARN, "ObMdsUnitRowNodeScanIterator not init");
|
||||
} else if (is_first_scan_) {
|
||||
is_first_scan_ = false;
|
||||
if (OB_FAIL(row_scan_iter_.init(mds_table_handle_))) {
|
||||
MDS_LOG_NONE(WARN, "fail to init row_scan_iter_");
|
||||
}
|
||||
}
|
||||
while (OB_SUCC(ret) &&
|
||||
(!node_scan_iter_.is_valid() || // first time to scan
|
||||
OB_ITER_END == (ret = node_scan_iter_.get_next_kv_node(key, p_node)))) {// every time scan row end
|
||||
node_scan_iter_.reset();
|
||||
KvRow *p_kv_row = nullptr;
|
||||
if (OB_FAIL(row_scan_iter_.get_next_kv_row(p_kv_row))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
MDS_LOG_NONE(WARN, "fail to get kv row");
|
||||
}
|
||||
} else if (OB_FAIL(node_scan_iter_.init(p_kv_row))) {
|
||||
MDS_LOG_NONE(WARN, "fail to init node_scan_iter_");
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
@ -12,6 +12,7 @@
|
||||
|
||||
#include "storage/tablet/ob_tablet_medium_info_reader.h"
|
||||
#include "lib/ob_errno.h"
|
||||
#include "storage/compaction/ob_medium_compaction_info.h"
|
||||
#include "storage/tablet/ob_tablet.h"
|
||||
#include "storage/tablet/ob_tablet_mds_data.h"
|
||||
|
||||
@ -43,6 +44,16 @@ ObTabletMediumInfoReader::~ObTabletMediumInfoReader()
|
||||
reset();
|
||||
}
|
||||
|
||||
struct MdsNodeIteratorCommittedNodeFilter {
|
||||
int operator()(mds::UserMdsNode<compaction::ObMediumCompactionInfoKey, compaction::ObMediumCompactionInfo> &node,
|
||||
bool &need_skip) {
|
||||
int ret = OB_SUCCESS;
|
||||
if (!node.is_committed_()) {
|
||||
need_skip = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
int ObTabletMediumInfoReader::init(common::ObArenaAllocator &allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -63,7 +74,7 @@ int ObTabletMediumInfoReader::init(common::ObArenaAllocator &allocator)
|
||||
ret = OB_SUCCESS;
|
||||
LOG_DEBUG("no mds table", K(ret), K(ls_id), K(tablet_id), K_(mds_end));
|
||||
}
|
||||
} else if (OB_FAIL(mds_iter_.init(mds_table))) {
|
||||
} else if (OB_FAIL(mds_iter_.init(mds_table, MdsNodeIteratorCommittedNodeFilter()))) {
|
||||
LOG_WARN("failed to init mds iter", K(ret), K(ls_id), K(tablet_id));
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "storage/meta_mem/ob_tablet_handle.h"
|
||||
#include "storage/multi_data_source/mds_table_handle.h"
|
||||
#include "storage/tablet/ob_tablet_dumped_medium_info.h"
|
||||
#include "storage/multi_data_source/mds_table_iterator.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
|
@ -36,6 +36,7 @@
|
||||
#include <numeric>
|
||||
#include "storage/multi_data_source/runtime_utility/mds_lock.h"
|
||||
#include "storage/tablet/ob_tablet_meta.h"
|
||||
#include "storage/multi_data_source/mds_table_iterator.h"
|
||||
namespace oceanbase {
|
||||
namespace storage
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user