223 lines
6.3 KiB
C++
223 lines
6.3 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#include <gtest/gtest.h>
|
|
#include "test_io_performance.h"
|
|
|
|
using namespace oceanbase::common;
|
|
using namespace oceanbase::lib;
|
|
|
|
namespace oceanbase {
|
|
namespace unittest {
|
|
|
|
class TestIOPerformance : public ::testing::Test {
|
|
public:
|
|
virtual void SetUp()
|
|
{
|
|
COMMON_LOG(INFO, "start set up");
|
|
ObIOManager& io_manager = ObIOManager::get_instance();
|
|
// io_manager.destroy();
|
|
io_manager.init();
|
|
}
|
|
|
|
virtual void TearDown()
|
|
{
|
|
ObIOManager& io_manager = ObIOManager::get_instance();
|
|
io_manager.destroy();
|
|
}
|
|
};
|
|
|
|
TEST_F(TestIOPerformance, single)
|
|
{
|
|
IOStress stress;
|
|
ASSERT_EQ(OB_SUCCESS, stress.init("stress.config"));
|
|
ASSERT_EQ(OB_SUCCESS, stress.run());
|
|
}
|
|
|
|
class MPSC_ThreadPool : public lib::ThreadPool {
|
|
public:
|
|
MPSC_ThreadPool() : inited_(false), is_lighty_(true)
|
|
{}
|
|
virtual ~MPSC_ThreadPool()
|
|
{}
|
|
int init(const int64_t queue_length);
|
|
virtual void run1();
|
|
static const int64_t PRODUCER_COUNT = 16;
|
|
static const int64_t CONSUMER_COUNT = 1;
|
|
static const int64_t PRODUCE_INTERVAL_US = 100;
|
|
|
|
private:
|
|
int produce();
|
|
int consume();
|
|
|
|
private:
|
|
bool inited_;
|
|
ObLightyQueue lighty_queue_;
|
|
ObFixedQueue<int64_t> fixed_queue_;
|
|
ObThreadCond cond_;
|
|
bool is_lighty_;
|
|
};
|
|
|
|
int MPSC_ThreadPool::init(const int64_t queue_length)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (inited_) {
|
|
COMMON_LOG(WARN, "init twice");
|
|
} else if (OB_FAIL(lighty_queue_.init(queue_length))) {
|
|
COMMON_LOG(WARN, "fail to init lighty queue", K(ret));
|
|
} else if (OB_FAIL(fixed_queue_.init(queue_length))) {
|
|
COMMON_LOG(WARN, "fail to init lighty queue", K(ret));
|
|
} else if (OB_FAIL(cond_.init(ObWaitEventIds::IO_QUEUE_LOCK_WAIT))) {
|
|
COMMON_LOG(WARN, "fail to init lighty queue", K(ret));
|
|
} else if (OB_FAIL(set_thread_count(PRODUCER_COUNT + CONSUMER_COUNT))) {
|
|
COMMON_LOG(WARN, "fail to set thread count", K(ret));
|
|
} else {
|
|
inited_ = true;
|
|
if (OB_FAIL(start())) {
|
|
inited_ = false;
|
|
COMMON_LOG(WARN, "fail to start thread pool", K(ret));
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int MPSC_ThreadPool::produce()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
static int64_t val = 0;
|
|
ATOMIC_INC(&val);
|
|
|
|
if (is_lighty_) { // push lighty queue
|
|
if (OB_FAIL(lighty_queue_.push(reinterpret_cast<void*>(val)))) {
|
|
COMMON_LOG(WARN, "fail to push to lighty queue", K(ret), K(val));
|
|
}
|
|
} else { // push fixed queue
|
|
ObThreadCondGuard cond_guard(cond_);
|
|
if (OB_FAIL(cond_guard.get_ret())) {
|
|
COMMON_LOG(ERROR, "Fail to guard queue condition", K(ret));
|
|
} else if (OB_FAIL(fixed_queue_.push(reinterpret_cast<int64_t*>(val)))) {
|
|
COMMON_LOG(WARN, "Fail to push to fixed queue", K(ret), K(val));
|
|
} else if (OB_FAIL(cond_.signal())) {
|
|
COMMON_LOG(ERROR, "Fail to signal queue condition", K(ret));
|
|
}
|
|
}
|
|
|
|
if (REACH_TIME_INTERVAL(1000 * 1000L)) {
|
|
static int64_t last_val = 0;
|
|
COMMON_LOG(INFO, "wenqu debug: produced", K(val), "qps", val - last_val);
|
|
last_val = val;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int MPSC_ThreadPool::consume()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
static const int64_t timeout_us = 1000L;
|
|
int64_t val = 0;
|
|
|
|
if (is_lighty_) { // consume lighty queue
|
|
if (OB_FAIL(lighty_queue_.pop(reinterpret_cast<void*&>(val), timeout_us))) {
|
|
if (OB_ENTRY_NOT_EXIST != ret) {
|
|
COMMON_LOG(WARN, "fail to pop from lighty queue", K(ret), K(val));
|
|
} else {
|
|
ret = OB_SUCCESS;
|
|
}
|
|
} else if (val <= 0) {
|
|
COMMON_LOG(WARN, "invalid value", K(val));
|
|
}
|
|
} else { // consume fixed queue
|
|
if (fixed_queue_.get_curr_total() <= 0) {
|
|
ObThreadCondGuard cond_guard(cond_);
|
|
if (OB_FAIL(cond_guard.get_ret())) {
|
|
COMMON_LOG(ERROR, "Fail to guard queue condition", K(ret));
|
|
} else if (OB_FAIL(cond_.wait_us(timeout_us))) {
|
|
if (OB_TIMEOUT == ret) {
|
|
ret = OB_SUCCESS;
|
|
} else {
|
|
COMMON_LOG(ERROR, "fail to wait queue condition", K(ret));
|
|
}
|
|
}
|
|
}
|
|
if (OB_FAIL(ret)) {
|
|
} else if (OB_FAIL(fixed_queue_.pop(reinterpret_cast<int64_t*&>(val)))) {
|
|
if (OB_ENTRY_NOT_EXIST != ret) {
|
|
COMMON_LOG(WARN, "fail to pop from fixed queue", K(ret), K(val));
|
|
} else {
|
|
ret = OB_SUCCESS;
|
|
}
|
|
} else if (val <= 0) {
|
|
COMMON_LOG(WARN, "invalid value", K(val));
|
|
}
|
|
}
|
|
|
|
if (REACH_TIME_INTERVAL(1000 * 1000L)) {
|
|
static int64_t last_val = 0;
|
|
COMMON_LOG(INFO, "wenqu debug: consumed", K(val), "qps", val - last_val);
|
|
last_val = val;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void MPSC_ThreadPool::run1()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
const int64_t thread_id = get_thread_idx();
|
|
if (OB_UNLIKELY(!inited_)) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "The thread pool has not been inited, ", K(ret));
|
|
} else {
|
|
if (thread_id < PRODUCER_COUNT) { // submit thread pool
|
|
while (!has_set_stop()) {
|
|
produce();
|
|
if (PRODUCE_INTERVAL_US > 0) {
|
|
usleep(PRODUCE_INTERVAL_US);
|
|
}
|
|
}
|
|
COMMON_LOG(INFO, "produce thread stopped", K(thread_id));
|
|
} else if (thread_id < PRODUCER_COUNT + CONSUMER_COUNT) { // getevent thread pool
|
|
while (!has_set_stop()) {
|
|
consume();
|
|
}
|
|
COMMON_LOG(INFO, "consumer thread stopped", K(thread_id));
|
|
} else {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {
|
|
COMMON_LOG(ERROR, "unexpected thread", K(ret), K(thread_id));
|
|
}
|
|
sleep(1);
|
|
}
|
|
}
|
|
}
|
|
|
|
TEST(TestObLightyQueue, DISABLED_mpsc)
|
|
{
|
|
MPSC_ThreadPool tp;
|
|
ASSERT_EQ(OB_SUCCESS, tp.init(10 * 10000L));
|
|
sleep(1000);
|
|
}
|
|
|
|
} // end namespace unittest
|
|
} // end namespace oceanbase
|
|
|
|
int main(int argc, char** argv)
|
|
{
|
|
OB_LOGGER.set_max_file_size(256 * 1024 * 1024);
|
|
system("rm -f test_io_performance.log*");
|
|
OB_LOGGER.set_file_name("test_io_performance.log");
|
|
OB_LOGGER.set_log_level("INFO");
|
|
set_memory_limit(40L * 1024L * 1024L * 1024L); // 40GB
|
|
ObIOManager::get_instance().init();
|
|
testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|