320 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			320 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
 | 
						|
 | 
						|
#include <gtest/gtest.h>
 | 
						|
#include "common/ob_partition_key.h"    // ObPartitionKey
 | 
						|
#include "lib/queue/ob_ext_ms_queue.h"  // ObExtMsQueue
 | 
						|
 | 
						|
// Log level handed to OB_LOGGER when the "log_level" environment variable is not set.
#define DEFAULT_LOG_LEVEL "INFO"
// Log file handed to OB_LOGGER when the "log_file" environment variable is not set.
#define DEFAULT_LOG_FILE "test_ext_ms_queue.log"
 | 
						|
 | 
						|
using namespace oceanbase::common;
 | 
						|
 | 
						|
class ExtMsQueueTest : public ::testing::Test {
 | 
						|
public:
 | 
						|
  static const int64_t KEY_COUNT = 10;
 | 
						|
  static const int64_t MAX_CACHED_MS_QUEUE_ITEM_COUNT = KEY_COUNT;
 | 
						|
  static const int64_t MSQ_QUEUE_COUNT = 4;
 | 
						|
  static const int64_t MSQ_QUEUE_LEN = 1024;
 | 
						|
  static const int64_t PRODUCER_NUM = 4;
 | 
						|
  static const int64_t CONSUMER_NUM = 4;
 | 
						|
  static const int64_t RUN_TIME_SEC = 5;
 | 
						|
  static const int64_t OP_TIMEOUT = 100 * 1000;
 | 
						|
  static const int64_t WAIT_TIME = 100;
 | 
						|
 | 
						|
public:
 | 
						|
  typedef ObPartitionKey KeyType;
 | 
						|
  typedef ObLink TaskType;
 | 
						|
 | 
						|
  struct KeyValue {
 | 
						|
    KeyType key_;
 | 
						|
    int64_t seq_;
 | 
						|
 | 
						|
    KeyValue() : key_(), seq_(0)
 | 
						|
    {}
 | 
						|
    ~KeyValue()
 | 
						|
    {}
 | 
						|
 | 
						|
    void reset()
 | 
						|
    {
 | 
						|
      key_.reset();
 | 
						|
      seq_ = 0;
 | 
						|
    }
 | 
						|
  };
 | 
						|
 | 
						|
  static void* produce_func(void* args);
 | 
						|
  static void* consume_func(void* args);
 | 
						|
 | 
						|
  int start_consumers_();
 | 
						|
  int start_producers_();
 | 
						|
  void wait_producers_();
 | 
						|
  void wait_consumers_();
 | 
						|
  int terminate_all_ms_queue_();
 | 
						|
 | 
						|
public:
 | 
						|
  ExtMsQueueTest();
 | 
						|
  virtual ~ExtMsQueueTest();
 | 
						|
  virtual void SetUp();
 | 
						|
  virtual void TearDown();
 | 
						|
 | 
						|
public:
 | 
						|
  int64_t end_time_;
 | 
						|
  KeyValue kv_[KEY_COUNT];
 | 
						|
  TaskType task_[MSQ_QUEUE_COUNT];  // every Queue in MsQueue corresponds to a Task
 | 
						|
  pthread_t producers_[PRODUCER_NUM];
 | 
						|
  pthread_t consumers_[CONSUMER_NUM];
 | 
						|
  int64_t consumer_thread_index_counter_;
 | 
						|
  int64_t producer_thread_index_counter_;
 | 
						|
  ObExtMsQueue<KeyType> ext_ms_queue_;
 | 
						|
 | 
						|
private:
 | 
						|
  DISALLOW_COPY_AND_ASSIGN(ExtMsQueueTest);
 | 
						|
};
 | 
						|
 | 
						|
/**
 * Builds the fixture with every plain-data member in a defined state.
 *
 * SetUp() assigns the real values before each test; initializing here as well
 * means no member is ever read uninitialized, even if a helper is called
 * before SetUp() has run.
 */
ExtMsQueueTest::ExtMsQueueTest()
    : end_time_(0), consumer_thread_index_counter_(0), producer_thread_index_counter_(0)
{
  // A zero handle is what wait_producers_() / wait_consumers_() treat as "no thread".
  (void)memset(producers_, 0, sizeof(producers_));
  (void)memset(consumers_, 0, sizeof(consumers_));
}
 | 
						|
 | 
						|
// The destructor has nothing to release: the test body joins its threads and
// destroys the queue itself.
ExtMsQueueTest::~ExtMsQueueTest()
{
  // intentionally empty
}
 | 
						|
 | 
						|
