296 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			296 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#include "gtest/gtest.h"
 | 
						|
 | 
						|
#include "share/ob_define.h"
 | 
						|
#include "storage/ob_storage_log_type.h"
 | 
						|
#include "storage/transaction/ob_trans_log.h"
 | 
						|
 | 
						|
#include "obcdc/src/ob_log_fetcher_part_stream.h"
 | 
						|
#include "test_log_fetcher_common_utils.h"
 | 
						|
 | 
						|
using namespace oceanbase;
 | 
						|
using namespace common;
 | 
						|
using namespace liboblog;
 | 
						|
using namespace fetcher;
 | 
						|
using namespace transaction;
 | 
						|
using namespace storage;
 | 
						|
using namespace clog;
 | 
						|
 | 
						|
namespace oceanbase
 | 
						|
{
 | 
						|
namespace unittest
 | 
						|
{
 | 
						|
 | 
						|
/*
 | 
						|
 * Basic Function Tests.
 | 
						|
 */
 | 
						|
/*
 | 
						|
 * Half commit, half abort.
 | 
						|
 * Fixed redo log cnt.
 | 
						|
 */
 | 
						|
TEST(PartitionStream, BasicTest1)
 | 
						|
{
 | 
						|
  int err = OB_SUCCESS;
 | 
						|
 | 
						|
  ObTransPrepareLog prepare_;
 | 
						|
 | 
						|
  // Commit half trans, whose has even idx.
 | 
						|
  const int64_t trans_cnt = 1000;
 | 
						|
  const int64_t commit_trans_cnt = trans_cnt / 2;
 | 
						|
  const int64_t redo_cnt = 5;
 | 
						|
 | 
						|
  // Pkey.
 | 
						|
  ObPartitionKey pkey(1000U, 1, 1);
 | 
						|
  // Log gen.
 | 
						|
  TransLogEntryGenerator1 log_gen(pkey);
 | 
						|
  // Task Pool.
 | 
						|
  ObConcurrentFIFOAllocator fifo_allocator;
 | 
						|
  ObLogTransTaskPool<PartTransTask> task_pool;
 | 
						|
  err = fifo_allocator.init(16 * _G_, 16 * _M_, OB_MALLOC_NORMAL_BLOCK_SIZE);
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  err = task_pool.init(&fifo_allocator, 10240, 1024, 4 * 1024 * 1024, true);
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  // Parser.
 | 
						|
  MockParser1 parser;
 | 
						|
  FetcherConfig cfg;
 | 
						|
 | 
						|
  // Init.
 | 
						|
  PartitionStream ps;
 | 
						|
  err = ps.init(pkey, &parser, &task_pool, &cfg);
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
  // Read logs.
 | 
						|
  ObLogIdArray missing;
 | 
						|
  for (int64_t idx = 0; idx < trans_cnt; ++idx) {
 | 
						|
    // Commit trans with even idx.
 | 
						|
    log_gen.next_trans(redo_cnt, (0 == idx % 2));
 | 
						|
    ObLogEntry log_entry;
 | 
						|
    while (OB_SUCCESS == log_gen.next_log_entry(log_entry)) {
 | 
						|
      err = ps.read(log_entry, missing);
 | 
						|
      EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    }
 | 
						|
    err = ps.flush();
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  }
 | 
						|
 | 
						|
  // Check.
 | 
						|
  EXPECT_EQ(commit_trans_cnt, parser.get_trans_cnt());
 | 
						|
 | 
						|
  // Destroy.
 | 
						|
  err = ps.destroy();
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Half commit, half abort.
 | 
						|
 * Commit with Prepare-Commit trans log.
 | 
						|
 */
 | 
						|
TEST(PartitionStream, BasicTest2)
 | 
						|
{
 | 
						|
  int err = OB_SUCCESS;
 | 
						|
 | 
						|
  ObTransPrepareLog prepare_;
 | 
						|
 | 
						|
  // Commit half trans, whose has even idx.
 | 
						|
  const int64_t trans_cnt = 1000;
 | 
						|
  const int64_t commit_trans_cnt = trans_cnt / 2;
 | 
						|
  const int64_t redo_cnt = 5;
 | 
						|
 | 
						|
  // Pkey.
 | 
						|
  ObPartitionKey pkey(1000U, 1, 1);
 | 
						|
  // Log gen.
 | 
						|
  TransLogEntryGenerator1 log_gen(pkey);
 | 
						|
  // Task Pool.
 | 
						|
  ObConcurrentFIFOAllocator fifo_allocator;
 | 
						|
  ObLogTransTaskPool<PartTransTask> task_pool;
 | 
						|
  err = fifo_allocator.init(16 * _G_, 16 * _M_, OB_MALLOC_NORMAL_BLOCK_SIZE);
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  err = task_pool.init(&fifo_allocator, 10240, 1024, 4 * 1024 * 1024, true);
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  // Parser.
 | 
						|
  MockParser1 parser;
 | 
						|
  FetcherConfig cfg;
 | 
						|
 | 
						|
  // Init.
 | 
						|
  PartitionStream ps;
 | 
						|
  err = ps.init(pkey, &parser, &task_pool, &cfg);
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
  // Read logs.
 | 
						|
  ObLogIdArray missing;
 | 
						|
  for (int64_t idx = 0; idx < trans_cnt; ++idx) {
 | 
						|
    // Commit trans with even idx.
 | 
						|
    log_gen.next_trans(redo_cnt, (0 == idx % 2));
 | 
						|
    ObLogEntry log_entry;
 | 
						|
    while (OB_SUCCESS == log_gen.next_log_entry_2(log_entry)) {
 | 
						|
      err = ps.read(log_entry, missing);
 | 
						|
      EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    }
 | 
						|
    err = ps.flush();
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  }
 | 
						|
 | 
						|
  // Check.
 | 
						|
  EXPECT_EQ(commit_trans_cnt, parser.get_trans_cnt());
 | 
						|
 | 
						|
  // Destroy.
 | 
						|
  err = ps.destroy();
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
/*
 | 
						|
 * Test partition progress tracker.
 | 
						|
 */
 | 
						|
TEST(PartProgressTracker, BasicTest1)
 | 
						|
{
 | 
						|
  const int64_t progress_cnt = 4 * 10000;
 | 
						|
  PartProgressTracker tracker;
 | 
						|
 | 
						|
  int err = tracker.init(progress_cnt);
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
  ObArray<int64_t> indices;
 | 
						|
  const int64_t time = get_timestamp();
 | 
						|
 | 
						|
  // Acquire progresses and update their values.
 | 
						|
  for (int64_t idx = 0, cnt = progress_cnt; idx < cnt; ++idx) {
 | 
						|
    int64_t progress_idx = 0;
 | 
						|
    err = tracker.acquire_progress(progress_idx);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = indices.push_back(progress_idx);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = tracker.update_progress(progress_idx, time);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  }
 | 
						|
 | 
						|
  // Get min progress test.
 | 
						|
  const int64_t test_cnt = 10000;
 | 
						|
  int64_t start = get_timestamp();
 | 
						|
  for (int64_t idx = 0, cnt = test_cnt; idx < cnt; ++idx) {
 | 
						|
    int64_t min = 0;
 | 
						|
    err = tracker.get_min_progress(min);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  }
 | 
						|
  const int64_t avg = ((get_timestamp() - start)/ test_cnt);
 | 
						|
 | 
						|
  // Release.
 | 
						|
  while (0 != indices.count()) {
 | 
						|
    int64_t progress_idx = indices.at(indices.count() - 1);
 | 
						|
    indices.pop_back();
 | 
						|
    err = tracker.release_progress(progress_idx);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  }
 | 
						|
 | 
						|
  err = tracker.destroy();
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
  // Print result.
 | 
						|
  fprintf(stderr, "partition progress tracker get min for %ld progresses costs %s\n",
 | 
						|
          progress_cnt, TVAL_TO_STR(avg));
 | 
						|
}
 | 
						|
 | 
						|
// Perf test.
 | 
						|
// This test requires at least 3 cores: 1 core tests reading, 2 cores update data.
 | 
						|
struct PerfTest1Updater : public Runnable
 | 
						|
{
 | 
						|
  virtual int routine()
 | 
						|
  {
 | 
						|
    while (ATOMIC_LOAD(&atomic_run_)) {
 | 
						|
      int64_t seed = get_timestamp();
 | 
						|
      for (int i = 0; i < 10000; ++i) {
 | 
						|
        progress_tracker_->update_progress(indices_->at((seed % (indices_->count()))), seed);
 | 
						|
        seed += 777;
 | 
						|
      }
 | 
						|
    }
 | 
						|
    return common::OB_SUCCESS;
 | 
						|
  }
 | 
						|
  bool atomic_run_;
 | 
						|
  PartProgressTracker *progress_tracker_;
 | 
						|
  ObArray<int64_t> *indices_;
 | 
						|
};
 | 
						|
TEST(PartProgressTracker, PerfTest1)
 | 
						|
{
 | 
						|
  const int64_t progress_cnt = 4 * 10000;
 | 
						|
  PartProgressTracker tracker;
 | 
						|
 | 
						|
  int err = tracker.init(progress_cnt);
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
  ObArray<int64_t> indices;
 | 
						|
  const int64_t time = get_timestamp();
 | 
						|
 | 
						|
  // Acquire progresses and update their values.
 | 
						|
  for (int64_t idx = 0, cnt = progress_cnt; idx < cnt; ++idx) {
 | 
						|
    int64_t progress_idx = 0;
 | 
						|
    err = tracker.acquire_progress(progress_idx);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = indices.push_back(progress_idx);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = tracker.update_progress(progress_idx, time);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  }
 | 
						|
 | 
						|
  // Trigger updaters.
 | 
						|
  const int64_t updater_cnt = 2;
 | 
						|
  PerfTest1Updater updaters[updater_cnt];
 | 
						|
  for (int i = 0; i < updater_cnt; ++i) {
 | 
						|
    updaters[i].atomic_run_ = true;
 | 
						|
    updaters[i].progress_tracker_ = &tracker;
 | 
						|
    updaters[i].indices_ = &indices;
 | 
						|
    updaters[i].create();
 | 
						|
  }
 | 
						|
 | 
						|
  // Get min progress test.
 | 
						|
  const int64_t test_cnt = 10000;
 | 
						|
  int64_t start = get_timestamp();
 | 
						|
  for (int64_t idx = 0, cnt = test_cnt; idx < cnt; ++idx) {
 | 
						|
    int64_t min = 0;
 | 
						|
    err = tracker.get_min_progress(min);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  }
 | 
						|
  const int64_t avg = ((get_timestamp() - start)/ test_cnt);
 | 
						|
 | 
						|
  // Stop updaters.
 | 
						|
  for (int i = 0; i < updater_cnt; ++i) {
 | 
						|
    ATOMIC_STORE(&(updaters[i].atomic_run_), false);
 | 
						|
    updaters[i].join();
 | 
						|
  }
 | 
						|
 | 
						|
  // Release.
 | 
						|
  while (0 != indices.count()) {
 | 
						|
    int64_t progress_idx = indices.at(indices.count() - 1);
 | 
						|
    indices.pop_back();
 | 
						|
    err = tracker.release_progress(progress_idx);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  }
 | 
						|
 | 
						|
  err = tracker.destroy();
 | 
						|
  EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
  // Print result.
 | 
						|
  fprintf(stderr, "partition progress tracker 2 updaters get min for %ld progresses costs %s\n",
 | 
						|
          progress_cnt, TVAL_TO_STR(avg));
 | 
						|
}
 | 
						|
}
 | 
						|
}
 | 
						|
 | 
						|
int main(int argc, char **argv)
 | 
						|
{
 | 
						|
  //ObLogger::get_logger().set_mod_log_levels("ALL.*:DEBUG, TLOG.*:DEBUG");
 | 
						|
  testing::InitGoogleTest(&argc,argv);
 | 
						|
  // testing::FLAGS_gtest_filter = "DO_NOT_RUN";
 | 
						|
  return RUN_ALL_TESTS();
 | 
						|
}
 |