diff --git a/src/logservice/palf/fixed_sliding_window.h b/src/logservice/palf/fixed_sliding_window.h index 7aa67b9752..86a18ca861 100644 --- a/src/logservice/palf/fixed_sliding_window.h +++ b/src/logservice/palf/fixed_sliding_window.h @@ -69,6 +69,7 @@ struct SlidingCond int ret_; }; +// static bool PALF_FIXED_SW_GET_HUNG = false; template class FixedSlidingWindow { @@ -174,6 +175,11 @@ public: } else if (OB_SUCC(check_id_in_range_(g_id))) { int64_t idx = calc_idx_(g_id); T *tmp_ptr = &(array_[idx]); + // for unittest + // while (true == PALF_FIXED_SW_GET_HUNG && g_id == 10) { + // ob_usleep(1000); + // PALF_LOG(TRACE, "sw get hung", K(g_id)); + // } ATOMIC_INC(&(array_[idx].ref_)); // double check to avoid array_[idx] has been slid // Note that we do not require begin_sn_ and end_dn_ have not changed during get(), @@ -181,7 +187,7 @@ public: if (OB_SUCC(check_id_in_range_(g_id))) { val = tmp_ptr; PALF_LOG(TRACE, "get succ", K(g_id), K(array_[idx].ref_)); - } else if (OB_SUCC(revert(g_id))) { + } else if (OB_SUCC(revert_(g_id))) { // begin_sn_ inc and greater than g_id, so dec ref count and return common::OB_ERR_OUT_OF_LOWER_BOUND // must call revert rather than ATOMIC_DEC(&array_[idx].ref_); PALF_LOG(INFO, "get fail and revert", K(g_id), K(begin_sn_), K(array_[idx].ref_)); @@ -209,8 +215,21 @@ public: if (IS_NOT_INIT) { ret = common::OB_NOT_INIT; PALF_LOG(WARN, "FixedSlidingWindow not init", KR(ret)); - // if r_id >= end_sn_, then slidingwindow[r_id] must haven't be getted, and revert(r_id) makes no sense. - } else if (r_id >= get_end_sn_() || r_id < (get_begin_sn() - size_)) { + } else if (OB_UNLIKELY(r_id < (get_begin_sn() - size_))) { + ret = common::OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "FixedSlidingWindow revert error", KR(ret), K(r_id), K(begin_sn_), K(end_sn_)); + } else { + ret = revert_(r_id); + } + return ret; + } + +private: + int revert_(const int64_t r_id) + { + int ret = common::OB_SUCCESS; + // if r_id >= end_sn_, then slidingwindow[r_id] must haven't be getted, and revert(r_id) makes no sense. + if (OB_UNLIKELY(r_id >= get_end_sn_())) { ret = common::OB_ERR_UNEXPECTED; PALF_LOG(ERROR, "FixedSlidingWindow revert error", KR(ret), K(r_id), K(begin_sn_), K(end_sn_)); } else { @@ -225,7 +244,7 @@ public: int64_t idx = calc_idx_(r_id); int64_t tmp_id = r_id; int64_t curr_ref = common::OB_INVALID_COUNT; - if (0 > (curr_ref = ATOMIC_SAF(&(array_[idx].ref_), 1))) { + if (OB_UNLIKELY(0 > (curr_ref = ATOMIC_SAF(&(array_[idx].ref_), 1)))) { ret = common::OB_ERR_UNEXPECTED; PALF_LOG(ERROR, "FixedSlidingWindow revert error", KR(ret), K(r_id), K(begin_sn_), K(end_sn_), K(curr_ref)); } else if (0 == curr_ref) { @@ -261,6 +280,7 @@ public: return ret; } +public: // desc: slide the slidingwindow[begin_sn_] continuously until timeout // For each log, first execute sliding_cb() and then inc begin_sn_ // @param[in] timeout_us: slide continuously until timeout diff --git a/unittest/logservice/test_fixed_sliding_window.cpp b/unittest/logservice/test_fixed_sliding_window.cpp index df84201c91..3a307c8560 100644 --- a/unittest/logservice/test_fixed_sliding_window.cpp +++ b/unittest/logservice/test_fixed_sliding_window.cpp @@ -147,9 +147,18 @@ public: int64_t timeout_us = 1 * 1000 * 1000; LogTaskDummyCallBack cb; PALF_LOG(INFO, "before slide", K(th_id)); - while (sw_->get_begin_sn() < 8193) { + int64_t start_sn = sw_->get_begin_sn(); + while (start_sn < 8193) { + int get_ret = common::OB_SUCCESS; + LogDummyData *val = NULL; + get_ret = sw_->get(start_sn, val); EXPECT_EQ(common::OB_SUCCESS, sw_->slide(timeout_us, &cb)); - usleep(1000); + usleep(500); + if (OB_SUCCESS == get_ret) { + sw_->revert(start_sn); + } + usleep(500); + start_sn = sw_->get_begin_sn(); } } FixedSlidingWindow *sw_; @@ -354,6 +363,76 @@ TEST(TestConcurrentSlidingWindow, test_concurrent_sliding_window) sw.destroy(); } +// reproduce bug +// TEST(TestBaseSlidingWindow, test_concurrent_get_slide) +// { +// // sliding window construct and destruct +// common::ObILogAllocator *alloc_mgr = NULL; +// ObTenantMutilAllocatorMgr::get_instance().init(); +// EXPECT_EQ(common::OB_SUCCESS, TMA_MGR_INSTANCE.get_tenant_log_allocator(common::OB_SERVER_TENANT_ID, alloc_mgr)); + +// // 1. set [0, 128) can slide +// const int64_t size = 128; +// FixedSlidingWindow sw3; +// EXPECT_EQ(OB_SUCCESS, sw3.init(0, size, alloc_mgr)); +// for (int64_t i = 0; i < size; ++i) { +// LogDummyData *val(NULL); +// EXPECT_EQ(common::OB_SUCCESS, sw3.get(i, val)); +// EXPECT_NE((LogDummyData*)NULL, val); +// val->log_id = i; +// val->can_remove = true; +// EXPECT_EQ(common::OB_SUCCESS, sw3.revert(i)); +// } + +// // 2. create and run a thread to wait in get(10) +// PALF_FIXED_SW_GET_HUNG = true; +// GetRunnable get_thread; +// get_thread.cnt_ = 1; +// get_thread.start_ = 10; +// get_thread.sw_ = &sw3; +// get_thread.run(); +// sleep(1); + +// // 3. slide to [128, 256) +// LogTaskDummyCallBack cb; +// EXPECT_EQ(common::OB_SUCCESS, sw3.slide(10 * 1000 * 1000, &cb)); +// EXPECT_EQ(128, sw3.get_begin_sn()); +// EXPECT_EQ(256, sw3.get_end_sn()); + +// // 4. slide to [256, 384) +// for (int64_t i = size; i < 2 * size; ++i) { +// LogDummyData *val(NULL); +// EXPECT_EQ(common::OB_SUCCESS, sw3.get(i, val)); +// EXPECT_NE((LogDummyData*)NULL, val); +// val->log_id = i; +// val->can_remove = true; +// EXPECT_EQ(common::OB_SUCCESS, sw3.revert(i)); +// } +// EXPECT_EQ(common::OB_SUCCESS, sw3.slide(10 * 1000 * 1000, &cb)); +// EXPECT_EQ(2 * size, sw3.get_begin_sn()); +// EXPECT_EQ(3 * size, sw3.get_end_sn()); + +// // 5. resume the thread and join +// PALF_FIXED_SW_GET_HUNG = false; +// sleep(1); +// // 6. continue to slide +// for (int64_t j = 2; j < 100; j++) { +// for (int64_t i = j * size; i < (j+1) * size; ++i) { +// LogDummyData *val(NULL); +// EXPECT_EQ(common::OB_SUCCESS, sw3.get(i, val)); +// EXPECT_NE((LogDummyData*)NULL, val); +// val->log_id = i; +// val->can_remove = true; +// EXPECT_EQ(common::OB_SUCCESS, sw3.revert(i)); +// } +// EXPECT_EQ(common::OB_SUCCESS, sw3.slide(10 * 1000 * 1000, &cb)); +// EXPECT_EQ((j+1) * size, sw3.get_begin_sn()); +// EXPECT_EQ((j+2) * size, sw3.get_end_sn()); +// } +// get_thread.join(); +// sw3.destroy(); +// } + } // end namespace unittest } // end namespace oceanbase