1185 lines
		
	
	
		
			40 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1185 lines
		
	
	
		
			40 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#include <gtest/gtest.h>
 | 
						|
#include <gmock/gmock.h>
 | 
						|
#define private public
 | 
						|
 | 
						|
#include "share/schema/ob_schema_getter_guard.h"
 | 
						|
#include "observer/ob_server.h"
 | 
						|
#include "observer/table/ob_table_api_row_iterator.h"
 | 
						|
#include "observer/table/ob_table_service.h"
 | 
						|
#include "observer/table/ob_table_ttl_manager.h"
 | 
						|
#include "rootserver/ob_ttl_scheduler.h"
 | 
						|
#include "share/table/ob_ttl_util.h"
 | 
						|
 | 
						|
namespace oceanbase {
 | 
						|
 | 
						|
namespace observer {
 | 
						|
 | 
						|
using namespace oceanbase::table;
 | 
						|
// #define UNUSED(x) (x)
 | 
						|
static const int64_t TEST_COLUMN_CNT = 3;
 | 
						|
static const int64_t TEST_ROWKEY_COLUMN_CNT = 1;
 | 
						|
 | 
						|
class TestTableApi : public::testing::Test {
 | 
						|
public:
 | 
						|
  TestTableApi();
 | 
						|
  virtual ~TestTableApi()
 | 
						|
  {}
 | 
						|
  virtual void SetUp();
 | 
						|
  virtual void TearDown();
 | 
						|
 | 
						|
private:
 | 
						|
  void prepare_schema();
 | 
						|
 | 
						|
protected:
 | 
						|
  ObArenaAllocator allocator_;
 | 
						|
  ObTableSchema table_schema_;
 | 
						|
};
 | 
						|
 | 
						|
class TestObTableApiRowIterator : public ObTableApiRowIterator {
 | 
						|
public:
 | 
						|
  void set_table_schema(const ObTableSchema *table_schema) { table_schema_ = table_schema; }
 | 
						|
  void set_is_init(bool is_init) { is_inited_ = is_init; }
 | 
						|
  void set_has_gen_column(bool is_has) { has_generate_column_ = is_has; }
 | 
						|
  void set_entity(table::ObITableEntity *entity) { _entity = entity; }
 | 
						|
  int open() { return cons_all_columns(*_entity, true); }
 | 
						|
  virtual int get_next_row(ObNewRow*& row);
 | 
						|
  int cons_row(const table::ObITableEntity &entity, common::ObNewRow *&row);
 | 
						|
 | 
						|
private: 
 | 
						|
  table::ObITableEntity *_entity;
 | 
						|
};
 | 
						|
 | 
						|
int TestObTableApiRowIterator::get_next_row(ObNewRow *&row)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  row_allocator_.reuse();
 | 
						|
  if (OB_ISNULL(_entity)) {
 | 
						|
    ret = OB_NOT_INIT;
 | 
						|
    COMMON_LOG(INFO, "The entity is null, ", K(ret));
 | 
						|
  } else if (OB_FAIL(cons_row(*_entity, row))) {
 | 
						|
    COMMON_LOG(INFO, "Fail to construct insert row, ", K(ret));
 | 
						|
  } else {
 | 
						|
    //success
 | 
						|
    COMMON_LOG(INFO, "Api insert row iter, ", K(*row));
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int TestObTableApiRowIterator::cons_row(const table::ObITableEntity &entity, common::ObNewRow *&row)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  if (OB_SUCCESS != entity_to_row(entity, row_objs_)) {
 | 
						|
    COMMON_LOG(INFO, "Fail to generate row from entity", K(ret));
 | 
						|
  } else {
 | 
						|
    const int64_t N = missing_default_objs_.count();
 | 
						|
    for (int64_t i = 0; OB_SUCC(ret) && i < N; i++) {
 | 
						|
      if (OB_FAIL(row_objs_.push_back(missing_default_objs_.at(i)))) {
 | 
						|
        COMMON_LOG(INFO, "Fail to push default value to row, ", K(ret));
 | 
						|
      }
 | 
						|
    }
 | 
						|
    if (OB_SUCC(ret)) {
 | 
						|
      row_.assign(&row_objs_.at(0), row_objs_.count());
 | 
						|
      if (has_generate_column_ && OB_FAIL(fill_generate_columns(row_))) {
 | 
						|
        COMMON_LOG(INFO, "Fail to fill generate columns, ", K(ret));
 | 
						|
      }
 | 
						|
    }
 | 
						|
    if (OB_SUCC(ret)) {
 | 
						|
      if (OB_FAIL(check_row(row_))) {
 | 
						|
        COMMON_LOG(INFO, "Fail to check row, ", K(ret), K_(row));
 | 
						|
      } else {
 | 
						|
        row = &row_;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
TestTableApi::TestTableApi() : allocator_(ObModIds::TEST)
 | 
						|
{}
 | 
						|
 | 
						|
void TestTableApi::SetUp()
 | 
						|
{
 | 
						|
  prepare_schema();
 | 
						|
}
 | 
						|
 | 
						|
void TestTableApi::TearDown()
 | 
						|
{
 | 
						|
  table_schema_.reset();
 | 
						|
}
 | 
						|
 | 
						|
void TestTableApi::prepare_schema()
 | 
						|
{
 | 
						|
  ObColumnSchemaV2 column;
 | 
						|
  int64_t table_id = 3001;
 | 
						|
  int64_t micro_block_size = 16 * 1024;
 | 
						|
  //init table schema
 | 
						|
  table_schema_.reset();
 | 
						|
  ASSERT_EQ(OB_SUCCESS, table_schema_.set_table_name("test_tableapi"));
 | 
						|
  table_schema_.set_tenant_id(1);
 | 
						|
  table_schema_.set_tablegroup_id(1);
 | 
						|
  table_schema_.set_database_id(1);
 | 
						|
  table_schema_.set_table_id(table_id);
 | 
						|
  table_schema_.set_rowkey_column_num(TEST_ROWKEY_COLUMN_CNT);
 | 
						|
  table_schema_.set_max_used_column_id(TEST_COLUMN_CNT);
 | 
						|
  table_schema_.set_block_size(micro_block_size);
 | 
						|
  table_schema_.set_compress_func_name("none");
 | 
						|
  //init column
 | 
						|
  char name[OB_MAX_FILE_NAME_LENGTH];
 | 
						|
  memset(name, 0, sizeof(name));
 | 
						|
 | 
						|
  for(int32_t i = 0; i < TEST_COLUMN_CNT; ++i) {
 | 
						|
    ObObjType obj_type = static_cast<ObObjType>(ObIntType);
 | 
						|
    column.reset();
 | 
						|
    column.set_table_id(table_id);
 | 
						|
    column.set_column_id(i + OB_APP_MIN_COLUMN_ID);
 | 
						|
    sprintf(name, "c%d", i);
 | 
						|
    ASSERT_EQ(OB_SUCCESS, column.set_column_name(name));
 | 
						|
    column.set_data_type(obj_type);
 | 
						|
    column.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
 | 
						|
    column.set_data_length(1);
 | 
						|
    if (i < TEST_ROWKEY_COLUMN_CNT) {
 | 
						|
      column.set_rowkey_position(i + 1);
 | 
						|
    } else {
 | 
						|
      column.set_rowkey_position(0);
 | 
						|
    }
 | 
						|
    ASSERT_EQ(OB_SUCCESS, table_schema_.add_column(column));
 | 
						|
  }
 | 
						|
  // check rowkey column
 | 
						|
  const ObRowkeyInfo& rowkey_info = table_schema_.get_rowkey_info();
 | 
						|
  for (int64_t i = 0; i < rowkey_info.get_size(); ++i) {
 | 
						|
    uint64_t column_id = OB_INVALID_ID;
 | 
						|
    ASSERT_EQ(OB_SUCCESS, rowkey_info.get_column_id(i, column_id));
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTableApi, entity_factory)
 | 
						|
{
 | 
						|
  table::ObTableEntityFactory<table::ObTableEntity> entity_factory;
 | 
						|
  static const int64_t N = 100;
 | 
						|
  static const int64_t R = 3;
 | 
						|
  for (int round = 0; round < R; ++round) {
 | 
						|
    for (int i = 0; i < N; ++i) {
 | 
						|
      table::ObITableEntity *entity = entity_factory.alloc();
 | 
						|
      ASSERT_TRUE(NULL != entity);
 | 
						|
    } // end for
 | 
						|
    fprintf(stderr, "used=%ld free=%ld mem_total=%ld mem_used=%ld\n",
 | 
						|
            entity_factory.get_used_count(), entity_factory.get_free_count(),
 | 
						|
            entity_factory.get_total_mem(), entity_factory.get_used_mem());
 | 
						|
    entity_factory.free_and_reuse();
 | 
						|
    fprintf(stderr, "used=%ld free=%ld mem_total=%ld mem_used=%ld\n",
 | 
						|
            entity_factory.get_used_count(), entity_factory.get_free_count(),
 | 
						|
            entity_factory.get_total_mem(), entity_factory.get_used_mem());
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTableApi, serialize_batch_result)
 | 
						|
{
 | 
						|
  ObTableBatchOperationResult result;
 | 
						|
  table::ObTableEntity result_entity;
 | 
						|
  ObTableOperationResult single_op_result;
 | 
						|
  single_op_result.set_entity(result_entity);
 | 
						|
  single_op_result.set_errno(1234);
 | 
						|
  single_op_result.set_type(table::ObTableOperationType::INSERT_OR_UPDATE);
 | 
						|
  single_op_result.set_affected_rows(4321);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, result.push_back(single_op_result));
 | 
						|
  int64_t expected_len = result.get_serialize_size();
 | 
						|
  char buf[1024];
 | 
						|
  int64_t pos = 0;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, result.serialize(buf, 1024, pos));
 | 
						|
  ASSERT_EQ(expected_len, pos);
 | 
						|
 | 
						|
  ObTableBatchOperationResult result2;
 | 
						|
  table::ObTableEntityFactory<table::ObTableEntity> entity_factory;
 | 
						|
  result2.set_entity_factory(&entity_factory);
 | 
						|
  int64_t data_len = pos;
 | 
						|
  pos = 0;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, result2.deserialize(buf, data_len, pos));
 | 
						|
  ASSERT_EQ(1, result2.count());
 | 
						|
  ASSERT_EQ(1234, result2.at(0).get_errno());
 | 
						|
  ASSERT_EQ(4321, result2.at(0).get_affected_rows());
 | 
						|
  ASSERT_EQ(table::ObTableOperationType::INSERT_OR_UPDATE, result2.at(0).type());
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTableApi, serialize_table_query)
 | 
						|
{
 | 
						|
  ObTableQuery query;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query.add_select_column("c1"));
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query.add_select_column("c2"));
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query.add_select_column("c3"));
 | 
						|
 | 
						|
  ObObj pk_objs_start[2];
 | 
						|
  pk_objs_start[0].set_int(0);
 | 
						|
  pk_objs_start[1].set_min_value();
 | 
						|
  ObObj pk_objs_end[2];
 | 
						|
  pk_objs_end[0].set_int(0);
 | 
						|
  pk_objs_end[1].set_max_value();
 | 
						|
  ObNewRange range;
 | 
						|
  range.start_key_.assign(pk_objs_start, 2);
 | 
						|
  range.end_key_.assign(pk_objs_end, 2);
 | 
						|
  range.border_flag_.set_inclusive_start();
 | 
						|
  range.border_flag_.set_inclusive_end();
 | 
						|
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query.add_scan_range(range));
 | 
						|
  int64_t serialize_len = query.get_serialize_size();
 | 
						|
  fprintf(stderr, "serialize_size=%ld\n", serialize_len);
 | 
						|
  char buf[1024];
 | 
						|
  int64_t pos = 0;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query.serialize(buf, 1024, pos));
 | 
						|
  ASSERT_EQ(pos, serialize_len);
 | 
						|
 | 
						|
  ObTableQuery query2;
 | 
						|
  ObArenaAllocator alloc;
 | 
						|
  query2.set_deserialize_allocator(&alloc);
 | 
						|
  pos = 0;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query2.deserialize(buf, serialize_len, pos));
 | 
						|
  const ObIArray<ObString> &select_columns = query2.get_select_columns();
 | 
						|
  const ObIArray<ObNewRange> &scan_ranges = query2.get_scan_ranges();
 | 
						|
  ASSERT_EQ(3, select_columns.count());
 | 
						|
  ASSERT_EQ(1, scan_ranges.count());
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTableApi, serialize_query_result)
 | 
						|
{
 | 
						|
  ObTableQueryResult query_result;
 | 
						|
  ObObj objs[3];
 | 
						|
  objs[0].set_int(123);
 | 
						|
  objs[1].set_null();
 | 
						|
  objs[2].set_varchar(ObString::make_string("serialize_query_result"));
 | 
						|
  ObNewRow row;
 | 
						|
  row.assign(objs, 3);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query_result.add_property_name("c1"));
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query_result.add_property_name("c2"));
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query_result.add_property_name("c3"));
 | 
						|
  for (int64_t i = 0; i < 1024; ++i) {
 | 
						|
    ASSERT_EQ(OB_SUCCESS, query_result.add_row(row));
 | 
						|
  }
 | 
						|
  ASSERT_EQ(1024, query_result.get_row_count());
 | 
						|
  ASSERT_EQ(3, query_result.get_property_count());
 | 
						|
  // serialize
 | 
						|
  char *buf = static_cast<char*>(ob_malloc(OB_MALLOC_BIG_BLOCK_SIZE, ObModIds::TEST));
 | 
						|
  ASSERT_TRUE(nullptr != buf);
 | 
						|
  int64_t pos = 0;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query_result.serialize(buf, OB_MALLOC_BIG_BLOCK_SIZE, pos));
 | 
						|
  ASSERT_EQ(pos, query_result.get_serialize_size());
 | 
						|
  fprintf(stderr, "serialize_size=%ld\n", pos);
 | 
						|
  // deserialize & check
 | 
						|
  ObTableQueryResult query_result2;
 | 
						|
  int64_t data_len = pos;
 | 
						|
  pos = 0;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, query_result2.deserialize(buf, data_len, pos));
 | 
						|
  ASSERT_EQ(1024, query_result2.get_row_count());
 | 
						|
  ASSERT_EQ(3, query_result2.get_property_count());
 | 
						|
  const table::ObITableEntity *entity = NULL;
 | 
						|
  for (int64_t i = 0; i < 1024; ++i) {
 | 
						|
    ASSERT_EQ(OB_SUCCESS, query_result2.get_next_entity(entity));
 | 
						|
    ASSERT_TRUE(NULL != entity);
 | 
						|
    ASSERT_EQ(3, entity->get_properties_count());
 | 
						|
    ObObj value;
 | 
						|
    ASSERT_EQ(OB_SUCCESS, entity->get_property("c1", value));
 | 
						|
    ASSERT_EQ(123, value.get_int());
 | 
						|
    ASSERT_EQ(OB_SUCCESS, entity->get_property("c2", value));
 | 
						|
    ASSERT_TRUE(value.is_null());
 | 
						|
    ASSERT_EQ(OB_SUCCESS, entity->get_property("c3", value));
 | 
						|
    ObString str;
 | 
						|
    ASSERT_EQ(OB_SUCCESS, value.get_varchar(str));
 | 
						|
    ASSERT_TRUE(str == ObString::make_string("serialize_query_result"));
 | 
						|
  }
 | 
						|
  ASSERT_EQ(OB_ITER_END, query_result2.get_next_entity(entity));
 | 
						|
  // cleanup
 | 
						|
  if (NULL != buf) {
 | 
						|
    ob_free(buf);
 | 
						|
    buf = NULL;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTableApi, table_entity)
 | 
						|
{
 | 
						|
  int ret;
 | 
						|
  ObSEArray<ObString, 1> ppts;
 | 
						|
  // set row key
 | 
						|
  ObObj key_objs[3];
 | 
						|
  key_objs[0].set_varbinary("table_entity");
 | 
						|
  key_objs[1].set_varchar("hi");
 | 
						|
  key_objs[2].set_int(1);
 | 
						|
  ObRowkey rk(key_objs, 3);
 | 
						|
  // cons entity
 | 
						|
  table::ObTableEntity entity;
 | 
						|
  ObObj value;
 | 
						|
  entity.set_rowkey(rk);
 | 
						|
  ASSERT_EQ(3, entity.get_rowkey_size());
 | 
						|
  ASSERT_EQ(0, entity.get_rowkey_value(2, value));
 | 
						|
  ASSERT_EQ(1, value.get_int());
 | 
						|
  // properaties
 | 
						|
  value.set_varchar("value");
 | 
						|
  value.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
 | 
						|
  ASSERT_EQ(0, entity.set_property("c1", value));
 | 
						|
  ASSERT_EQ(0, entity.get_property("c1", value));
 | 
						|
  ASSERT_EQ(ObString::make_string("value"), value.get_varchar());
 | 
						|
  ASSERT_EQ(0, entity.get_properties_names(ppts));
 | 
						|
  ASSERT_EQ(1, ppts.count());
 | 
						|
  ASSERT_EQ(1, entity.get_properties_count());
 | 
						|
  // reset entity
 | 
						|
  entity.reset();
 | 
						|
  ASSERT_TRUE(entity.is_empty());
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTableApi, open_and_get_next_row)
 | 
						|
{
 | 
						|
  ObTableOperation table_operation;
 | 
						|
  TestObTableApiRowIterator row_iterator;
 | 
						|
 | 
						|
  row_iterator.set_is_init(true);
 | 
						|
  row_iterator.set_has_gen_column(false);
 | 
						|
  row_iterator.set_table_schema(&table_schema_);
 | 
						|
 | 
						|
  table::ObTableEntity entity;
 | 
						|
  // set rowkey
 | 
						|
  ObObj key_objs[1];
 | 
						|
  key_objs[0].set_int(1);
 | 
						|
  ObRowkey rk(key_objs, 1);
 | 
						|
  entity.set_rowkey(rk);
 | 
						|
  // set properties
 | 
						|
  ObObj value;
 | 
						|
  value.set_int(111);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, entity.set_property("c1", value));
 | 
						|
  value.set_int(222);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, entity.set_property("c2", value));
 | 
						|
  
 | 
						|
  ObNewRow *row = nullptr;
 | 
						|
  row_iterator.set_entity(&entity);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, row_iterator.open());
 | 
						|
  ASSERT_EQ(OB_SUCCESS, row_iterator.get_next_row(row));
 | 
						|
}
 | 
						|
 | 
						|
