diff --git a/deps/oblib/src/lib/queue/ob_fixed_queue.h b/deps/oblib/src/lib/queue/ob_fixed_queue.h index 9cacb4031e..5db781b9b2 100644 --- a/deps/oblib/src/lib/queue/ob_fixed_queue.h +++ b/deps/oblib/src/lib/queue/ob_fixed_queue.h @@ -251,6 +251,160 @@ int ObFixedQueue::head_unsafe(T *&ptr) } return ret; } +template +class ObOrderedFixedQueue +{ +public: + ObOrderedFixedQueue() : is_inited_(false), + max_num_(0), + array_(NULL), + allocator_(NULL), + consumer_(0), + producer_(0) + {} + ~ObOrderedFixedQueue() + { + destroy(); + } +public: + int init(const int64_t max_num, + ObIAllocator *allocator = global_default_allocator, + const lib::ObLabel &label = ObModIds::OB_FIXED_QUEUE); + int init(const int64_t max_num, ObIAllocator *allocator, const lib::ObMemAttr &attr); + void destroy(); +public: + int push(T *ptr); + int pop(T *&ptr); + inline int64_t get_total() const + { + return producer_ - consumer_; + } + bool is_inited() const {return is_inited_;}; + int64_t capacity()const { return max_num_; } +private: + struct ArrayItem + { + int64_t idx_; + T *data_; + }; +private: + bool is_inited_; + int64_t max_num_; + ArrayItem *array_; + ObIAllocator *allocator_; + uint64_t consumer_ CACHE_ALIGNED; + uint64_t producer_ CACHE_ALIGNED; +private: + DISALLOW_COPY_AND_ASSIGN(ObOrderedFixedQueue); +}; +template +int ObOrderedFixedQueue::init(const int64_t max_num, ObIAllocator *allocator, const lib::ObLabel &label) +{ + lib::ObMemAttr attr; + attr.label_ = label; + return init(max_num, allocator, attr); +} +template +int ObOrderedFixedQueue::init(const int64_t max_num, ObIAllocator *allocator, const lib::ObMemAttr &attr) +{ + int ret = common::OB_SUCCESS; + if (NULL == allocator || 0 >= max_num) { + ret = common::OB_INVALID_ARGUMENT; + } else if (is_inited_) { + ret = common::OB_INIT_TWICE; + } else if (NULL == (array_ = static_cast(allocator->alloc( + sizeof(ArrayItem) * max_num, attr)))) { + ret = common::OB_ALLOCATE_MEMORY_FAILED; + } else { + for (int i = 0; i < max_num; i++) { + array_[i].data_ = NULL; + array_[i].idx_ = i; + } + max_num_ = max_num; + allocator_ = allocator; + consumer_ = 0; + producer_ = 0; + is_inited_ = true; + } + return ret; +} +template +void ObOrderedFixedQueue::destroy() +{ + if (is_inited_) { + if (NULL != allocator_) { + allocator_->free(array_); + array_ = NULL; + } + array_ = NULL; + max_num_ = 0; + consumer_ = 0; + producer_ = 0; + allocator_ = NULL; + is_inited_ = false; + } +} +template +int ObOrderedFixedQueue::push(T *ptr) +{ + int ret = common::OB_SUCCESS; + if (IS_NOT_INIT) { + ret = common::OB_NOT_INIT; + } else if (NULL == ptr) { + ret = common::OB_INVALID_ARGUMENT; + } else { + uint64_t push = ATOMIC_LOAD(&producer_); + uint64_t push_limit = ATOMIC_LOAD(&consumer_) + max_num_; + uint64_t old_push = 0; + while (((old_push = push) < push_limit || push < (push_limit = ATOMIC_LOAD(&consumer_) + max_num_)) + && old_push != (push = ATOMIC_CAS(&producer_, old_push, old_push + 1))) { + PAUSE(); + } + if (push < push_limit) { + while (push != ATOMIC_LOAD(&array_[push % max_num_].idx_)) { + PAUSE(); // ensure that only one push thread holding the array slot + } + T **pdata = &array_[push % max_num_].data_; + while (NULL != ATOMIC_CAS(pdata, NULL, ptr)) { + PAUSE(); + } + } else { + ret = common::OB_SIZE_OVERFLOW; + } + } + return ret; +} + +template +int ObOrderedFixedQueue::pop(T *&ptr) +{ + int ret = common::OB_SUCCESS; + if (IS_NOT_INIT) { + ret = common::OB_NOT_INIT; + } else { + uint64_t pop = ATOMIC_LOAD(&consumer_); + uint64_t pop_limit = ATOMIC_LOAD(&producer_); + uint64_t old_pop = 0; + while (((old_pop = pop) < pop_limit || pop < (pop_limit = ATOMIC_LOAD(&producer_))) + && old_pop != (pop = ATOMIC_CAS(&consumer_, old_pop, old_pop + 1))) { + PAUSE(); + } + if (pop < pop_limit) { + while (pop != ATOMIC_LOAD(&array_[pop % max_num_].idx_)) { + PAUSE(); // ensure that only one pop thread holding the array slot + } + T **pdata = &array_[(pop % max_num_)].data_; + while (NULL == (ptr = static_cast(ATOMIC_TAS(pdata, NULL)))) { + PAUSE(); + } + ATOMIC_AAF(&array_[(pop % max_num_)].idx_, max_num_); + } else { + ret = common::OB_ENTRY_NOT_EXIST; + } + } + return ret; +} + } // namespace common } // namespace oceanbase #endif //OCEANBASE_COMMON_FIXED_QUEUE_ diff --git a/deps/oblib/src/lib/queue/ob_lighty_queue.h b/deps/oblib/src/lib/queue/ob_lighty_queue.h index c4fcce3730..203578e64f 100644 --- a/deps/oblib/src/lib/queue/ob_lighty_queue.h +++ b/deps/oblib/src/lib/queue/ob_lighty_queue.h @@ -102,7 +102,7 @@ public: int pop(void *&data, const int64_t timeout = 0); int multi_pop(void **data, const int64_t data_count, int64_t &avail_count, const int64_t timeout = 0); private: - typedef ObFixedQueue Queue; + typedef ObOrderedFixedQueue Queue; Queue queue_; Cond cond_; private: diff --git a/deps/oblib/unittest/lib/CMakeLists.txt b/deps/oblib/unittest/lib/CMakeLists.txt index 74f7b93bca..0cc0862bda 100644 --- a/deps/oblib/unittest/lib/CMakeLists.txt +++ b/deps/oblib/unittest/lib/CMakeLists.txt @@ -79,6 +79,7 @@ oblib_addtest(oblog/test_ob_log_performance.cpp) oblib_addtest(profile/test_ob_trace_id.cpp) oblib_addtest(profile/test_perf_event.cpp) oblib_addtest(queue/test_lighty_queue.cpp) +oblib_addtest(queue/test_fixed_queue.cpp) oblib_addtest(queue/test_link_queue.cpp) oblib_addtest(queue/test_priority_queue.cpp) oblib_addtest(random/test_mysql_random.cpp) diff --git a/deps/oblib/unittest/lib/queue/test_fixed_queue.cpp b/deps/oblib/unittest/lib/queue/test_fixed_queue.cpp new file mode 100644 index 0000000000..992e00806f --- /dev/null +++ b/deps/oblib/unittest/lib/queue/test_fixed_queue.cpp @@ -0,0 +1,144 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "lib/queue/ob_fixed_queue.h" +#include +#include "lib/atomic/ob_atomic.h" +#include "lib/oblog/ob_log.h" +#include "lib/coro/testing.h" + +using namespace oceanbase::common; +using namespace std; + +int run() +{ + ObFixedQueue queue; + queue.init(64); + + cotesting::FlexPool pool([&queue] { + for (int i = 0; i < 100; i++) { + void *task = nullptr; + queue.pop(task); + // cout << (int64_t)task << endl; + } + }, 1); + pool.start(false); + ::usleep(1000000); + cotesting::FlexPool([&queue] (){ + for (auto i = 0; i < 10; ++i) { + queue.push((void*)1); + } + }, 10).start(); + pool.wait(); + return 0; +} +#define PUSH_CNT 800 +#define PUSH_THREAD_CNT 2 +#define POP_THREAD_CNT 2 +#define ROUND_COUNT 5000 +int run_order_test() { + int failed_cnt = 0; + ObOrderedFixedQueue queue; + // ObFixedQueue queue; + queue.init(4); + srand(time(0)); + int64_t pop_cnt = 0; + int64_t pop_index = 0; + cotesting::FlexPool pool([&queue, &failed_cnt, &pop_cnt, &pop_index] { + int ret = OB_SUCCESS; + int64_t pop_thread_index = ATOMIC_FAA(&pop_index, 1); + int64_t *output = (int64_t *)calloc(PUSH_THREAD_CNT * PUSH_CNT, sizeof(int64_t)); + int64_t *output_index = (int64_t *)calloc(PUSH_THREAD_CNT, sizeof(int64_t)); + int64_t start_pop_time = get_cur_ts(); + while (pop_cnt < PUSH_CNT * PUSH_THREAD_CNT) { + void *task = nullptr; + if (OB_FAIL(queue.pop(task))) { + // printf("pop failed, ret = %d\n", ret); + } else { + ATOMIC_FAA(&pop_cnt, 1); + int64_t value = (int64_t)task; + int thread_index = value/1000; + output[thread_index * PUSH_CNT + (output_index[thread_index]++)] = value; + if (pop_thread_index == 0) { + // fprintf(stdout, "%ld ", (int64_t)task); + } + } + int64_t cur_time = get_cur_ts(); + if (cur_time - start_pop_time > 10 * 1000 * 1000) { + fprintf(stderr, "pop cost too much time, cur_time=%ld, start_pop_time=%ld\n", cur_time, start_pop_time); + ATOMIC_FAA(&failed_cnt, -10000); + break; + } + } + if (pop_thread_index == 0) { + for (int i = 0; ATOMIC_LOAD(&failed_cnt) == 0 && i < PUSH_THREAD_CNT; i++) { + for (int j = 0; j < PUSH_CNT; j++) { + int64_t value = output[i * PUSH_CNT + j]; + if (value == 0) { + break; + } + fprintf(stdout, "%ld ", value); + if (j > 0 && value < output[i * PUSH_CNT + j - 1]) { + fprintf(stdout, "%ld_ERROR ", value); + ATOMIC_FAA(&failed_cnt, 1); + } + } + fprintf(stdout, "\n"); + } + } + free(output); + free(output_index); + }, POP_THREAD_CNT); + pool.start(false); + int64_t index = 0; + cotesting::FlexPool([&queue, &index] () { + int64_t thread_index = ATOMIC_FAA(&index, 1); + int64_t start_push_time = get_cur_ts(); + for (auto i = 0; i < PUSH_CNT; ++i) { + int64_t value = thread_index * 1000 + i + 1; + int ret = OB_SUCCESS; + do { + if (OB_FAIL(queue.push((void*)(value)))) { + // printf("[%ld] push failed, ret = %d, count = %ld\n", thread_index, ret, queue.count_); + usleep(rand() % 60); + } + } while (ret != OB_SUCCESS); + int64_t cur_time = get_cur_ts(); + if (cur_time - start_push_time > 10 * 1000 * 1000) { + fprintf(stderr, "push cost too much time, cur_time=%ld, start_push_time=%ld\n", cur_time, start_push_time); + break; + } + } + }, PUSH_THREAD_CNT).start(); + pool.wait(); + return failed_cnt; +} + +TEST(TestObLightyQueue, Main) +{ + run(); +} +TEST(TestObLightyQueue, OrderTest) +{ + for (int i = 0; i < ROUND_COUNT; i++) { + printf("ROUND %d:\n", i); + int ret = run_order_test(); + ASSERT_EQ(ret, 0); + } +} + +int main(int argc, char *argv[]) +{ + OB_LOGGER.set_log_level("INFO"); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}