use ObTxBufferNodeArray in mds_range

This commit is contained in:
KyrielightWei
2023-06-29 02:43:05 +00:00
committed by ob-robot
parent 8502fbf8a6
commit 348bb57bac
5 changed files with 208 additions and 127 deletions

View File

@ -74,10 +74,19 @@ void ObTxBufferNode::replace_data(const common::ObString &data)
has_submitted_ = false; has_submitted_ = false;
has_synced_ = false; has_synced_ = false;
} }
//#####################################################
// ObTxMDSRange
//#####################################################
bool ObTxBufferNode::operator==(const ObTxBufferNode &buffer_node) const
{
bool is_same = false;
if (has_submitted_ == buffer_node.has_submitted_ && has_synced_ == buffer_node.has_synced_
&& mds_base_scn_ == buffer_node.mds_base_scn_ && type_ == buffer_node.type_
&& data_ == buffer_node.data_) {
is_same = true;
}
return is_same;
}
//##################################################### //#####################################################
// ObMulSourceTxDataNotifier // ObMulSourceTxDataNotifier

View File

@ -117,6 +117,8 @@ public:
const share::SCN &get_base_scn() { return mds_base_scn_; } const share::SCN &get_base_scn() { return mds_base_scn_; }
bool operator==(const ObTxBufferNode & buffer_node) const;
void log_sync_fail() void log_sync_fail()
{ {
has_submitted_ = false; has_submitted_ = false;

View File

@ -706,32 +706,29 @@ DEFINE_TO_STRING_AND_YSON(ObTransKey, OB_ID(hash), hash_val_,
void ObTxMDSRange::reset() void ObTxMDSRange::reset()
{ {
list_ptr_ = nullptr; tx_ctx_ = nullptr;
start_iter_ = ObTxBufferNodeList::iterator(); range_array_.reset();
count_ = 0;
} }
void ObTxMDSRange::clear() // void ObTxMDSRange::clear()
{ // {
list_ptr_ = nullptr; // list_ptr_ = nullptr;
start_iter_ = ObTxBufferNodeList::iterator(); // start_iter_ = ObTxBufferNodeList::iterator();
} // }
int ObTxMDSRange::init(ObTxBufferNodeList *list_ptr) int ObTxMDSRange::init(ObPartTransCtx *tx_ctx)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_NOT_NULL(list_ptr_)) { if (OB_NOT_NULL(tx_ctx_)) {
ret = OB_INIT_TWICE; ret = OB_INIT_TWICE;
} else if (OB_ISNULL(list_ptr)) { } else if (OB_ISNULL(tx_ctx)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
} else { } else {
list_ptr_ = list_ptr; tx_ctx_ = tx_ctx;
start_iter_ = list_ptr_->end();
count_ = 0;
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "init MDS range failed", K(ret)); TRANS_LOG(WARN, "init MDS range failed", K(ret), KPC(tx_ctx));
} }
return ret; return ret;
@ -741,79 +738,40 @@ int ObTxMDSRange::update_range(ObTxBufferNodeList::iterator iter)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(list_ptr_)) { if (OB_ISNULL(tx_ctx_)) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
TRANS_LOG(WARN, "MDS range is not init", K(ret)); TRANS_LOG(WARN, "MDS range is not init", K(ret), KPC(tx_ctx_));
} else if (iter == list_ptr_->end()) { } else if (!(*iter).is_valid()) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid iter", K(ret)); TRANS_LOG(WARN, "invalid iter", K(ret), K(*iter));
} else if (start_iter_ == list_ptr_->end() || 0 == count_) { } else if (OB_FAIL(range_array_.push_back(*iter))) {
start_iter_ = iter; TRANS_LOG(WARN, "push back into the range array failed", K(ret), K(*iter), KPC(this),
count_ = 1; KPC(tx_ctx_));
} else {
count_++;
} }
return ret; return ret;
} }
int ObTxMDSRange::move_to(ObTxBufferNodeArray &tx_buffer_node_arr) int ObTxMDSRange::move_from_cache_to_arr(ObTxMDSCache &mds_cache,
ObTxBufferNodeArray &mds_durable_arr)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTxBufferNodeList::iterator del_iterator, next_iterator; if (OB_ISNULL(tx_ctx_)) {
if (OB_ISNULL(list_ptr_)) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
TRANS_LOG(WARN, "MDS range is not init", K(ret)); TRANS_LOG(WARN, "MDS range is not init", K(ret), KPC(tx_ctx_));
} else if (start_iter_ == list_ptr_->end() || 0 == count_) { } else if (range_array_.empty()) {
// empty MDS range TRANS_LOG(WARN, "empty range in move function", K(ret), KPC(tx_ctx_));
TRANS_LOG(WARN, "use empty mds range when move", K(this), K(lbt()));
} else { } else {
int64_t i = 0; for (int64_t i = 0; i < range_array_.count() && OB_SUCC(ret); i++) {
del_iterator = list_ptr_->end(); if (OB_FAIL(mds_cache.earse_from_cache(range_array_[i]))) {
next_iterator = start_iter_; TRANS_LOG(WARN, "earse from mds cache failed", K(ret), K(range_array_[i]), K(mds_cache),
K(mds_durable_arr));
for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end(); i++) { } else if (OB_FALSE_IT(range_array_[i].set_synced())) {
del_iterator = next_iterator; //do nothing
next_iterator++; } else if (OB_FAIL(mds_durable_arr.push_back(range_array_[i]))) {
TRANS_LOG(WARN, "push back into mds_durable_arr failed", K(ret), K(range_array_[i]),
if (!del_iterator->is_submitted()) { K(mds_cache), K(mds_durable_arr));
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "try to move unsubmitted MDS node", K(ret));
} else if (OB_FALSE_IT(del_iterator->set_synced())) {
TRANS_LOG(WARN, "set synced MDS node failed", K(*del_iterator));
} else if (OB_FAIL(tx_buffer_node_arr.push_back(*del_iterator))) {
TRANS_LOG(WARN, "push back MDS node failed", K(ret));
} else if (OB_FAIL(list_ptr_->erase(del_iterator))) {
TRANS_LOG(WARN, "earse from MDS list failed", K(ret));
}
}
}
return ret;
}
int ObTxMDSRange::copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const
{
int ret = OB_SUCCESS;
ObTxBufferNodeList::iterator next_iterator;
if (OB_ISNULL(list_ptr_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "MDS range is not init", K(ret));
} else if (start_iter_ == list_ptr_->end() || 0 == count_) {
// empty MDS range
TRANS_LOG(WARN, "use empty mds range when copy", K(this), K(lbt()));
} else {
int64_t i = 0;
next_iterator = start_iter_;
for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end();
i++, next_iterator++) {
if (OB_FAIL(tx_buffer_node_arr.push_back(*next_iterator))) {
TRANS_LOG(WARN, "push back MDS node failed", K(ret));
} }
} }
} }
@ -821,41 +779,91 @@ int ObTxMDSRange::copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const
return ret; return ret;
} }
// int ObTxMDSRange::move_to(ObTxBufferNodeArray &tx_buffer_node_arr)
// {
// int ret = OB_SUCCESS;
//
// ObTxBufferNodeList::iterator del_iterator, next_iterator;
//
// if (OB_ISNULL(list_ptr_)) {
// ret = OB_NOT_INIT;
// TRANS_LOG(WARN, "MDS range is not init", K(ret));
// } else if (start_iter_ == list_ptr_->end() || 0 == count_) {
// // empty MDS range
// TRANS_LOG(WARN, "use empty mds range when move", K(this), K(lbt()));
// } else {
// int64_t i = 0;
// del_iterator = list_ptr_->end();
// next_iterator = start_iter_;
//
// for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end(); i++) {
// del_iterator = next_iterator;
// next_iterator++;
//
// if (!del_iterator->is_submitted()) {
// ret = OB_ERR_UNEXPECTED;
// TRANS_LOG(WARN, "try to move unsubmitted MDS node", K(ret));
// } else if (OB_FALSE_IT(del_iterator->set_synced())) {
// TRANS_LOG(WARN, "set synced MDS node failed", K(*del_iterator));
// } else if (OB_FAIL(tx_buffer_node_arr.push_back(*del_iterator))) {
// TRANS_LOG(WARN, "push back MDS node failed", K(ret));
// } else if (OB_FAIL(list_ptr_->erase(del_iterator))) {
// TRANS_LOG(WARN, "earse from MDS list failed", K(ret));
// }
// }
// }
//
// return ret;
// }
//
// int ObTxMDSRange::copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const
// {
// int ret = OB_SUCCESS;
//
// ObTxBufferNodeList::iterator next_iterator;
//
// if (OB_ISNULL(list_ptr_)) {
// ret = OB_NOT_INIT;
// TRANS_LOG(WARN, "MDS range is not init", K(ret));
// } else if (start_iter_ == list_ptr_->end() || 0 == count_) {
// // empty MDS range
// TRANS_LOG(WARN, "use empty mds range when copy", K(this), K(lbt()));
// } else {
// int64_t i = 0;
// next_iterator = start_iter_;
//
// for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end();
// i++, next_iterator++) {
// if (OB_FAIL(tx_buffer_node_arr.push_back(*next_iterator))) {
// TRANS_LOG(WARN, "push back MDS node failed", K(ret));
// }
// }
// }
//
// return ret;
// }
//
int ObTxMDSRange::range_submitted(ObTxMDSCache &cache) int ObTxMDSRange::range_submitted(ObTxMDSCache &cache)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTxBufferNodeList::iterator next_iterator;
int64_t i = 0; int64_t i = 0;
if (OB_ISNULL(list_ptr_)) { if (OB_ISNULL(tx_ctx_)) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
TRANS_LOG(WARN, "MDS range is not init", K(ret)); TRANS_LOG(WARN, "MDS range is not init", K(ret),KPC(this),KPC(tx_ctx_));
} else if (start_iter_ == list_ptr_->end() || 0 == count_) { } else if (range_array_.empty()) {
// empty MDS range // empty MDS range
TRANS_LOG(WARN, "use empty mds range when submit range", K(cache), K(this), K(lbt())); TRANS_LOG(WARN, "use empty mds range when submit range", K(ret),K(cache), KPC(this), KPC(tx_ctx_));
} else { } else {
next_iterator = start_iter_; cache.update_submitted_iterator(range_array_);
for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end();
i++, next_iterator++) {
next_iterator->set_submitted();
cache.update_submitted_iterator(next_iterator);
}
} }
return ret; return ret;
} }
void ObTxMDSRange::range_sync_failed() void ObTxMDSRange::range_sync_failed(ObTxMDSCache &cache)
{ {
ObTxBufferNodeList::iterator next_iterator; cache.update_sync_failed_range(range_array_);
int64_t i = 0;
next_iterator = start_iter_;
for (i = 0; i < count_ && next_iterator != list_ptr_->end(); i++, next_iterator++) {
next_iterator->log_sync_fail();
}
} }
void ObTxMDSCache::reset() void ObTxMDSCache::reset()
@ -914,7 +922,8 @@ int ObTxMDSCache::rollback_last_mds_node()
return ret; return ret;
} }
int ObTxMDSCache::fill_mds_log(ObTxMultiDataSourceLog &mds_log, int ObTxMDSCache::fill_mds_log(ObPartTransCtx *ctx,
ObTxMultiDataSourceLog &mds_log,
ObTxMDSRange &mds_range, ObTxMDSRange &mds_range,
logservice::ObReplayBarrierType &barrier_flag, logservice::ObReplayBarrierType &barrier_flag,
share::SCN &mds_base_scn) share::SCN &mds_base_scn)
@ -929,7 +938,7 @@ int ObTxMDSCache::fill_mds_log(ObTxMultiDataSourceLog &mds_log,
tmp_base_scn.reset(); tmp_base_scn.reset();
logservice::ObReplayBarrierType tmp_barrier_type = logservice::ObReplayBarrierType::NO_NEED_BARRIER; logservice::ObReplayBarrierType tmp_barrier_type = logservice::ObReplayBarrierType::NO_NEED_BARRIER;
if (OB_FAIL(mds_range.init(&mds_list_))) { if (OB_FAIL(mds_range.init(ctx))) {
TRANS_LOG(WARN, "init mds range failed", K(ret)); TRANS_LOG(WARN, "init mds range failed", K(ret));
} else { } else {
if (submitted_iterator_ == mds_list_.end()) { if (submitted_iterator_ == mds_list_.end()) {
@ -1003,6 +1012,55 @@ int ObTxMDSCache::copy_to(ObTxBufferNodeArray &tmp_array) const
return ret; return ret;
} }
#define SEARCH_ITER_AFTER_SUBMITTED \
int64_t search_count = 0; \
do { \
if (search_iter == mds_list_.end()) { \
search_iter = mds_list_.begin(); \
} else { \
search_iter++; \
if (search_iter == mds_list_.end()) { \
search_iter = mds_list_.begin(); \
} \
} \
search_count++; \
if (search_count > mds_list_.size()) { \
if (REACH_TIME_INTERVAL(1000 * 1000)) { \
TRANS_LOG(ERROR, "unexpected buffer_node in mds_range", K(search_count), \
K(mds_list_.size()), K(range_array[i]), K(*search_iter), KPC(this)); \
} \
} \
} while (!((*search_iter) == range_array[i])); \
void ObTxMDSCache::update_submitted_iterator(ObTxBufferNodeArray &range_array)
{
int ret = OB_SUCCESS;
ObTxBufferNodeList::iterator search_iter = submitted_iterator_;
for (int i = 0; i < range_array.count() && OB_SUCC(ret); i++) {
SEARCH_ITER_AFTER_SUBMITTED
search_iter->set_submitted();
range_array[i].set_submitted();
unsubmitted_size_ = unsubmitted_size_ - search_iter->get_serialize_size();
submitted_iterator_ = search_iter;
}
}
void ObTxMDSCache::update_sync_failed_range(ObTxBufferNodeArray &range_array)
{
int ret = OB_SUCCESS;
ObTxBufferNodeList::iterator search_iter = submitted_iterator_;
for (int i = 0; i < range_array.count() && OB_SUCC(ret); i++) {
SEARCH_ITER_AFTER_SUBMITTED
search_iter->log_sync_fail();
range_array[i].log_sync_fail();
}
}
bool ObTxMDSCache::is_contain(const ObTxDataSourceType target_type) const bool ObTxMDSCache::is_contain(const ObTxDataSourceType target_type) const
{ {
bool contain = false; bool contain = false;

View File

@ -1554,19 +1554,22 @@ public:
int insert_mds_node(const ObTxBufferNode &buf_node); int insert_mds_node(const ObTxBufferNode &buf_node);
int rollback_last_mds_node(); int rollback_last_mds_node();
int fill_mds_log(ObTxMultiDataSourceLog &mds_log, int fill_mds_log(ObPartTransCtx* ctx,
ObTxMultiDataSourceLog &mds_log,
ObTxMDSRange &mds_range, ObTxMDSRange &mds_range,
logservice::ObReplayBarrierType &barrier_flag, logservice::ObReplayBarrierType &barrier_flag,
share::SCN &mds_base_scn); share::SCN &mds_base_scn);
int earse_from_cache(const ObTxBufferNode &node) {return mds_list_.erase(node); }
int copy_to(ObTxBufferNodeArray &tmp_array) const; int copy_to(ObTxBufferNodeArray &tmp_array) const;
int64_t get_unsubmitted_size() const { return unsubmitted_size_; } int64_t get_unsubmitted_size() const { return unsubmitted_size_; }
int64_t count() const { return mds_list_.size(); } int64_t count() const { return mds_list_.size(); }
void update_submitted_iterator(const ObTxBufferNodeList::iterator &iter) void update_submitted_iterator(ObTxBufferNodeArray & range_array);
{ void update_sync_failed_range(ObTxBufferNodeArray & range_array);
unsubmitted_size_ = unsubmitted_size_ - iter->get_serialize_size(); // {
submitted_iterator_ = iter; // unsubmitted_size_ = unsubmitted_size_ - iter->get_serialize_size();
} // submitted_iterator_ = iter;
// }
void clear_submitted_iterator() { submitted_iterator_ = mds_list_.end(); } void clear_submitted_iterator() { submitted_iterator_ = mds_list_.end(); }
bool is_contain(const ObTxDataSourceType target_type) const; bool is_contain(const ObTxDataSourceType target_type) const;
@ -1589,25 +1592,31 @@ class ObTxMDSRange
public: public:
ObTxMDSRange() { reset(); } ObTxMDSRange() { reset(); }
void reset(); void reset();
void clear(); // void clear();
int init(ObTxBufferNodeList *list_ptr); int init(ObPartTransCtx * tx_ctx);
int update_range(ObTxBufferNodeList::iterator iter); int update_range(ObTxBufferNodeList::iterator iter);
int move_to(ObTxBufferNodeArray &tx_buffer_node_arr); int move_from_cache_to_arr(ObTxMDSCache & mds_cache, ObTxBufferNodeArray& mds_durable_arr);
int copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const; // int move_to(ObTxBufferNodeArray &tx_buffer_node_arr);
// int copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const;
int range_submitted(ObTxMDSCache &cache); int range_submitted(ObTxMDSCache &cache);
void range_sync_failed(); void range_sync_failed(ObTxMDSCache &cache);
int64_t count() const { return count_; }; int64_t count() const { return range_array_.count(); };
TO_STRING_KV(K(count_)); const ObTxBufferNodeArray & get_range_array() {return range_array_;}
TO_STRING_KV(K(range_array_.count()),K(range_array_));
private: private:
ObTxBufferNodeList *list_ptr_; ObTxBufferNodeArray range_array_;
ObTxBufferNodeList::iterator start_iter_; ObPartTransCtx * tx_ctx_;
int64_t count_;
// ObTxBufferNodeList *list_ptr_;
// ObTxBufferNodeList::iterator start_iter_;
// int64_t count_;
}; };
static const int64_t MAX_TABLET_MODIFY_RECORD_COUNT = 16; static const int64_t MAX_TABLET_MODIFY_RECORD_COUNT = 16;

View File

@ -1990,16 +1990,19 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
if (ObTxLogType::TX_REDO_LOG == log_type) { if (ObTxLogType::TX_REDO_LOG == log_type) {
// do nothing // do nothing
} else if (ObTxLogType::TX_MULTI_DATA_SOURCE_LOG == log_type) { } else if (ObTxLogType::TX_MULTI_DATA_SOURCE_LOG == log_type) {
tmp_array.reset();
share::SCN notify_redo_scn = share::SCN notify_redo_scn =
log_cb->get_first_part_scn().is_valid() ? log_cb->get_first_part_scn() : log_ts; log_cb->get_first_part_scn().is_valid() ? log_cb->get_first_part_scn() : log_ts;
if (OB_FAIL(log_cb->get_mds_range().copy_to(tmp_array))) { if (OB_FAIL(log_cb->get_mds_range().move_from_cache_to_arr(mds_cache_,
TRANS_LOG(WARN, "copy mds log array failed", K(ret)); exec_info_.multi_data_source_))) {
} else if (OB_FAIL(log_cb->get_mds_range().move_to(exec_info_.multi_data_source_))) { TRANS_LOG(WARN, "move from mds cache to durable arr failed", K(ret));
TRANS_LOG(WARN, "move MDS range into exec_info failed", K(ret)); // } else if (OB_FAIL(log_cb->get_mds_range().move_to(exec_info_.multi_data_source_))) {
// TRANS_LOG(WARN, "move MDS range into exec_info failed", K(ret));
} else if (FALSE_IT(mds_cache_.clear_submitted_iterator())) { } else if (FALSE_IT(mds_cache_.clear_submitted_iterator())) {
//do nothing // do nothing
} else if (OB_FAIL(notify_data_source_(NotifyType::ON_REDO, notify_redo_scn, false, tmp_array))) { } else if (OB_FAIL(notify_data_source_(NotifyType::ON_REDO,
notify_redo_scn,
false,
log_cb->get_mds_range().get_range_array()))) {
TRANS_LOG(WARN, "notify data source for ON_REDO", K(ret)); TRANS_LOG(WARN, "notify data source for ON_REDO", K(ret));
} else { } else {
log_cb->get_mds_range().reset(); log_cb->get_mds_range().reset();
@ -2280,7 +2283,7 @@ int ObPartTransCtx::on_failure(ObTxLogCb *log_cb)
const SCN log_ts = log_cb->get_log_ts(); const SCN log_ts = log_cb->get_log_ts();
// TODO, dingxi // TODO, dingxi
mt_ctx_.sync_log_fail(log_cb->get_callbacks()); mt_ctx_.sync_log_fail(log_cb->get_callbacks());
log_cb->get_mds_range().range_sync_failed(); log_cb->get_mds_range().range_sync_failed(mds_cache_);
if (log_ts == ctx_tx_data_.get_start_log_ts()) { if (log_ts == ctx_tx_data_.get_start_log_ts()) {
ctx_tx_data_.set_start_log_ts(SCN()); ctx_tx_data_.set_start_log_ts(SCN());
} }
@ -6075,7 +6078,7 @@ int ObPartTransCtx::submit_multi_data_source_(ObTxLogBlock &log_block)
TRANS_LOG(WARN, "get log cb failed", KR(ret), K(*this)); TRANS_LOG(WARN, "get log cb failed", KR(ret), K(*this));
} }
} else { } else {
ret = mds_cache_.fill_mds_log(log, log_cb->get_mds_range(), barrier_type, mds_base_scn); ret = mds_cache_.fill_mds_log(this, log, log_cb->get_mds_range(), barrier_type, mds_base_scn);
} }
// TRANS_LOG(INFO, "after fill mds log", K(ret), K(trans_id_)); // TRANS_LOG(INFO, "after fill mds log", K(ret), K(trans_id_));