#if 0
 | 
						|
 | 
						|
class TestTableTTL : public::testing::Test {
 | 
						|
public:
 | 
						|
  TestTableTTL() {}
 | 
						|
  virtual ~TestTableTTL() {}
 | 
						|
  virtual void SetUp(); /*global event*/
 | 
						|
  virtual void TearDown();
 | 
						|
  void insert_one_partition_task();
 | 
						|
 | 
						|
  common::ObMySQLProxy sql_proxy_;
 | 
						|
};
 | 
						|
 | 
						|
void TestTableTTL::SetUp()
 | 
						|
{
 | 
						|
  ObTTLManager::get_instance().event_delay_ = 20 * 1000L;
 | 
						|
  ObTTLManager::get_instance().periodic_delay_ = 20 * 1000L;
 | 
						|
  ObTTLManager::get_instance().sql_proxy_ = &sql_proxy_;
 | 
						|
  ObTTLManager::get_instance().init();
 | 
						|
  ObTTLManager::get_instance().start();
 | 
						|
  usleep(100 * 1000L);
 | 
						|
}
 | 
						|
 | 
						|
void TestTableTTL::TearDown()
 | 
						|
{
 | 
						|
  ObTTLManager::get_instance().stop();
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTableTTL, proc_rs_cmd)
 | 
						|
{
 | 
						|
  uint64_t tenant_id[3] = {1, 2, 3};
 | 
						|
  uint64_t table_id[4];
 | 
						|
  ObPartitionKey pk[5];
 | 
						|
  int64_t task_id1 = 1;
 | 
						|
  ObTTLTaskInfo task_info;
 | 
						|
  ObTTLPara para;
 | 
						|
 | 
						|
  table_id[0] = combine_id(tenant_id[0], 1);
 | 
						|
  table_id[1] = combine_id(tenant_id[1], 2);
 | 
						|
  table_id[2] = combine_id(tenant_id[1], 1);
 | 
						|
  table_id[3] = combine_id(tenant_id[2], 1);
 | 
						|
 | 
						|
  pk[0] = ObPartitionKey(table_id[0], 1, 1);
 | 
						|
  pk[1] = ObPartitionKey(table_id[1], 1, 2);
 | 
						|
  pk[2] = ObPartitionKey(table_id[2], 1, 3);
 | 
						|
  pk[3] = ObPartitionKey(table_id[3], 1, 4);
 | 
						|
  pk[4] = ObPartitionKey(table_id[3], 2, 5);
 | 
						|
  
 | 
						|
  ObTTLTaskCtx* ctx = NULL;
 | 
						|
  int64_t rsp_time = OB_INVALID_ID;
 | 
						|
  ObTTLManager::ObTTLTenantInfo* tenant_ttl = NULL;
 | 
						|
  ObTTLManager& ttl_mgr = ObTTLManager::get_instance();
 | 
						|
  /*1.1 simulate: rs send a trigger request*/
 | 
						|
  ttl_mgr.proc_rs_cmd(tenant_id[0], task_id1, true, OB_TTL_TASK_RUNNING);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ASSERT_EQ(1, ttl_mgr.ttl_tenant_parts_map_.size());
 | 
						|
    ASSERT_TRUE(NULL != tenant_ttl);
 | 
						|
    ASSERT_EQ(tenant_ttl->state_, OB_TTL_TASK_RUNNING);
 | 
						|
    ASSERT_EQ(tenant_ttl->is_usr_trigger_, true);
 | 
						|
    rsp_time = tenant_ttl->rsp_time_;
 | 
						|
  }
 | 
						|
 | 
						|
  /*1.2. simulate: new a partition task*/
 | 
						|
  {
 | 
						|
    for (int i = 0; i<5; i++) {
 | 
						|
      if (pk[i].get_tenant_id() == tenant_id[0]) {
 | 
						|
        task_info.pkey_ = pk[i];
 | 
						|
        task_info.task_id_ = task_id1;
 | 
						|
        ttl_mgr.generate_one_partition_task(task_info, para);
 | 
						|
        common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
        ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
        ASSERT_TRUE(NULL != ctx);
 | 
						|
        ASSERT_EQ(ctx->task_status_ , OB_TTL_TASK_PREPARE);
 | 
						|
      }      
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  usleep(100 * 1000L);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ASSERT_EQ(tenant_ttl->part_task_map_.size() , 1);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ASSERT_EQ(ctx->task_status_ , OB_TTL_TASK_PREPARE);
 | 
						|
 | 
						|
    /*1.3. simulate: sync sys table logical*/
 | 
						|
    ctx->is_dirty_ = false;
 | 
						|
    ctx->task_status_ = OB_TTL_TASK_RUNNING;
 | 
						|
    ctx->rsp_time_ = tenant_ttl->rsp_time_;
 | 
						|
  }
 | 
						|
 | 
						|
  /*1.4. reponse to rs*/
 | 
						|
  usleep(100 * 1000L);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ASSERT_EQ(tenant_ttl->rsp_time_ , OB_INVALID_ID); 
 | 
						|
    ASSERT_EQ(tenant_ttl->is_dirty_ , false); 
 | 
						|
  }
 | 
						|
 | 
						|
  /*5.1 retrigger*/
 | 
						|
  ttl_mgr.proc_rs_cmd(tenant_id[0], 3, true, OB_TTL_TASK_RUNNING);
 | 
						|
  /*mock response rs*/
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ctx->is_dirty_ = false;
 | 
						|
    ctx->rsp_time_ = tenant_ttl->rsp_time_;
 | 
						|
  }
 | 
						|
  ctx->rsp_time_ = tenant_ttl->rsp_time_;
 | 
						|
  /*5.1. reponse to rs*/
 | 
						|
  usleep(100 * 1000L);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ASSERT_EQ(tenant_ttl->rsp_time_ , OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(tenant_ttl->is_dirty_ , false);
 | 
						|
    ASSERT_EQ(tenant_ttl->task_id_ , 3);
 | 
						|
  }
 | 
						|
 | 
						|
  /*2.1 simulate: rs send a pending request*/
 | 
						|
  ttl_mgr.proc_rs_cmd(tenant_id[0], task_id1, true, OB_TTL_TASK_PENDING);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ASSERT_TRUE(NULL != tenant_ttl);
 | 
						|
    ASSERT_TRUE(NULL != ctx);
 | 
						|
    ASSERT_EQ(tenant_ttl->state_, OB_TTL_TASK_PENDING);
 | 
						|
    ASSERT_TRUE(tenant_ttl->rsp_time_ != OB_INVALID_ID);
 | 
						|
    ASSERT_TRUE(tenant_ttl->rsp_time_ != ctx->rsp_time_);
 | 
						|
    ASSERT_EQ(tenant_ttl->is_dirty_, true);
 | 
						|
    ASSERT_EQ(ctx->task_status_, OB_TTL_TASK_RUNNING);
 | 
						|
    ASSERT_EQ(ctx->is_dirty_, false);
 | 
						|
  }
 | 
						|
 | 
						|
  usleep(100 * 1000L);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ASSERT_TRUE(NULL != tenant_ttl);
 | 
						|
    ASSERT_TRUE(NULL != ctx);
 | 
						|
    ASSERT_EQ(ctx->is_dirty_, true);
 | 
						|
    ASSERT_EQ(ctx->task_status_, OB_TTL_TASK_PENDING);
 | 
						|
    
 | 
						|
    /*2.3. simulate: sync sys table logical*/
 | 
						|
    ctx->is_dirty_ = false;
 | 
						|
    ctx->rsp_time_ = tenant_ttl->rsp_time_;
 | 
						|
  }
 | 
						|
 | 
						|
  /*2.4 wait rsp*/
 | 
						|
  usleep(100 * 1000L);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ASSERT_TRUE(NULL != tenant_ttl);
 | 
						|
    ASSERT_TRUE(NULL != ctx);
 | 
						|
    ASSERT_EQ(tenant_ttl->rsp_time_ , OB_INVALID_ID); 
 | 
						|
    ASSERT_EQ(tenant_ttl->is_dirty_ , false); 
 | 
						|
    ASSERT_EQ(tenant_ttl->state_ , ctx->task_status_); 
 | 
						|
  }
 | 
						|
 | 
						|
  /*3.1 simulate: rs send a resume request*/
 | 
						|
  ttl_mgr.proc_rs_cmd(tenant_id[0], task_id1, true, OB_TTL_TASK_RUNNING);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ASSERT_TRUE(NULL != tenant_ttl);
 | 
						|
    ASSERT_TRUE(NULL != ctx);
 | 
						|
    ASSERT_EQ(tenant_ttl->state_, OB_TTL_TASK_RUNNING);
 | 
						|
    ASSERT_TRUE(tenant_ttl->rsp_time_ != OB_INVALID_ID);
 | 
						|
    ASSERT_TRUE(tenant_ttl->rsp_time_ != ctx->rsp_time_);
 | 
						|
    ASSERT_EQ(tenant_ttl->is_dirty_, true);
 | 
						|
    ASSERT_EQ(ctx->task_status_, OB_TTL_TASK_PENDING);
 | 
						|
    ASSERT_EQ(ctx->is_dirty_, false);
 | 
						|
  }
 | 
						|
 | 
						|
  usleep(100 * 1000L);
 | 
						|
  {
 | 
						|
    /*3.2. simulate: sync sys table logical*/
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ASSERT_TRUE(NULL != tenant_ttl);
 | 
						|
    ASSERT_TRUE(NULL != ctx);
 | 
						|
    ASSERT_EQ(ctx->is_dirty_, true);
 | 
						|
    ASSERT_EQ(ctx->task_status_, OB_TTL_TASK_RUNNING);
 | 
						|
    
 | 
						|
    /*3.3. simulate: sync sys table logical*/
 | 
						|
    ctx->is_dirty_ = false;
 | 
						|
    ctx->rsp_time_ = tenant_ttl->rsp_time_;
 | 
						|
  }
 | 
						|
 | 
						|
  /*3.4 wait rsp*/
 | 
						|
  usleep(100 * 1000L);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ASSERT_TRUE(NULL != tenant_ttl);
 | 
						|
    ASSERT_TRUE(NULL != ctx);
 | 
						|
    ASSERT_EQ(tenant_ttl->rsp_time_ , OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(tenant_ttl->is_dirty_ , false);
 | 
						|
    ASSERT_EQ(ctx->task_status_ , OB_TTL_TASK_RUNNING);
 | 
						|
  }
 | 
						|
 | 
						|
    /*4.1 simulate: rs send a resume request*/
 | 
						|
  ttl_mgr.proc_rs_cmd(tenant_id[0], task_id1, true, OB_TTL_TASK_CANCEL);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ASSERT_TRUE(NULL != tenant_ttl);
 | 
						|
    ASSERT_TRUE(NULL != ctx);
 | 
						|
    ASSERT_EQ(tenant_ttl->state_, OB_TTL_TASK_CANCEL);
 | 
						|
    ASSERT_TRUE(tenant_ttl->rsp_time_ != OB_INVALID_ID);
 | 
						|
    ASSERT_TRUE(tenant_ttl->rsp_time_ != ctx->rsp_time_);
 | 
						|
    ASSERT_EQ(tenant_ttl->is_dirty_, true);
 | 
						|
    ASSERT_EQ(ctx->task_status_, OB_TTL_TASK_RUNNING);
 | 
						|
    ASSERT_EQ(ctx->is_dirty_, false);
 | 
						|
  }
 | 
						|
 | 
						|
  usleep(100 * 1000L);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ASSERT_TRUE(NULL != tenant_ttl);
 | 
						|
    ASSERT_TRUE(NULL != ctx);
 | 
						|
    ASSERT_EQ(ctx->is_dirty_, true);
 | 
						|
    ASSERT_EQ(ctx->task_status_, OB_TTL_TASK_CANCEL);
 | 
						|
    
 | 
						|
    /*4.2. simulate: sync sys table logical*/
 | 
						|
    ctx->is_dirty_ = false;
 | 
						|
    ctx->rsp_time_ = tenant_ttl->rsp_time_;
 | 
						|
  }
 | 
						|
 | 
						|
  /*4.3 wait rsp*/
 | 
						|
  usleep(100 * 1000L);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(tenant_id[0], false);
 | 
						|
    ctx = ttl_mgr.get_one_partition_ctx(pk[0]);
 | 
						|
    ASSERT_TRUE(NULL == tenant_ttl);
 | 
						|
    ASSERT_TRUE(NULL == ctx);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTableTTL, dag_report)
 | 
						|
{
 | 
						|
  ObTTLManager& ttl_mgr = ObTTLManager::get_instance();
 | 
						|
  ObPartitionKey pkey(combine_id(1, 1) ,1 , 1);
 | 
						|
  ObTTLTaskCtx* ctx = NULL;
 | 
						|
  ObTTLManager::ObTTLTenantInfo* tenant_ttl = NULL;
 | 
						|
  ObTTLTaskInfo task_info;
 | 
						|
  ObTTLPara para;
 | 
						|
  bool is_stop = false;
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  uint32_t tenant_id = 1001;
 | 
						|
  uint32_t tenant_id1 = 1002;
 | 
						|
 | 
						|
  /*1.1 simulate: rs send a trigger request*/
 | 
						|
  ttl_mgr.proc_rs_cmd(pkey.get_tenant_id(), 1, true, OB_TTL_TASK_RUNNING);
 | 
						|
  {
 | 
						|
    common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
    tenant_ttl = (ObTTLManager::ObTTLTenantInfo*)ttl_mgr.get_tenant_info(pkey.get_tenant_id(), false);
 | 
						|
    ASSERT_EQ(1, ttl_mgr.ttl_tenant_parts_map_.size());
 | 
						|
    ASSERT_TRUE(NULL != tenant_ttl);
 | 
						|
    ASSERT_EQ(tenant_ttl->state_, OB_TTL_TASK_RUNNING);
 | 
						|
    ASSERT_EQ(tenant_ttl->is_usr_trigger_, true);
 | 
						|
    tenant_ttl->ttl_continue_ = true;
 | 
						|
  }
 | 
						|
  task_info.pkey_ = pkey;
 | 
						|
  ret = ttl_mgr.generate_one_partition_task(task_info, para);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
  usleep(100 * 1000L); /*wait schedule*/
 | 
						|
  {
 | 
						|
    /*1.1 simulate: rs send a trigger request*/
 | 
						|
    
 | 
						|
    task_info.err_code_ = OB_SUCCESS;
 | 
						|
    ret = ttl_mgr.report_task_status(task_info, para, is_stop);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(is_stop, false);
 | 
						|
    {
 | 
						|
      common::ObSpinLockGuard guard(ttl_mgr.lock_);
 | 
						|
      ctx = ttl_mgr.get_one_partition_ctx(pkey);
 | 
						|
      ASSERT_TRUE(NULL != ctx);
 | 
						|
      ASSERT_EQ(ctx->task_status_, OB_TTL_TASK_PENDING);  /*config disable ttl*/
 | 
						|
    }
 | 
						|
 | 
						|
    task_info.err_code_ = OB_NOT_INIT;
 | 
						|
    ret = ttl_mgr.report_task_status(task_info, para, is_stop);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(is_stop, true);
 | 
						|
    ASSERT_EQ(ctx->task_status_, OB_TTL_TASK_PENDING); 
 | 
						|
 | 
						|
    task_info.err_code_ = OB_ITER_END;
 | 
						|
    ret = ttl_mgr.report_task_status(task_info, para, is_stop);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(is_stop, true);
 | 
						|
    ASSERT_EQ(ctx->task_status_, OB_TTL_TASK_FINISH); 
 | 
						|
  }
 | 
						|
 | 
						|
}
 | 
						|