void ExtMsQueueTest::SetUp()
 | 
						|
{
 | 
						|
  char* run_time_sec_str = getenv("run_time_sec");
 | 
						|
  int64_t run_time_sec = NULL == run_time_sec_str ? 0 : strtoll(run_time_sec_str, NULL, 10);
 | 
						|
  run_time_sec = run_time_sec <= 0 ? RUN_TIME_SEC : run_time_sec;
 | 
						|
 | 
						|
  _LIB_LOG(INFO, "run_time_sec = %ld sec", run_time_sec);
 | 
						|
 | 
						|
  (void)memset(producers_, 0, sizeof(producers_));
 | 
						|
  (void)memset(consumers_, 0, sizeof(consumers_));
 | 
						|
 | 
						|
  srandom((unsigned int)ObTimeUtility::current_time());
 | 
						|
 | 
						|
  end_time_ = ObTimeUtility::current_time() + run_time_sec * 1000 * 1000;
 | 
						|
 | 
						|
  consumer_thread_index_counter_ = 0;
 | 
						|
  producer_thread_index_counter_ = 0;
 | 
						|
}
 | 
						|
 | 
						|
void ExtMsQueueTest::TearDown()
 | 
						|
{
 | 
						|
  (void)memset(producers_, 0, sizeof(producers_));
 | 
						|
  (void)memset(consumers_, 0, sizeof(consumers_));
 | 
						|
}
 | 
						|
 | 
						|
void* ExtMsQueueTest::produce_func(void* args)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  ExtMsQueueTest* test = (ExtMsQueueTest*)args;
 | 
						|
 | 
						|
  if (NULL != test) {
 | 
						|
    int64_t end_time = test->end_time_;
 | 
						|
    int64_t thread_index = ATOMIC_FAA(&test->producer_thread_index_counter_, 1);
 | 
						|
    ObExtMsQueue<KeyType>& queue = test->ext_ms_queue_;
 | 
						|
    LIB_LOG(INFO, "producer thread started", K(thread_index));
 | 
						|
 | 
						|
    while (OB_SUCC(ret)) {
 | 
						|
      int key_index = (int)(random() % KEY_COUNT);
 | 
						|
      // every transaction push atmost one task to every Queue in MsQueue because of share Task global
 | 
						|
      int64_t stmt_count = (random() % MSQ_QUEUE_COUNT) + 1;
 | 
						|
      int64_t seq = ATOMIC_FAA(&(test->kv_[key_index].seq_), 1);
 | 
						|
      KeyType& key = test->kv_[key_index].key_;
 | 
						|
 | 
						|
      for (int64_t index = 0; OB_SUCC(ret) && index < stmt_count; index++) {
 | 
						|
        TaskType* task = test->task_ + index;  // push Task
 | 
						|
 | 
						|
        while (true) {
 | 
						|
          ret = queue.push(key, task, seq, index, OP_TIMEOUT);
 | 
						|
 | 
						|
          if (OB_TIMEOUT != ret) {
 | 
						|
            if (OB_FAIL(ret)) {
 | 
						|
              LIB_LOG(ERROR, "producer push queue fail", K(ret), K(key), K(seq), K(index));
 | 
						|
            }
 | 
						|
            break;
 | 
						|
          }
 | 
						|
        }
 | 
						|
 | 
						|
        EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
      }
 | 
						|
 | 
						|
      if (OB_SUCC(ret)) {
 | 
						|
        if (OB_FAIL(queue.end_batch(key, seq, stmt_count))) {
 | 
						|
          LIB_LOG(ERROR, "producer end_batch fail", K(ret), K(key), K(seq));
 | 
						|
        }
 | 
						|
 | 
						|
        EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
      }
 | 
						|
 | 
						|
      this_routine::usleep(WAIT_TIME);
 | 
						|
 | 
						|
      if (ObTimeUtility::current_time() >= end_time) {
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    LIB_LOG(INFO, "producer thread stoped", K(thread_index));
 | 
						|
  }
 | 
						|
 | 
						|
  return NULL;
 | 
						|
}
 | 
						|
 | 
						|
void* ExtMsQueueTest::consume_func(void* args)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  ExtMsQueueTest* test = (ExtMsQueueTest*)args;
 | 
						|
 | 
						|
  if (NULL != test) {
 | 
						|
    int64_t thread_index = ATOMIC_FAA(&test->consumer_thread_index_counter_, 1);
 | 
						|
    ObExtMsQueue<KeyType>& queue = test->ext_ms_queue_;
 | 
						|
    LIB_LOG(INFO, "consumer thread started", K(thread_index), "ms_queue_count", queue.get_ms_queue_count());
 | 
						|
 | 
						|
    TaskType* task = NULL;
 | 
						|
    void* ctx = NULL;
 | 
						|
    while (OB_SUCCESS == ret && queue.get_ms_queue_count() > 0) {
 | 
						|
      while (true) {
 | 
						|
        ret = queue.get(task, ctx, OP_TIMEOUT);
 | 
						|
        if (OB_TIMEOUT != ret) {
 | 
						|
          if (OB_FAIL(ret)) {
 | 
						|
            LIB_LOG(ERROR, "get task from queue fail", K(ret));
 | 
						|
          }
 | 
						|
          break;
 | 
						|
        }
 | 
						|
 | 
						|
        if (queue.get_ms_queue_count() <= 0) {
 | 
						|
          ret = OB_SUCCESS;
 | 
						|
          break;
 | 
						|
        }
 | 
						|
      }
 | 
						|
 | 
						|
      EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
    }
 | 
						|
 | 
						|
    LIB_LOG(INFO, "consumer thread stoped", K(thread_index), "ms_queue_count", queue.get_ms_queue_count());
 | 
						|
  }
 | 
						|
  return NULL;
 | 
						|
}
 | 
						|
 | 
						|
