diff --git a/src/storage/tx/ob_multi_data_source.cpp b/src/storage/tx/ob_multi_data_source.cpp index 3a80e050af..bc706e3df7 100644 --- a/src/storage/tx/ob_multi_data_source.cpp +++ b/src/storage/tx/ob_multi_data_source.cpp @@ -74,10 +74,19 @@ void ObTxBufferNode::replace_data(const common::ObString &data) has_submitted_ = false; has_synced_ = false; } -//##################################################### -// ObTxMDSRange -//##################################################### +bool ObTxBufferNode::operator==(const ObTxBufferNode &buffer_node) const +{ + bool is_same = false; + + if (has_submitted_ == buffer_node.has_submitted_ && has_synced_ == buffer_node.has_synced_ + && mds_base_scn_ == buffer_node.mds_base_scn_ && type_ == buffer_node.type_ + && data_ == buffer_node.data_) { + is_same = true; + } + + return is_same; +} //##################################################### // ObMulSourceTxDataNotifier diff --git a/src/storage/tx/ob_multi_data_source.h b/src/storage/tx/ob_multi_data_source.h index ba83ff5fb6..0b3b83f4bf 100644 --- a/src/storage/tx/ob_multi_data_source.h +++ b/src/storage/tx/ob_multi_data_source.h @@ -117,6 +117,8 @@ public: const share::SCN &get_base_scn() { return mds_base_scn_; } + bool operator==(const ObTxBufferNode & buffer_node) const; + void log_sync_fail() { has_submitted_ = false; diff --git a/src/storage/tx/ob_trans_define.cpp b/src/storage/tx/ob_trans_define.cpp index 00d3285dc2..98b91d5128 100755 --- a/src/storage/tx/ob_trans_define.cpp +++ b/src/storage/tx/ob_trans_define.cpp @@ -706,32 +706,29 @@ DEFINE_TO_STRING_AND_YSON(ObTransKey, OB_ID(hash), hash_val_, void ObTxMDSRange::reset() { - list_ptr_ = nullptr; - start_iter_ = ObTxBufferNodeList::iterator(); - count_ = 0; + tx_ctx_ = nullptr; + range_array_.reset(); } -void ObTxMDSRange::clear() -{ - list_ptr_ = nullptr; - start_iter_ = ObTxBufferNodeList::iterator(); -} +// void ObTxMDSRange::clear() +// { +// list_ptr_ = nullptr; +// start_iter_ = ObTxBufferNodeList::iterator(); +// } -int ObTxMDSRange::init(ObTxBufferNodeList *list_ptr) +int ObTxMDSRange::init(ObPartTransCtx *tx_ctx) { int ret = OB_SUCCESS; - if (OB_NOT_NULL(list_ptr_)) { + if (OB_NOT_NULL(tx_ctx_)) { ret = OB_INIT_TWICE; - } else if (OB_ISNULL(list_ptr)) { + } else if (OB_ISNULL(tx_ctx)) { ret = OB_INVALID_ARGUMENT; } else { - list_ptr_ = list_ptr; - start_iter_ = list_ptr_->end(); - count_ = 0; + tx_ctx_ = tx_ctx; } if (OB_FAIL(ret)) { - TRANS_LOG(WARN, "init MDS range failed", K(ret)); + TRANS_LOG(WARN, "init MDS range failed", K(ret), KPC(tx_ctx)); } return ret; @@ -741,79 +738,40 @@ int ObTxMDSRange::update_range(ObTxBufferNodeList::iterator iter) { int ret = OB_SUCCESS; - if (OB_ISNULL(list_ptr_)) { + if (OB_ISNULL(tx_ctx_)) { ret = OB_NOT_INIT; - TRANS_LOG(WARN, "MDS range is not init", K(ret)); - } else if (iter == list_ptr_->end()) { + TRANS_LOG(WARN, "MDS range is not init", K(ret), KPC(tx_ctx_)); + } else if (!(*iter).is_valid()) { ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid iter", K(ret)); - } else if (start_iter_ == list_ptr_->end() || 0 == count_) { - start_iter_ = iter; - count_ = 1; - } else { - count_++; + TRANS_LOG(WARN, "invalid iter", K(ret), K(*iter)); + } else if (OB_FAIL(range_array_.push_back(*iter))) { + TRANS_LOG(WARN, "push back into the range array failed", K(ret), K(*iter), KPC(this), + KPC(tx_ctx_)); } return ret; } -int ObTxMDSRange::move_to(ObTxBufferNodeArray &tx_buffer_node_arr) +int ObTxMDSRange::move_from_cache_to_arr(ObTxMDSCache &mds_cache, + ObTxBufferNodeArray &mds_durable_arr) { int ret = OB_SUCCESS; - ObTxBufferNodeList::iterator del_iterator, next_iterator; - - if (OB_ISNULL(list_ptr_)) { + if (OB_ISNULL(tx_ctx_)) { ret = OB_NOT_INIT; - TRANS_LOG(WARN, "MDS range is not init", K(ret)); - } else if (start_iter_ == list_ptr_->end() || 0 == count_) { - // empty MDS range - TRANS_LOG(WARN, "use empty mds range when move", K(this), K(lbt())); + TRANS_LOG(WARN, "MDS range is not init", K(ret), KPC(tx_ctx_)); + } else if (range_array_.empty()) { + TRANS_LOG(WARN, "empty range in move function", K(ret), KPC(tx_ctx_)); } else { - int64_t i = 0; - del_iterator = list_ptr_->end(); - next_iterator = start_iter_; - - for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end(); i++) { - del_iterator = next_iterator; - next_iterator++; - - if (!del_iterator->is_submitted()) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "try to move unsubmitted MDS node", K(ret)); - } else if (OB_FALSE_IT(del_iterator->set_synced())) { - TRANS_LOG(WARN, "set synced MDS node failed", K(*del_iterator)); - } else if (OB_FAIL(tx_buffer_node_arr.push_back(*del_iterator))) { - TRANS_LOG(WARN, "push back MDS node failed", K(ret)); - } else if (OB_FAIL(list_ptr_->erase(del_iterator))) { - TRANS_LOG(WARN, "earse from MDS list failed", K(ret)); - } - } - } - - return ret; -} - -int ObTxMDSRange::copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const -{ - int ret = OB_SUCCESS; - - ObTxBufferNodeList::iterator next_iterator; - - if (OB_ISNULL(list_ptr_)) { - ret = OB_NOT_INIT; - TRANS_LOG(WARN, "MDS range is not init", K(ret)); - } else if (start_iter_ == list_ptr_->end() || 0 == count_) { - // empty MDS range - TRANS_LOG(WARN, "use empty mds range when copy", K(this), K(lbt())); - } else { - int64_t i = 0; - next_iterator = start_iter_; - - for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end(); - i++, next_iterator++) { - if (OB_FAIL(tx_buffer_node_arr.push_back(*next_iterator))) { - TRANS_LOG(WARN, "push back MDS node failed", K(ret)); + for (int64_t i = 0; i < range_array_.count() && OB_SUCC(ret); i++) { + if (OB_FAIL(mds_cache.earse_from_cache(range_array_[i]))) { + TRANS_LOG(WARN, "earse from mds cache failed", K(ret), K(range_array_[i]), K(mds_cache), + K(mds_durable_arr)); + } else if (OB_FALSE_IT(range_array_[i].set_synced())) { + //do nothing + } else if (OB_FAIL(mds_durable_arr.push_back(range_array_[i]))) { + TRANS_LOG(WARN, "push back into mds_durable_arr failed", K(ret), K(range_array_[i]), + K(mds_cache), K(mds_durable_arr)); } } } @@ -821,41 +779,91 @@ int ObTxMDSRange::copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const return ret; } +// int ObTxMDSRange::move_to(ObTxBufferNodeArray &tx_buffer_node_arr) +// { +// int ret = OB_SUCCESS; +// +// ObTxBufferNodeList::iterator del_iterator, next_iterator; +// +// if (OB_ISNULL(list_ptr_)) { +// ret = OB_NOT_INIT; +// TRANS_LOG(WARN, "MDS range is not init", K(ret)); +// } else if (start_iter_ == list_ptr_->end() || 0 == count_) { +// // empty MDS range +// TRANS_LOG(WARN, "use empty mds range when move", K(this), K(lbt())); +// } else { +// int64_t i = 0; +// del_iterator = list_ptr_->end(); +// next_iterator = start_iter_; +// +// for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end(); i++) { +// del_iterator = next_iterator; +// next_iterator++; +// +// if (!del_iterator->is_submitted()) { +// ret = OB_ERR_UNEXPECTED; +// TRANS_LOG(WARN, "try to move unsubmitted MDS node", K(ret)); +// } else if (OB_FALSE_IT(del_iterator->set_synced())) { +// TRANS_LOG(WARN, "set synced MDS node failed", K(*del_iterator)); +// } else if (OB_FAIL(tx_buffer_node_arr.push_back(*del_iterator))) { +// TRANS_LOG(WARN, "push back MDS node failed", K(ret)); +// } else if (OB_FAIL(list_ptr_->erase(del_iterator))) { +// TRANS_LOG(WARN, "earse from MDS list failed", K(ret)); +// } +// } +// } +// +// return ret; +// } +// +// int ObTxMDSRange::copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const +// { +// int ret = OB_SUCCESS; +// +// ObTxBufferNodeList::iterator next_iterator; +// +// if (OB_ISNULL(list_ptr_)) { +// ret = OB_NOT_INIT; +// TRANS_LOG(WARN, "MDS range is not init", K(ret)); +// } else if (start_iter_ == list_ptr_->end() || 0 == count_) { +// // empty MDS range +// TRANS_LOG(WARN, "use empty mds range when copy", K(this), K(lbt())); +// } else { +// int64_t i = 0; +// next_iterator = start_iter_; +// +// for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end(); +// i++, next_iterator++) { +// if (OB_FAIL(tx_buffer_node_arr.push_back(*next_iterator))) { +// TRANS_LOG(WARN, "push back MDS node failed", K(ret)); +// } +// } +// } +// +// return ret; +// } +// int ObTxMDSRange::range_submitted(ObTxMDSCache &cache) { int ret = OB_SUCCESS; - ObTxBufferNodeList::iterator next_iterator; int64_t i = 0; - if (OB_ISNULL(list_ptr_)) { + if (OB_ISNULL(tx_ctx_)) { ret = OB_NOT_INIT; - TRANS_LOG(WARN, "MDS range is not init", K(ret)); - } else if (start_iter_ == list_ptr_->end() || 0 == count_) { + TRANS_LOG(WARN, "MDS range is not init", K(ret),KPC(this),KPC(tx_ctx_)); + } else if (range_array_.empty()) { // empty MDS range - TRANS_LOG(WARN, "use empty mds range when submit range", K(cache), K(this), K(lbt())); + TRANS_LOG(WARN, "use empty mds range when submit range", K(ret),K(cache), KPC(this), KPC(tx_ctx_)); } else { - next_iterator = start_iter_; - - for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end(); - i++, next_iterator++) { - next_iterator->set_submitted(); - cache.update_submitted_iterator(next_iterator); - } + cache.update_submitted_iterator(range_array_); } return ret; } -void ObTxMDSRange::range_sync_failed() +void ObTxMDSRange::range_sync_failed(ObTxMDSCache &cache) { - ObTxBufferNodeList::iterator next_iterator; - int64_t i = 0; - - next_iterator = start_iter_; - - for (i = 0; i < count_ && next_iterator != list_ptr_->end(); i++, next_iterator++) { - next_iterator->log_sync_fail(); - } + cache.update_sync_failed_range(range_array_); } void ObTxMDSCache::reset() @@ -914,7 +922,8 @@ int ObTxMDSCache::rollback_last_mds_node() return ret; } -int ObTxMDSCache::fill_mds_log(ObTxMultiDataSourceLog &mds_log, +int ObTxMDSCache::fill_mds_log(ObPartTransCtx *ctx, + ObTxMultiDataSourceLog &mds_log, ObTxMDSRange &mds_range, logservice::ObReplayBarrierType &barrier_flag, share::SCN &mds_base_scn) @@ -929,7 +938,7 @@ int ObTxMDSCache::fill_mds_log(ObTxMultiDataSourceLog &mds_log, tmp_base_scn.reset(); logservice::ObReplayBarrierType tmp_barrier_type = logservice::ObReplayBarrierType::NO_NEED_BARRIER; - if (OB_FAIL(mds_range.init(&mds_list_))) { + if (OB_FAIL(mds_range.init(ctx))) { TRANS_LOG(WARN, "init mds range failed", K(ret)); } else { if (submitted_iterator_ == mds_list_.end()) { @@ -1003,6 +1012,55 @@ int ObTxMDSCache::copy_to(ObTxBufferNodeArray &tmp_array) const return ret; } +#define SEARCH_ITER_AFTER_SUBMITTED \ + int64_t search_count = 0; \ + do { \ + if (search_iter == mds_list_.end()) { \ + search_iter = mds_list_.begin(); \ + } else { \ + search_iter++; \ + if (search_iter == mds_list_.end()) { \ + search_iter = mds_list_.begin(); \ + } \ + } \ + search_count++; \ + if (search_count > mds_list_.size()) { \ + if (REACH_TIME_INTERVAL(1000 * 1000)) { \ + TRANS_LOG(ERROR, "unexpected buffer_node in mds_range", K(search_count), \ + K(mds_list_.size()), K(range_array[i]), K(*search_iter), KPC(this)); \ + } \ + } \ + } while (!((*search_iter) == range_array[i])); \ + +void ObTxMDSCache::update_submitted_iterator(ObTxBufferNodeArray &range_array) +{ + + int ret = OB_SUCCESS; + ObTxBufferNodeList::iterator search_iter = submitted_iterator_; + for (int i = 0; i < range_array.count() && OB_SUCC(ret); i++) { + SEARCH_ITER_AFTER_SUBMITTED + + search_iter->set_submitted(); + range_array[i].set_submitted(); + + unsubmitted_size_ = unsubmitted_size_ - search_iter->get_serialize_size(); + submitted_iterator_ = search_iter; + } +} + +void ObTxMDSCache::update_sync_failed_range(ObTxBufferNodeArray &range_array) +{ + + int ret = OB_SUCCESS; + ObTxBufferNodeList::iterator search_iter = submitted_iterator_; + for (int i = 0; i < range_array.count() && OB_SUCC(ret); i++) { + SEARCH_ITER_AFTER_SUBMITTED + + search_iter->log_sync_fail(); + range_array[i].log_sync_fail(); + } +} + bool ObTxMDSCache::is_contain(const ObTxDataSourceType target_type) const { bool contain = false; diff --git a/src/storage/tx/ob_trans_define.h b/src/storage/tx/ob_trans_define.h index 08bff1d8fa..1e3354e7a8 100644 --- a/src/storage/tx/ob_trans_define.h +++ b/src/storage/tx/ob_trans_define.h @@ -1554,19 +1554,22 @@ public: int insert_mds_node(const ObTxBufferNode &buf_node); int rollback_last_mds_node(); - int fill_mds_log(ObTxMultiDataSourceLog &mds_log, + int fill_mds_log(ObPartTransCtx* ctx, + ObTxMultiDataSourceLog &mds_log, ObTxMDSRange &mds_range, logservice::ObReplayBarrierType &barrier_flag, share::SCN &mds_base_scn); + int earse_from_cache(const ObTxBufferNode &node) {return mds_list_.erase(node); } int copy_to(ObTxBufferNodeArray &tmp_array) const; int64_t get_unsubmitted_size() const { return unsubmitted_size_; } int64_t count() const { return mds_list_.size(); } - void update_submitted_iterator(const ObTxBufferNodeList::iterator &iter) - { - unsubmitted_size_ = unsubmitted_size_ - iter->get_serialize_size(); - submitted_iterator_ = iter; - } + void update_submitted_iterator(ObTxBufferNodeArray & range_array); + void update_sync_failed_range(ObTxBufferNodeArray & range_array); + // { + // unsubmitted_size_ = unsubmitted_size_ - iter->get_serialize_size(); + // submitted_iterator_ = iter; + // } void clear_submitted_iterator() { submitted_iterator_ = mds_list_.end(); } bool is_contain(const ObTxDataSourceType target_type) const; @@ -1589,25 +1592,31 @@ class ObTxMDSRange public: ObTxMDSRange() { reset(); } void reset(); - void clear(); + // void clear(); - int init(ObTxBufferNodeList *list_ptr); + int init(ObPartTransCtx * tx_ctx); int update_range(ObTxBufferNodeList::iterator iter); - int move_to(ObTxBufferNodeArray &tx_buffer_node_arr); - int copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const; + int move_from_cache_to_arr(ObTxMDSCache & mds_cache, ObTxBufferNodeArray& mds_durable_arr); + // int move_to(ObTxBufferNodeArray &tx_buffer_node_arr); + // int copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const; int range_submitted(ObTxMDSCache &cache); - void range_sync_failed(); + void range_sync_failed(ObTxMDSCache &cache); - int64_t count() const { return count_; }; + int64_t count() const { return range_array_.count(); }; - TO_STRING_KV(K(count_)); + const ObTxBufferNodeArray & get_range_array() {return range_array_;} + + TO_STRING_KV(K(range_array_.count()),K(range_array_)); private: - ObTxBufferNodeList *list_ptr_; - ObTxBufferNodeList::iterator start_iter_; - int64_t count_; + ObTxBufferNodeArray range_array_; + ObPartTransCtx * tx_ctx_; + + // ObTxBufferNodeList *list_ptr_; + // ObTxBufferNodeList::iterator start_iter_; + // int64_t count_; }; static const int64_t MAX_TABLET_MODIFY_RECORD_COUNT = 16; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 4c3d9ae6ab..8e93921e80 100755 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1990,16 +1990,19 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) if (ObTxLogType::TX_REDO_LOG == log_type) { // do nothing } else if (ObTxLogType::TX_MULTI_DATA_SOURCE_LOG == log_type) { - tmp_array.reset(); share::SCN notify_redo_scn = log_cb->get_first_part_scn().is_valid() ? log_cb->get_first_part_scn() : log_ts; - if (OB_FAIL(log_cb->get_mds_range().copy_to(tmp_array))) { - TRANS_LOG(WARN, "copy mds log array failed", K(ret)); - } else if (OB_FAIL(log_cb->get_mds_range().move_to(exec_info_.multi_data_source_))) { - TRANS_LOG(WARN, "move MDS range into exec_info failed", K(ret)); + if (OB_FAIL(log_cb->get_mds_range().move_from_cache_to_arr(mds_cache_, + exec_info_.multi_data_source_))) { + TRANS_LOG(WARN, "move from mds cache to durable arr failed", K(ret)); + // } else if (OB_FAIL(log_cb->get_mds_range().move_to(exec_info_.multi_data_source_))) { + // TRANS_LOG(WARN, "move MDS range into exec_info failed", K(ret)); } else if (FALSE_IT(mds_cache_.clear_submitted_iterator())) { - //do nothing - } else if (OB_FAIL(notify_data_source_(NotifyType::ON_REDO, notify_redo_scn, false, tmp_array))) { + // do nothing + } else if (OB_FAIL(notify_data_source_(NotifyType::ON_REDO, + notify_redo_scn, + false, + log_cb->get_mds_range().get_range_array()))) { TRANS_LOG(WARN, "notify data source for ON_REDO", K(ret)); } else { log_cb->get_mds_range().reset(); @@ -2280,7 +2283,7 @@ int ObPartTransCtx::on_failure(ObTxLogCb *log_cb) const SCN log_ts = log_cb->get_log_ts(); // TODO, dingxi mt_ctx_.sync_log_fail(log_cb->get_callbacks()); - log_cb->get_mds_range().range_sync_failed(); + log_cb->get_mds_range().range_sync_failed(mds_cache_); if (log_ts == ctx_tx_data_.get_start_log_ts()) { ctx_tx_data_.set_start_log_ts(SCN()); } @@ -6075,7 +6078,7 @@ int ObPartTransCtx::submit_multi_data_source_(ObTxLogBlock &log_block) TRANS_LOG(WARN, "get log cb failed", KR(ret), K(*this)); } } else { - ret = mds_cache_.fill_mds_log(log, log_cb->get_mds_range(), barrier_type, mds_base_scn); + ret = mds_cache_.fill_mds_log(this, log, log_cb->get_mds_range(), barrier_type, mds_base_scn); } // TRANS_LOG(INFO, "after fill mds log", K(ret), K(trans_id_));