#endif 
 | 
						|
 | 
						|
#if 0
 | 
						|
class TestRsTTL : public::testing::Test {
 | 
						|
public:
 | 
						|
  TestRsTTL() {};
 | 
						|
  virtual ~TestRsTTL() {}
 | 
						|
  virtual void SetUp(); /*global event*/
 | 
						|
  virtual void TearDown();
 | 
						|
 | 
						|
};
 | 
						|
 | 
						|
class TestRsTTLTaskMgr : public rootserver::ObTTLTenantTaskMgr {
 | 
						|
 | 
						|
public:
 | 
						|
  static TestRsTTLTaskMgr& get_instance() {
 | 
						|
    static TestRsTTLTaskMgr instance_;
 | 
						|
    return instance_;
 | 
						|
  }
 | 
						|
 | 
						|
  int init() {
 | 
						|
    return ObTTLTenantTaskMgr::init();
 | 
						|
  }
 | 
						|
 | 
						|
  int get_alive_servers(uint64_t tenant_id,  rootserver::ServerList& server_infos) { 
 | 
						|
    UNUSED(tenant_id);
 | 
						|
    UNUSED(server_infos);
 | 
						|
    return OB_SUCCESS; 
 | 
						|
  }
 | 
						|
  
 | 
						|
  int get_tenant_ids(ObIArray<uint64_t>& tenant_ids)
 | 
						|
  {
 | 
						|
    UNUSED(tenant_ids);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
 | 
						|
  int read_tenant_status(uint64_t tenant_id, 
 | 
						|
                        common::ObTTLStatusArray& tenant_tasks)
 | 
						|
  {
 | 
						|
    UNUSED(tenant_id);
 | 
						|
    UNUSED(tenant_tasks);
 | 
						|
    return OB_SUCCESS; 
 | 
						|
  }
 | 
						|
 | 
						|
  int delete_task(uint64_t tenant_id, uint64_t task_id)
 | 
						|
  {
 | 
						|
    UNUSED(tenant_id);
 | 
						|
    UNUSED(task_id);
 | 
						|
    return OB_SUCCESS; 
 | 
						|
  }
 | 
						|
 | 
						|
  int update_task_status(uint64_t tenant_id,
 | 
						|
                      uint64_t task_id,
 | 
						|
                      int64_t rs_new_status)
 | 
						|
  {
 | 
						|
    UNUSED(tenant_id);
 | 
						|
    UNUSED(task_id);
 | 
						|
    UNUSED(rs_new_status);
 | 
						|
    return OB_SUCCESS; 
 | 
						|
  }
 | 
						|
 | 
						|
  int fetch_ttl_task_id(int64_t &new_task_id, int64_t last_task_id) {
 | 
						|
    UNUSED(last_task_id);
 | 
						|
    new_task_id = ++task_id_;
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  int insert_tenant_task(ObTTLStatus& ttl_task) {
 | 
						|
    UNUSED(ttl_task);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  int in_active_time(uint64_t tenant_id, bool& is_active_time) {
 | 
						|
   UNUSED(tenant_id);
 | 
						|
   is_active_time = active_time_;
 | 
						|
   return OB_SUCCESS; 
 | 
						|
  }
 | 
						|
 | 
						|
  void set_active_time(bool active_time) {
 | 
						|
    active_time_ = active_time;
 | 
						|
  }
 | 
						|
 | 
						|
  bool is_enable_ttl(uint64_t tenant_id) {
 | 
						|
    UNUSED(tenant_id);
 | 
						|
    return enable_ttl_;
 | 
						|
  }
 | 
						|
 | 
						|
  void set_enable_ttl(bool enable_ttl) {
 | 
						|
    enable_ttl_ = enable_ttl;
 | 
						|
  }
 | 
						|
 | 
						|
  int dispatch_ttl_request(rootserver::ServerList& addrs, 
 | 
						|
                          rootserver::ServerList& eliminate_addrs, 
 | 
						|
                          uint64_t tenant_id,
 | 
						|
                          int ttl_cmd,
 | 
						|
                          int trigger_type,
 | 
						|
                          int64_t task_id) {
 | 
						|
    UNUSED(addrs);
 | 
						|
    UNUSED(eliminate_addrs);
 | 
						|
    UNUSED(tenant_id);
 | 
						|
    UNUSED(ttl_cmd);
 | 
						|
    UNUSED(trigger_type);
 | 
						|
    UNUSED(task_id);
 | 
						|
    ++snd_server_cnt_;
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
private:
 | 
						|
  TestRsTTLTaskMgr() 
 | 
						|
    : ObTTLTenantTaskMgr(),
 | 
						|
      task_id_(0),
 | 
						|
      enable_ttl_(true),
 | 
						|
      active_time_(true),
 | 
						|
      snd_server_cnt_(0) {}
 | 
						|
 | 
						|
public:
 | 
						|
  uint64_t task_id_;
 | 
						|
  bool enable_ttl_;
 | 
						|
  bool active_time_;
 | 
						|
  uint64_t snd_server_cnt_ = 0;
 | 
						|
};
 | 
						|
 | 
						|
#define TESTTTLMGR TestRsTTLTaskMgr::get_instance()
 | 
						|
 | 
						|
class TTLTestHelper {
 | 
						|
public:
 | 
						|
  static int construct_ttl_status(uint64_t tenant_id, size_t count, common::ObTTLStatusArray& tenant_tasks);
 | 
						|
};
 | 
						|
 | 
						|
 | 
						|
void TestRsTTL::SetUp()
 | 
						|
{
 | 
						|
  TESTTTLMGR.init();
 | 
						|
}
 | 
						|
 | 
						|
void TestRsTTL::TearDown()
 | 
						|
{ 
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
#define OB_TTL_RESPONSE_MASK (1 << 30)
 | 
						|
#define OB_TTL_STATUS_MASK  (OB_TTL_RESPONSE_MASK - 1)
 | 
						|
 | 
						|
#define SET_TASK_PURE_STATUS(status, state) ((status) = ((state) & OB_TTL_STATUS_MASK) + ((status & OB_TTL_RESPONSE_MASK)))
 | 
						|
#define SET_TASK_RESPONSE(status, state) ((status) |= (((state) & 1) << 30))
 | 
						|
#define SET_TASK_STATUS(status, pure_status, is_responsed) { SET_TASK_PURE_STATUS(status, pure_status), SET_TASK_RESPONSE(status, is_responsed); }
 | 
						|
 | 
						|
#define EVAL_TASK_RESPONSE(status) (((status) & OB_TTL_RESPONSE_MASK) >> 30)
 | 
						|
#define EVAL_TASK_PURE_STATUS(status) (static_cast<ObTTLTaskStatus>((status) & OB_TTL_STATUS_MASK))
 | 
						|
 | 
						|
/**
 | 
						|
 * add tenant
 | 
						|
 * add ttl task
 | 
						|
 * send msg
 | 
						|
 * insert into table
 | 
						|
*/
 | 
						|
TEST_F(TestRsTTL, ttl_basic)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  uint32_t tenant_id = 1001;
 | 
						|
  uint32_t tenant_id1 = 1002;
 | 
						|
 | 
						|
  ret = TESTTTLMGR.add_tenant(tenant_id);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
  ret = TESTTTLMGR.refresh_tenant(tenant_id);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
  ret = TESTTTLMGR.refresh_tenant(tenant_id);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
  rootserver::ObTTLTenantTask* task_ptr = NULL;
 | 
						|
  ret = TESTTTLMGR.get_tenant_tasks_ptr(tenant_id, task_ptr);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  rootserver::ObTTLTenantTask& task_ref = *task_ptr;
 | 
						|
  ASSERT_EQ(task_ref.tasks_.count(), 0);
 | 
						|
  
 | 
						|
  ret = TESTTTLMGR.add_tenant(tenant_id1);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
  // no ttl task exist return error
 | 
						|
  ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_CANCEL);
 | 
						|
  ASSERT_NE(ret, OB_SUCCESS);
 | 
						|
  
 | 
						|
  ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_MOVE);
 | 
						|
  ASSERT_NE(ret, OB_SUCCESS);
 | 
						|
  
 | 
						|
  ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_RESUME);
 | 
						|
  ASSERT_NE(ret, OB_SUCCESS);
 | 
						|
  
 | 
						|
  ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_SUSPEND);
 | 
						|
  ASSERT_NE(ret, OB_SUCCESS);
 | 
						|
  
