[fix] fix concurrent get and slide in FixedSlidingWindow
This commit is contained in:
@ -69,6 +69,7 @@ struct SlidingCond
|
|||||||
int ret_;
|
int ret_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// static bool PALF_FIXED_SW_GET_HUNG = false;
|
||||||
template <typename T = FixedSlidingWindowSlot>
|
template <typename T = FixedSlidingWindowSlot>
|
||||||
class FixedSlidingWindow
|
class FixedSlidingWindow
|
||||||
{
|
{
|
||||||
@ -174,6 +175,11 @@ public:
|
|||||||
} else if (OB_SUCC(check_id_in_range_(g_id))) {
|
} else if (OB_SUCC(check_id_in_range_(g_id))) {
|
||||||
int64_t idx = calc_idx_(g_id);
|
int64_t idx = calc_idx_(g_id);
|
||||||
T *tmp_ptr = &(array_[idx]);
|
T *tmp_ptr = &(array_[idx]);
|
||||||
|
// for unittest
|
||||||
|
// while (true == PALF_FIXED_SW_GET_HUNG && g_id == 10) {
|
||||||
|
// ob_usleep(1000);
|
||||||
|
// PALF_LOG(TRACE, "sw get hung", K(g_id));
|
||||||
|
// }
|
||||||
ATOMIC_INC(&(array_[idx].ref_));
|
ATOMIC_INC(&(array_[idx].ref_));
|
||||||
// double check to avoid array_[idx] has been slid
|
// double check to avoid array_[idx] has been slid
|
||||||
// Note that we do not require begin_sn_ and end_dn_ have not changed during get(),
|
// Note that we do not require begin_sn_ and end_dn_ have not changed during get(),
|
||||||
@ -181,7 +187,7 @@ public:
|
|||||||
if (OB_SUCC(check_id_in_range_(g_id))) {
|
if (OB_SUCC(check_id_in_range_(g_id))) {
|
||||||
val = tmp_ptr;
|
val = tmp_ptr;
|
||||||
PALF_LOG(TRACE, "get succ", K(g_id), K(array_[idx].ref_));
|
PALF_LOG(TRACE, "get succ", K(g_id), K(array_[idx].ref_));
|
||||||
} else if (OB_SUCC(revert(g_id))) {
|
} else if (OB_SUCC(revert_(g_id))) {
|
||||||
// begin_sn_ inc and greater than g_id, so dec ref count and return common::OB_ERR_OUT_OF_LOWER_BOUND
|
// begin_sn_ inc and greater than g_id, so dec ref count and return common::OB_ERR_OUT_OF_LOWER_BOUND
|
||||||
// must call revert rather than ATOMIC_DEC(&array_[idx].ref_);
|
// must call revert rather than ATOMIC_DEC(&array_[idx].ref_);
|
||||||
PALF_LOG(INFO, "get fail and revert", K(g_id), K(begin_sn_), K(array_[idx].ref_));
|
PALF_LOG(INFO, "get fail and revert", K(g_id), K(begin_sn_), K(array_[idx].ref_));
|
||||||
@ -209,8 +215,21 @@ public:
|
|||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = common::OB_NOT_INIT;
|
ret = common::OB_NOT_INIT;
|
||||||
PALF_LOG(WARN, "FixedSlidingWindow not init", KR(ret));
|
PALF_LOG(WARN, "FixedSlidingWindow not init", KR(ret));
|
||||||
// if r_id >= end_sn_, then slidingwindow[r_id] must haven't be getted, and revert(r_id) makes no sense.
|
} else if (OB_UNLIKELY(r_id < (get_begin_sn() - size_))) {
|
||||||
} else if (r_id >= get_end_sn_() || r_id < (get_begin_sn() - size_)) {
|
ret = common::OB_ERR_UNEXPECTED;
|
||||||
|
PALF_LOG(ERROR, "FixedSlidingWindow revert error", KR(ret), K(r_id), K(begin_sn_), K(end_sn_));
|
||||||
|
} else {
|
||||||
|
ret = revert_(r_id);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
int revert_(const int64_t r_id)
|
||||||
|
{
|
||||||
|
int ret = common::OB_SUCCESS;
|
||||||
|
// if r_id >= end_sn_, then slidingwindow[r_id] must haven't be getted, and revert(r_id) makes no sense.
|
||||||
|
if (OB_UNLIKELY(r_id >= get_end_sn_())) {
|
||||||
ret = common::OB_ERR_UNEXPECTED;
|
ret = common::OB_ERR_UNEXPECTED;
|
||||||
PALF_LOG(ERROR, "FixedSlidingWindow revert error", KR(ret), K(r_id), K(begin_sn_), K(end_sn_));
|
PALF_LOG(ERROR, "FixedSlidingWindow revert error", KR(ret), K(r_id), K(begin_sn_), K(end_sn_));
|
||||||
} else {
|
} else {
|
||||||
@ -225,7 +244,7 @@ public:
|
|||||||
int64_t idx = calc_idx_(r_id);
|
int64_t idx = calc_idx_(r_id);
|
||||||
int64_t tmp_id = r_id;
|
int64_t tmp_id = r_id;
|
||||||
int64_t curr_ref = common::OB_INVALID_COUNT;
|
int64_t curr_ref = common::OB_INVALID_COUNT;
|
||||||
if (0 > (curr_ref = ATOMIC_SAF(&(array_[idx].ref_), 1))) {
|
if (OB_UNLIKELY(0 > (curr_ref = ATOMIC_SAF(&(array_[idx].ref_), 1)))) {
|
||||||
ret = common::OB_ERR_UNEXPECTED;
|
ret = common::OB_ERR_UNEXPECTED;
|
||||||
PALF_LOG(ERROR, "FixedSlidingWindow revert error", KR(ret), K(r_id), K(begin_sn_), K(end_sn_), K(curr_ref));
|
PALF_LOG(ERROR, "FixedSlidingWindow revert error", KR(ret), K(r_id), K(begin_sn_), K(end_sn_), K(curr_ref));
|
||||||
} else if (0 == curr_ref) {
|
} else if (0 == curr_ref) {
|
||||||
@ -261,6 +280,7 @@ public:
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
// desc: slide the slidingwindow[begin_sn_] continuously until timeout
|
// desc: slide the slidingwindow[begin_sn_] continuously until timeout
|
||||||
// For each log, first execute sliding_cb() and then inc begin_sn_
|
// For each log, first execute sliding_cb() and then inc begin_sn_
|
||||||
// @param[in] timeout_us: slide continuously until timeout
|
// @param[in] timeout_us: slide continuously until timeout
|
||||||
|
|||||||
@ -147,9 +147,18 @@ public:
|
|||||||
int64_t timeout_us = 1 * 1000 * 1000;
|
int64_t timeout_us = 1 * 1000 * 1000;
|
||||||
LogTaskDummyCallBack cb;
|
LogTaskDummyCallBack cb;
|
||||||
PALF_LOG(INFO, "before slide", K(th_id));
|
PALF_LOG(INFO, "before slide", K(th_id));
|
||||||
while (sw_->get_begin_sn() < 8193) {
|
int64_t start_sn = sw_->get_begin_sn();
|
||||||
|
while (start_sn < 8193) {
|
||||||
|
int get_ret = common::OB_SUCCESS;
|
||||||
|
LogDummyData *val = NULL;
|
||||||
|
get_ret = sw_->get(start_sn, val);
|
||||||
EXPECT_EQ(common::OB_SUCCESS, sw_->slide(timeout_us, &cb));
|
EXPECT_EQ(common::OB_SUCCESS, sw_->slide(timeout_us, &cb));
|
||||||
usleep(1000);
|
usleep(500);
|
||||||
|
if (OB_SUCCESS == get_ret) {
|
||||||
|
sw_->revert(start_sn);
|
||||||
|
}
|
||||||
|
usleep(500);
|
||||||
|
start_sn = sw_->get_begin_sn();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FixedSlidingWindow<LogDummyData> *sw_;
|
FixedSlidingWindow<LogDummyData> *sw_;
|
||||||
@ -354,6 +363,76 @@ TEST(TestConcurrentSlidingWindow, test_concurrent_sliding_window)
|
|||||||
sw.destroy();
|
sw.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reproduce bug
|
||||||
|
// TEST(TestBaseSlidingWindow, test_concurrent_get_slide)
|
||||||
|
// {
|
||||||
|
// // sliding window construct and destruct
|
||||||
|
// common::ObILogAllocator *alloc_mgr = NULL;
|
||||||
|
// ObTenantMutilAllocatorMgr::get_instance().init();
|
||||||
|
// EXPECT_EQ(common::OB_SUCCESS, TMA_MGR_INSTANCE.get_tenant_log_allocator(common::OB_SERVER_TENANT_ID, alloc_mgr));
|
||||||
|
|
||||||
|
// // 1. set [0, 128) can slide
|
||||||
|
// const int64_t size = 128;
|
||||||
|
// FixedSlidingWindow<LogDummyData> sw3;
|
||||||
|
// EXPECT_EQ(OB_SUCCESS, sw3.init(0, size, alloc_mgr));
|
||||||
|
// for (int64_t i = 0; i < size; ++i) {
|
||||||
|
// LogDummyData *val(NULL);
|
||||||
|
// EXPECT_EQ(common::OB_SUCCESS, sw3.get(i, val));
|
||||||
|
// EXPECT_NE((LogDummyData*)NULL, val);
|
||||||
|
// val->log_id = i;
|
||||||
|
// val->can_remove = true;
|
||||||
|
// EXPECT_EQ(common::OB_SUCCESS, sw3.revert(i));
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // 2. create and run a thread to wait in get(10)
|
||||||
|
// PALF_FIXED_SW_GET_HUNG = true;
|
||||||
|
// GetRunnable get_thread;
|
||||||
|
// get_thread.cnt_ = 1;
|
||||||
|
// get_thread.start_ = 10;
|
||||||
|
// get_thread.sw_ = &sw3;
|
||||||
|
// get_thread.run();
|
||||||
|
// sleep(1);
|
||||||
|
|
||||||
|
// // 3. slide to [128, 256)
|
||||||
|
// LogTaskDummyCallBack cb;
|
||||||
|
// EXPECT_EQ(common::OB_SUCCESS, sw3.slide(10 * 1000 * 1000, &cb));
|
||||||
|
// EXPECT_EQ(128, sw3.get_begin_sn());
|
||||||
|
// EXPECT_EQ(256, sw3.get_end_sn());
|
||||||
|
|
||||||
|
// // 4. slide to [256, 384)
|
||||||
|
// for (int64_t i = size; i < 2 * size; ++i) {
|
||||||
|
// LogDummyData *val(NULL);
|
||||||
|
// EXPECT_EQ(common::OB_SUCCESS, sw3.get(i, val));
|
||||||
|
// EXPECT_NE((LogDummyData*)NULL, val);
|
||||||
|
// val->log_id = i;
|
||||||
|
// val->can_remove = true;
|
||||||
|
// EXPECT_EQ(common::OB_SUCCESS, sw3.revert(i));
|
||||||
|
// }
|
||||||
|
// EXPECT_EQ(common::OB_SUCCESS, sw3.slide(10 * 1000 * 1000, &cb));
|
||||||
|
// EXPECT_EQ(2 * size, sw3.get_begin_sn());
|
||||||
|
// EXPECT_EQ(3 * size, sw3.get_end_sn());
|
||||||
|
|
||||||
|
// // 5. resume the thread and join
|
||||||
|
// PALF_FIXED_SW_GET_HUNG = false;
|
||||||
|
// sleep(1);
|
||||||
|
// // 6. continue to slide
|
||||||
|
// for (int64_t j = 2; j < 100; j++) {
|
||||||
|
// for (int64_t i = j * size; i < (j+1) * size; ++i) {
|
||||||
|
// LogDummyData *val(NULL);
|
||||||
|
// EXPECT_EQ(common::OB_SUCCESS, sw3.get(i, val));
|
||||||
|
// EXPECT_NE((LogDummyData*)NULL, val);
|
||||||
|
// val->log_id = i;
|
||||||
|
// val->can_remove = true;
|
||||||
|
// EXPECT_EQ(common::OB_SUCCESS, sw3.revert(i));
|
||||||
|
// }
|
||||||
|
// EXPECT_EQ(common::OB_SUCCESS, sw3.slide(10 * 1000 * 1000, &cb));
|
||||||
|
// EXPECT_EQ((j+1) * size, sw3.get_begin_sn());
|
||||||
|
// EXPECT_EQ((j+2) * size, sw3.get_end_sn());
|
||||||
|
// }
|
||||||
|
// get_thread.join();
|
||||||
|
// sw3.destroy();
|
||||||
|
// }
|
||||||
|
|
||||||
} // end namespace unittest
|
} // end namespace unittest
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user