void ExtMsQueueTest::wait_producers_()
 | 
						|
{
 | 
						|
  LIB_LOG(INFO, "wait producers");
 | 
						|
 | 
						|
  for (int64_t index = 0; index < PRODUCER_NUM; index++) {
 | 
						|
    if (0 != producers_[index]) {
 | 
						|
      pthread_join(producers_[index], NULL);
 | 
						|
    }
 | 
						|
 | 
						|
    producers_[index] = 0;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void ExtMsQueueTest::wait_consumers_()
 | 
						|
{
 | 
						|
  LIB_LOG(INFO, "wait consumers");
 | 
						|
 | 
						|
  for (int64_t index = 0; index < CONSUMER_NUM; index++) {
 | 
						|
    if (0 != consumers_[index]) {
 | 
						|
      pthread_join(consumers_[index], NULL);
 | 
						|
    }
 | 
						|
 | 
						|
    consumers_[index] = 0;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
int ExtMsQueueTest::start_consumers_()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  for (int64_t index = 0; index < CONSUMER_NUM; index++) {
 | 
						|
    pthread_create(consumers_ + index, NULL, consume_func, this);
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int ExtMsQueueTest::start_producers_()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  for (int64_t index = 0; index < PRODUCER_NUM; index++) {
 | 
						|
    pthread_create(producers_ + index, NULL, produce_func, this);
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int ExtMsQueueTest::terminate_all_ms_queue_()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  LIB_LOG(INFO, "terminate_all_ms_queue");
 | 
						|
  for (int64_t index = 0; OB_SUCC(ret) && index < KEY_COUNT; index++) {
 | 
						|
    KeyType& key = kv_[index].key_;
 | 
						|
    int64_t end_seq = kv_[index].seq_;
 | 
						|
    LIB_LOG(DEBUG, "terminate_ms_queue", K(key), K(end_seq));
 | 
						|
 | 
						|
    while (true) {
 | 
						|
      ret = ext_ms_queue_.terminate_ms_queue(key, end_seq, OP_TIMEOUT);
 | 
						|
 | 
						|
      if (OB_TIMEOUT != ret) {
 | 
						|
        if (OB_FAIL(ret)) {
 | 
						|
          LIB_LOG(ERROR, "terminate_ms_queue fail", K(ret), K(key), K(end_seq));
 | 
						|
        }
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
// End-to-end run: register one MsQueue per key, let producers and consumers
// work concurrently, then terminate every MsQueue so the consumers can exit.
TEST_F(ExtMsQueueTest, basic_test)
{
  // Bring up the ExtMsQueue under test.
  EXPECT_EQ(OB_SUCCESS, ext_ms_queue_.init(MAX_CACHED_MS_QUEUE_ITEM_COUNT, MSQ_QUEUE_COUNT, MSQ_QUEUE_LEN));

  // Register one MsQueue for each key.
  for (int64_t i = 0; i < KEY_COUNT; ++i) {
    KeyValue& kv = kv_[i];
    kv.reset();
    ASSERT_EQ(OB_SUCCESS, kv.key_.init(i, 0, 1));

    EXPECT_EQ(OB_SUCCESS, ext_ms_queue_.add_ms_queue(kv.key_));
  }

  // Registering an already known key must be rejected.
  EXPECT_EQ(OB_ENTRY_EXIST, ext_ms_queue_.add_ms_queue(kv_[0].key_));

  // Consumers first, so that they are ready when data arrives.
  EXPECT_EQ(OB_SUCCESS, start_consumers_());

  // Then the producers.
  EXPECT_EQ(OB_SUCCESS, start_producers_());

  // Producers stop on their own once the run time is over.
  wait_producers_();

  // Terminate every MsQueue.
  EXPECT_EQ(OB_SUCCESS, terminate_all_ms_queue_());

  // Consumers leave once no MsQueue remains.
  wait_consumers_();

  ext_ms_queue_.destroy();
}
 | 
						|
 | 
						|
int main(int argc, char** argv)
 | 
						|
{
 | 
						|
  const char* log_level = getenv("log_level");
 | 
						|
  const char* log_file = getenv("log_file");
 | 
						|
  log_level = (NULL == log_level ? DEFAULT_LOG_LEVEL : log_level);
 | 
						|
  log_file = (NULL == log_file) ? DEFAULT_LOG_FILE : log_file;
 | 
						|
 | 
						|
  OB_LOGGER.set_log_level(log_level);
 | 
						|
  OB_LOGGER.set_file_name(log_file, true);
 | 
						|
  ::testing::InitGoogleTest(&argc, argv);
 | 
						|
  return RUN_ALL_TESTS();
 | 
						|
}
 |