 | 
						|
  // add ttl task
 | 
						|
  ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_TRIGGER);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  {
 | 
						|
    ASSERT_EQ(task_ref.tasks_.count(), 1);
 | 
						|
 | 
						|
    rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(0);
 | 
						|
    ASSERT_EQ(rs_task.send_servers_.count(), 0);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.tenant_id_, tenant_id);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.table_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.partition_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.trigger_type_, static_cast<int64_t>(TRIGGER_TYPE::USER_TRIGGER));
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE));  
 | 
						|
  }
 | 
						|
 | 
						|
  {
 | 
						|
    // add ttl task, cancel first, and add new one
 | 
						|
    ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_TRIGGER);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(task_ref.tasks_.count(), 2);
 | 
						|
    {
 | 
						|
      rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(0);
 | 
						|
      ASSERT_EQ(rs_task.send_servers_.count(), 0);
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.tenant_id_, tenant_id);
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.table_id_, OB_INVALID_ID);
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.partition_id_, OB_INVALID_ID);
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.trigger_type_, static_cast<int64_t>(TRIGGER_TYPE::USER_TRIGGER));
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CANCEL));
 | 
						|
    }
 | 
						|
 | 
						|
    {
 | 
						|
      // new added task, init status means waiting to scheduler
 | 
						|
      rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(1);
 | 
						|
      ASSERT_EQ(rs_task.send_servers_.count(), 0);
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.tenant_id_, tenant_id);
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.table_id_, OB_INVALID_ID);
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.partition_id_, OB_INVALID_ID);
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.trigger_type_, static_cast<int64_t>(TRIGGER_TYPE::USER_TRIGGER));
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE));
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  {
 | 
						|
    // add ttl task, 2 task in queue, alter second task status
 | 
						|
    ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_SUSPEND);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(task_ref.tasks_.count(), 2);
 | 
						|
    // new added task, alter status
 | 
						|
    rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(1);
 | 
						|
    ASSERT_EQ(rs_task.send_servers_.count(), 0);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.tenant_id_, tenant_id);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.table_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.partition_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.trigger_type_, static_cast<int64_t>(TRIGGER_TYPE::USER_TRIGGER));
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_SUSPEND));
 | 
						|
  }
 | 
						|
 | 
						|
  {
 | 
						|
    // add ttl task, 2 task in queue, alter second task status
 | 
						|
    ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_RESUME);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(task_ref.tasks_.count(), 2);
 | 
						|
    // new added task, alter status
 | 
						|
    rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(1);
 | 
						|
    ASSERT_EQ(rs_task.send_servers_.count(), 0);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.tenant_id_, tenant_id);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.table_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.partition_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.trigger_type_, static_cast<int64_t>(TRIGGER_TYPE::USER_TRIGGER));
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE));
 | 
						|
  }
 | 
						|
 | 
						|
  {
 | 
						|
    // add ttl task, 2 task in queue, alter second task status
 | 
						|
    ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_CANCEL);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(task_ref.tasks_.count(), 1);
 | 
						|
    // new added task, alter status
 | 
						|
    rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(0);
 | 
						|
    ASSERT_EQ(rs_task.send_servers_.count(), 0);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.tenant_id_, tenant_id);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.table_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.partition_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.trigger_type_, static_cast<int64_t>(TRIGGER_TYPE::USER_TRIGGER));
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CANCEL));
 | 
						|
  }
 | 
						|
 | 
						|
  {
 | 
						|
    // add ttl task failed
 | 
						|
    ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_RESUME);
 | 
						|
    ASSERT_NE(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(task_ref.tasks_.count(), 1);
 | 
						|
    // new added task, alter status
 | 
						|
    rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(0);
 | 
						|
    ASSERT_EQ(rs_task.send_servers_.count(), 0);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.tenant_id_, tenant_id);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.table_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.partition_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.trigger_type_, static_cast<int64_t>(TRIGGER_TYPE::USER_TRIGGER));
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CANCEL));
 | 
						|
  }
 | 
						|
 | 
						|
  // process revieve msg
 | 
						|
  {
 | 
						|
    rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(0);
 | 
						|
    int64_t addr_value1 = 0x1234123412;
 | 
						|
    common::ObAddr addr1(addr_value1);
 | 
						|
    
 | 
						|
    ret = rs_task.send_servers_.push_back(addr1);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
    uint64_t task_id = rs_task.ttl_status_.task_id_;
 | 
						|
    uint64_t invalid_task_id = OB_INVALID_ID ;
 | 
						|
 | 
						|
    int64_t rsp_task_type = static_cast<int64_t>(ObTTLTaskType::OB_TTL_CANCEL);
 | 
						|
    //  ret = TESTTTLMGR.process_tenant_task_rsp(tenant_id, invalid_task_id, rsp_task_type, addr1);
 | 
						|
    //  ASSERT_NE(ret, OB_SUCCESS);
 | 
						|
    //  
 | 
						|
    //  
 | 
						|
    //  rsp_task_type = static_cast<int64_t>(ObTTLTaskType::OB_TTL_MOVE);
 | 
						|
    //  ret = TESTTTLMGR.process_tenant_task_rsp(tenant_id, task_id, rsp_task_type, addr1);
 | 
						|
    //  ASSERT_NE(ret, OB_SUCCESS);
 | 
						|
 | 
						|
    rsp_task_type = static_cast<int64_t>(ObTTLTaskType::OB_TTL_CANCEL);
 | 
						|
    ret = TESTTTLMGR.process_tenant_task_rsp(tenant_id, task_id, rsp_task_type, addr1);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 0);
 | 
						|
    ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE));
 | 
						|
 | 
						|
    ret = TESTTTLMGR.process_tenant_tasks(tenant_id);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 0);
 | 
						|
    ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE));
 | 
						|
 | 
						|
    rsp_task_type = static_cast<int64_t>(ObTTLTaskType::OB_TTL_MOVE);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ret = TESTTTLMGR.process_tenant_task_rsp(tenant_id, task_id, rsp_task_type, addr1);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
    ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 1);
 | 
						|
    ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE));
 | 
						|
    task_ref.tasks_.remove(0);
 | 
						|
  }
 | 
						|
 | 
						|
  TESTTTLMGR.reset_local_tenant_task();
 | 
						|
  TESTTTLMGR.delete_tenant(tenant_id);
 | 
						|
  TESTTTLMGR.delete_tenant(tenant_id1);
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestRsTTL, ttl_periodic)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  uint32_t tenant_id = 1001;
 | 
						|
 | 
						|
  ret = TESTTTLMGR.add_tenant(tenant_id);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
  ret = TESTTTLMGR.alter_status_and_add_ttl_task(tenant_id);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  
 | 
						|
  rootserver::ObTTLTenantTask* task_ptr = NULL;
 | 
						|
  ret = TESTTTLMGR.get_tenant_tasks_ptr(tenant_id, task_ptr);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  
 | 
						|
  rootserver::ObTTLTenantTask& task_ref = *task_ptr;
 | 
						|
  ASSERT_EQ(task_ref.tasks_.count(), 1);
 | 
						|
 | 
						|
  {
 | 
						|
    rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(0);
 | 
						|
    ASSERT_EQ(rs_task.send_servers_.count(), 0);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.tenant_id_, tenant_id);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.table_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.partition_id_, OB_INVALID_ID);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.trigger_type_, static_cast<int64_t>(TRIGGER_TYPE::PERIODIC_TRIGGER));
 | 
						|
    ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 0);
 | 
						|
    ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE));
 | 
						|
 | 
						|
    rs_task.ttl_status_.task_update_time_ -= 360000 * 1000L * 1000L;
 | 
						|
    ret = TESTTTLMGR.alter_status_and_add_ttl_task(tenant_id);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
    ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_RESUME);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 0);
 | 
						|
    ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE));
 | 
						|
    ASSERT_EQ(task_ref.tasks_.count(), 1);
 | 
						|
 | 
						|
 | 
						|
    // 0:canceling, 1:create
 | 
						|
    ret = TESTTTLMGR.add_ttl_task(tenant_id, ObTTLTaskType::OB_TTL_TRIGGER);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);    
 | 
						|
    ASSERT_EQ(task_ref.tasks_.count(), 2);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.trigger_type_, static_cast<int64_t>(TRIGGER_TYPE::PERIODIC_TRIGGER));
 | 
						|
    ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 0);
 | 
						|
    ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CANCEL));
 | 
						|
    {
 | 
						|
      rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(1);
 | 
						|
      ASSERT_EQ(rs_task.ttl_status_.trigger_type_, static_cast<int64_t>(TRIGGER_TYPE::USER_TRIGGER));
 | 
						|
      ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 0);
 | 
						|
      ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE));
 | 
						|
    }
 | 
						|
    
 | 
						|
    // first task set as move
 | 
						|
    rs_task.all_responsed_ = true;
 | 
						|
    ret = TESTTTLMGR.alter_status_and_add_ttl_task(tenant_id);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 0);
 | 
						|
    ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE));
 | 
						|
 | 
						|
    // first task moveing, just retry send msg to server
 | 
						|
    ret = TESTTTLMGR.process_tenant_tasks(tenant_id);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 0);
 | 
						|
    ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE));
 | 
						|
 | 
						|
    // first task set as moved && delete first
 | 
						|
    rs_task.all_responsed_ = true;
 | 
						|
    ret = TESTTTLMGR.alter_status_and_add_ttl_task(tenant_id);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(task_ref.tasks_.count(), 1);
 | 
						|
    
 | 
						|
    {
 | 
						|
      rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(0);
 | 
						|
      ret = TESTTTLMGR.process_tenant_tasks(tenant_id);
 | 
						|
      ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
      ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 0);
 | 
						|
      ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE));
 | 
						|
 | 
						|
      rs_task.all_responsed_ = true;
 | 
						|
      ret = TESTTTLMGR.alter_status_and_add_ttl_task(tenant_id);
 | 
						|
      ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
      ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 0);
 | 
						|
      ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE));
 | 
						|
 | 
						|
      rs_task.all_responsed_ = true;
 | 
						|
      ret = TESTTTLMGR.alter_status_and_add_ttl_task(tenant_id);
 | 
						|
      ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
      ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 1);
 | 
						|
      ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE));
 | 
						|
 | 
						|
 | 
						|
      ret = TESTTTLMGR.process_tenant_tasks(tenant_id);
 | 
						|
      ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
      ASSERT_EQ(EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_), 1);
 | 
						|
      ASSERT_EQ(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_), static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE));
 | 
						|
   }
 | 
						|
}
 | 
						|
  TESTTTLMGR.reset_local_tenant_task();
 | 
						|
  TESTTTLMGR.delete_tenant(tenant_id);
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestRsTTL, ttl_config)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  uint32_t tenant_id = 1001;
 | 
						|
 | 
						|
  ret = TESTTTLMGR.add_tenant(tenant_id);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
  ret = TESTTTLMGR.alter_status_and_add_ttl_task(tenant_id);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  
 | 
						|
  rootserver::ObTTLTenantTask* task_ptr = NULL;
 | 
						|
  ret = TESTTTLMGR.get_tenant_tasks_ptr(tenant_id, task_ptr);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  
 | 
						|
  rootserver::ObTTLTenantTask& task_ref = *task_ptr;
 | 
						|
  ASSERT_EQ(task_ref.tasks_.count(), 1);
 | 
						|
 | 
						|
  {
 | 
						|
    rootserver::RsTenantTask& rs_task = task_ref.tasks_.at(0);
 | 
						|
 | 
						|
    uint64_t send_server_cnt = TESTTTLMGR.snd_server_cnt_;
 | 
						|
    ret = TESTTTLMGR.process_tenant_tasks(tenant_id);
 | 
						|
    ASSERT_EQ(TESTTTLMGR.snd_server_cnt_, send_server_cnt + 1);
 | 
						|
 | 
						|
    TESTTTLMGR.active_time_ = false;
 | 
						|
    ret = TESTTTLMGR.process_tenant_tasks(tenant_id);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<ObTTLTaskStatus>(ObTTLTaskStatus::OB_RS_TTL_TASK_SUSPEND));
 | 
						|
 | 
						|
    TESTTTLMGR.active_time_ = true;
 | 
						|
    ret = TESTTTLMGR.process_tenant_tasks(tenant_id);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<ObTTLTaskStatus>(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE));
 | 
						|
 | 
						|
    TESTTTLMGR.enable_ttl_ = false;
 | 
						|
    ret = TESTTTLMGR.process_tenant_tasks(tenant_id);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<ObTTLTaskStatus>(ObTTLTaskStatus::OB_RS_TTL_TASK_SUSPEND));
 | 
						|
 | 
						|
    TESTTTLMGR.enable_ttl_ = true;
 | 
						|
    ret = TESTTTLMGR.process_tenant_tasks(tenant_id);
 | 
						|
    ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
    ASSERT_EQ(rs_task.ttl_status_.status_, static_cast<ObTTLTaskStatus>(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE));
 | 
						|
 | 
						|
  }
 | 
						|
}
 | 
						|
#endif
 | 
						|
 | 
						|
}  // namespace observer
 | 
						|
}  // namespace oceanbase
 | 
						|
 | 
						|
int main(int argc, char** argv)
 | 
						|
{
 | 
						|
  oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
 | 
						|
  OB_LOGGER.set_log_level("INFO");
 | 
						|
  OB_LOGGER.set_file_name("test_observer.log", true);
 | 
						|
  ::testing::InitGoogleTest(&argc, argv);
 | 
						|
  return RUN_ALL_TESTS();
 | 
						|
}
 | 
						|
 |