595 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			595 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#ifndef OCEANBASE_SLOG_SIMPLE_OB_PARTITION_IMAGE_H_
 | 
						|
#define OCEANBASE_SLOG_SIMPLE_OB_PARTITION_IMAGE_H_
 | 
						|
 | 
						|
#include "lib/list/ob_list.h"
 | 
						|
#include "lib/file/file_directory_utils.h"
 | 
						|
#include "lib/file/ob_file.h"
 | 
						|
#include "lib/random/ob_random.h"
 | 
						|
#include "lib/allocator/ob_mod_define.h"
 | 
						|
#include "storage/blocksstable/slog/ob_base_storage_logger.h"
 | 
						|
#include "storage/blocksstable/ob_block_sstable_struct.h"
 | 
						|
 | 
						|
namespace oceanbase {
 | 
						|
using namespace common;
 | 
						|
using namespace lib;
 | 
						|
namespace blocksstable {
 | 
						|
struct SimpleObPartition : public ObIBaseStorageLogEntry {
 | 
						|
  int64_t table_id_;
 | 
						|
  int64_t partition_id_;
 | 
						|
  int64_t log_seq_num_;
 | 
						|
  int64_t macro_block_cnt_;
 | 
						|
  int64_t macro_blocks_[1024];
 | 
						|
  bool operator==(const SimpleObPartition& partition);
 | 
						|
  bool operator!=(const SimpleObPartition& partition);
 | 
						|
  bool is_valid() const;
 | 
						|
  static void build(int64_t table_id, int64_t partition_id, int64_t macro_block_cnt, SimpleObPartition& partition);
 | 
						|
  TO_STRING_EMPTY();
 | 
						|
 | 
						|
public:
 | 
						|
  NEED_SERIALIZE_AND_DESERIALIZE;
 | 
						|
};
 | 
						|
 | 
						|
struct SimpleObPartitionNode {
 | 
						|
  SimpleObPartitionNode() : partition_(), next_(NULL)
 | 
						|
  {}
 | 
						|
  SimpleObPartition partition_;
 | 
						|
  SimpleObPartitionNode* next_;
 | 
						|
};
 | 
						|
 | 
						|
enum SimpleObPartitionOperation { ADD_PARTITION = 0, REMOVE_PARTITION = 1 };
 | 
						|
 | 
						|
class SimpleObPartitionImage : public ObIRedoModule {
 | 
						|
public:
 | 
						|
  SimpleObPartitionImage();
 | 
						|
  virtual ~SimpleObPartitionImage();
 | 
						|
  int init(const char* data_dir, ObBaseStorageLogger* redo_log);
 | 
						|
  virtual int read_check_point();
 | 
						|
  virtual int do_check_point();
 | 
						|
  // replay the redo log.
 | 
						|
  virtual int replay(const ObRedoModuleReplayParam& param) override;
 | 
						|
  virtual int parse(const int64_t subcmd, const char* buf, const int64_t len, FILE* stream) override;
 | 
						|
  virtual int enable_write_log() override
 | 
						|
  {
 | 
						|
    return common::OB_SUCCESS;
 | 
						|
  }
 | 
						|
  int add_partition(SimpleObPartition& partition);
 | 
						|
  int add_partition_no_commit(SimpleObPartition& partition);
 | 
						|
  int remove_partition(const int64_t table_id, const int64_t partition_id);
 | 
						|
  bool exist_partition(const int64_t table_id, const int64_t partition_id, int64_t* log_seq_num = NULL);
 | 
						|
  bool operator==(SimpleObPartitionImage& image);
 | 
						|
  bool operator!=(SimpleObPartitionImage& image);
 | 
						|
 | 
						|
public:
 | 
						|
  static const int64_t BUCKET_NUM = 1523;
 | 
						|
  static int64_t hash(const int64_t table_id, const int64_t partition_id);
 | 
						|
  int do_add_partition(const SimpleObPartition& partition);
 | 
						|
  void do_remove_partition(const int64_t table_id, const int64_t partition_id);
 | 
						|
  bool inited_;
 | 
						|
  common::ObArenaAllocator allocator_;
 | 
						|
  common::ObConcurrentFIFOAllocator pt_allocator_;
 | 
						|
  common::ObBucketLock buckets_lock_;
 | 
						|
  SimpleObPartitionNode** buckets_;
 | 
						|
  char data_file_[OB_MAX_FILE_NAME_LENGTH];
 | 
						|
  ObBaseStorageLogger* redo_log_;
 | 
						|
};
 | 
						|
 | 
						|
bool SimpleObPartition::operator==(const SimpleObPartition& partition)
 | 
						|
{
 | 
						|
  bool bret = true;
 | 
						|
 | 
						|
  if (table_id_ != partition.table_id_) {
 | 
						|
    bret = false;
 | 
						|
  } else if (partition_id_ != partition.partition_id_) {
 | 
						|
    bret = false;
 | 
						|
  } else if (macro_block_cnt_ != partition.macro_block_cnt_) {
 | 
						|
    bret = false;
 | 
						|
  } else {
 | 
						|
    for (int64_t i = 0; bret && i < macro_block_cnt_; i++) {
 | 
						|
      if (macro_blocks_[i] != partition.macro_blocks_[i]) {
 | 
						|
        bret = false;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return bret;
 | 
						|
}
 | 
						|
 | 
						|
bool SimpleObPartition::operator!=(const SimpleObPartition& partition)
 | 
						|
{
 | 
						|
  return !(*this == partition);
 | 
						|
}
 | 
						|
 | 
						|
bool SimpleObPartition::is_valid() const
 | 
						|
{
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
DEFINE_SERIALIZE(SimpleObPartition)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
 | 
						|
  if (OB_SUCCESS != (ret = serialization::encode_vi64(buf, buf_len, pos, table_id_))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to encode ", "table_id_", table_id_, "ret", ret);
 | 
						|
  } else if (OB_SUCCESS != (ret = serialization::encode_vi64(buf, buf_len, pos, partition_id_))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to encode ", "partition_id_", partition_id_, "ret", ret);
 | 
						|
  } else if (OB_SUCCESS != (ret = serialization::encode_vi64(buf, buf_len, pos, log_seq_num_))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to encode ", "log_seq_num_", log_seq_num_, "ret", ret);
 | 
						|
  } else if (OB_SUCCESS != (ret = serialization::encode_vi64(buf, buf_len, pos, macro_block_cnt_))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to encode ", "macro_block_cnt_", macro_block_cnt_, "ret", ret);
 | 
						|
  } else {
 | 
						|
    for (int64_t i = 0; OB_SUCC(ret) && i < macro_block_cnt_; ++i) {
 | 
						|
      if (OB_SUCCESS != (ret = serialization::encode_vi64(buf, buf_len, pos, macro_blocks_[i]))) {
 | 
						|
        STORAGE_REDO_LOG(WARN, "Fail to encode ", "macro_block", macro_blocks_[i], "ret", ret);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
DEFINE_DESERIALIZE(SimpleObPartition)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
 | 
						|
  if (OB_SUCCESS != (ret = serialization::decode_vi64(buf, data_len, pos, &table_id_))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to decode table_id_", "ret", ret);
 | 
						|
  } else if (OB_SUCCESS != (ret = serialization::decode_vi64(buf, data_len, pos, &partition_id_))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to decode partition_id_", "ret", ret);
 | 
						|
  } else if (OB_SUCCESS != (ret = serialization::decode_vi64(buf, data_len, pos, &log_seq_num_))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to decode log_seq_num_", "ret", ret);
 | 
						|
  } else if (OB_SUCCESS != (ret = serialization::decode_vi64(buf, data_len, pos, ¯o_block_cnt_))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to decode macro_block_cnt_", "ret", ret);
 | 
						|
  } else {
 | 
						|
    for (int64_t i = 0; OB_SUCC(ret) && i < macro_block_cnt_; ++i) {
 | 
						|
      if (OB_SUCCESS != (ret = serialization::decode_vi64(buf, data_len, pos, ¯o_blocks_[i]))) {
 | 
						|
        STORAGE_REDO_LOG(WARN, "Fail to decode macro_block", "ret", ret);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
DEFINE_GET_SERIALIZE_SIZE(SimpleObPartition)
 | 
						|
{
 | 
						|
  int64_t len = 0;
 | 
						|
  len += serialization::encoded_length(table_id_);
 | 
						|
  len += serialization::encoded_length(partition_id_);
 | 
						|
  len += serialization::encoded_length(log_seq_num_);
 | 
						|
  len += serialization::encoded_length(macro_block_cnt_);
 | 
						|
 | 
						|
  for (int64_t i = 0; i < macro_block_cnt_; ++i) {
 | 
						|
    len += serialization::encoded_length(macro_blocks_[i]);
 | 
						|
  }
 | 
						|
 | 
						|
  len += ObRandom::rand(0, 100);
 | 
						|
 | 
						|
  return len;
 | 
						|
}
 | 
						|
 | 
						|
void SimpleObPartition::build(
 | 
						|
    int64_t table_id, int64_t partition_id, int64_t macro_block_cnt, SimpleObPartition& partition)
 | 
						|
{
 | 
						|
  partition.table_id_ = table_id;
 | 
						|
  partition.partition_id_ = partition_id;
 | 
						|
  partition.macro_block_cnt_ = macro_block_cnt;
 | 
						|
  for (int64_t i = 0; i < partition.macro_block_cnt_; ++i) {
 | 
						|
    partition.macro_blocks_[i] = (table_id % (partition_id * macro_block_cnt + 1) + i);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
SimpleObPartitionImage::SimpleObPartitionImage()
 | 
						|
    : inited_(false), allocator_(ObModIds::TEST), buckets_(NULL), redo_log_(NULL)
 | 
						|
{}
 | 
						|
 | 
						|
SimpleObPartitionImage::~SimpleObPartitionImage()
 | 
						|
{
 | 
						|
  if (NULL != buckets_) {
 | 
						|
    SimpleObPartitionNode* iter = NULL;
 | 
						|
    SimpleObPartitionNode* tmp = NULL;
 | 
						|
 | 
						|
    for (int64_t pos = 0; pos < BUCKET_NUM; ++pos) {
 | 
						|
      for (iter = buckets_[pos]; iter != NULL; iter = tmp) {
 | 
						|
        tmp = iter->next_;
 | 
						|
        pt_allocator_.free(iter);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
int SimpleObPartitionImage::init(const char* data_dir, ObBaseStorageLogger* redo_log)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  void* tmp = NULL;
 | 
						|
 | 
						|
  if (inited_) {
 | 
						|
    ret = OB_INIT_TWICE;
 | 
						|
    STORAGE_REDO_LOG(WARN, "Init twice.");
 | 
						|
  } else if (NULL == data_dir || NULL == redo_log) {
 | 
						|
    ret = OB_INVALID_ARGUMENT;
 | 
						|
    STORAGE_REDO_LOG(WARN, "Invalid arguments, ", "data_dir", data_dir, "redo_log", redo_log);
 | 
						|
  } else if (OB_FAIL(redo_log->register_redo_module(OB_REDO_LOG_PARTITION, this))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to register redo log, ", "ret", ret);
 | 
						|
  } else if (OB_FAIL(buckets_lock_.init(BUCKET_NUM))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to init buckets lock, ", K(ret));
 | 
						|
  } else if (NULL == (tmp = allocator_.alloc(sizeof(SimpleObPartitionNode*) * BUCKET_NUM))) {
 | 
						|
    ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to allocate memory, ", K(ret));
 | 
						|
  } else if (OB_FAIL(pt_allocator_.init(1024 * 1024 * 1024, 16 * 1024 * 1024, 64 * 1024))) {
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to init pt allocator, ", K(ret));
 | 
						|
  } else {
 | 
						|
    buckets_ = new (tmp) SimpleObPartitionNode*[BUCKET_NUM];
 | 
						|
    MEMSET(buckets_, 0, sizeof(SimpleObPartitionNode*) * BUCKET_NUM);
 | 
						|
    snprintf(data_file_, OB_MAX_FILE_NAME_LENGTH, "%s/data", data_dir);
 | 
						|
    redo_log_ = redo_log;
 | 
						|
    inited_ = true;
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int SimpleObPartitionImage::read_check_point()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  bool exist = false;
 | 
						|
  if (OB_SUCCESS == FileDirectoryUtils::is_exists(data_file_, exist) && exist) {
 | 
						|
    ObString str;
 | 
						|
    ModuleArena allocator;
 | 
						|
    SimpleObPartition partition;
 | 
						|
    int64_t pos = 0;
 | 
						|
 | 
						|
    if (OB_SUCCESS != (ret = load_file_to_string(data_file_, allocator, str))) {
 | 
						|
      STORAGE_REDO_LOG(WARN, "Fail to load file to string, ", "ret", ret);
 | 
						|
    } else {
 | 
						|
      while (OB_SUCCESS == ret && pos < str.length()) {
 | 
						|
        if (OB_SUCCESS != (ret = partition.deserialize(str.ptr(), str.length(), pos))) {
 | 
						|
          STORAGE_REDO_LOG(WARN, "Fail to deserialize partition, ", "ret", ret);
 | 
						|
        } else if (OB_SUCCESS != (ret = do_add_partition(partition))) {
 | 
						|
          STORAGE_REDO_LOG(WARN, "Fail to add partition, ", "ret", ret);
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int SimpleObPartitionImage::do_check_point()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  ObFileAppender data_file;
 | 
						|
  int64_t pos = 0;
 | 
						|
  const int64_t len = 1024 * 16;
 | 
						|
  char buf[len];
 | 
						|
 | 
						|
  if (OB_SUCCESS != (ret = data_file.open(ObString(0, (int32_t)strlen(data_file_), data_file_), false, true, true))) {
 | 
						|
    ret = OB_IO_ERROR;
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to open checkpoint file, ", "ret", ret);
 | 
						|
  } else {
 | 
						|
    for (int64_t i = 0; OB_SUCC(ret) && i < BUCKET_NUM; ++i) {
 | 
						|
      common::ObBucketWLockGuard guard(buckets_lock_, i);
 | 
						|
      SimpleObPartitionNode* iter = NULL;
 | 
						|
      for (iter = buckets_[i]; OB_SUCC(ret) && iter != NULL; iter = iter->next_) {
 | 
						|
        pos = 0;
 | 
						|
        if (OB_SUCCESS != (ret = iter->partition_.serialize(buf, len, pos))) {
 | 
						|
          STORAGE_REDO_LOG(WARN, "Fail to serialize, ", "ret", ret);
 | 
						|
        } else if (OB_SUCCESS != (ret = data_file.append(buf, pos, true))) {
 | 
						|
          STORAGE_REDO_LOG(WARN, "Fail to append to data file, ", "ret", ret);
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    data_file.close();
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int SimpleObPartitionImage::replay(const ObRedoModuleReplayParam& param)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  const int64_t log_seq_num = param.log_seq_num_;
 | 
						|
  const int64_t subcmd = param.subcmd_;
 | 
						|
  const char* buf = param.buf_;
 | 
						|
  const int64_t len = param.buf_len_;
 | 
						|
  enum ObRedoLogMainType main_type;
 | 
						|
  int32_t sub_type = 0;
 | 
						|
  ObIRedoModule::parse_subcmd(param.subcmd_, main_type, sub_type);
 | 
						|
  UNUSED(log_seq_num);
 | 
						|
 | 
						|
  if (OB_REDO_LOG_PARTITION != main_type) {
 | 
						|
    ret = OB_ERR_UNEXPECTED;
 | 
						|
  } else {
 | 
						|
    switch (static_cast<enum SimpleObPartitionOperation>(sub_type)) {
 | 
						|
      case ADD_PARTITION: {
 | 
						|
        SimpleObPartition partition;
 | 
						|
        int64_t pos = 0;
 | 
						|
        if (OB_SUCCESS != (ret = partition.deserialize(buf, len, pos))) {
 | 
						|
          STORAGE_REDO_LOG(WARN, "Fail to deserialize partition, ", "ret", ret);
 | 
						|
        } else if (!exist_partition(partition.table_id_, partition.partition_id_)) {
 | 
						|
          partition.log_seq_num_ = log_seq_num;
 | 
						|
          if (OB_SUCCESS != (ret = do_add_partition(partition))) {
 | 
						|
            STORAGE_REDO_LOG(WARN, "Fail to do add partition, ", "ret", ret);
 | 
						|
          }
 | 
						|
        } else {
 | 
						|
          STORAGE_REDO_LOG(DEBUG,
 | 
						|
              "Recovery add partition: ",
 | 
						|
              "table_id",
 | 
						|
              partition.table_id_,
 | 
						|
              "partition_id",
 | 
						|
              partition.partition_id_);
 | 
						|
        }
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      case REMOVE_PARTITION: {
 | 
						|
        int64_t pos = 0;
 | 
						|
        int64_t table_id = 0;
 | 
						|
        int64_t partition_id = 0;
 | 
						|
        int64_t plog_seq_num = 0;
 | 
						|
        if (OB_SUCCESS != (ret = serialization::decode_vi64(buf, len, pos, &table_id))) {
 | 
						|
          STORAGE_REDO_LOG(WARN, "Fail to decode table_id, ", "ret", ret);
 | 
						|
        } else if (OB_SUCCESS != (ret = serialization::decode_vi64(buf, len, pos, &partition_id))) {
 | 
						|
          STORAGE_REDO_LOG(WARN, "Fail to decode partition_id, ", "ret", ret);
 | 
						|
        } else if (exist_partition(table_id, partition_id, &plog_seq_num)) {
 | 
						|
          if (log_seq_num > plog_seq_num) {
 | 
						|
            do_remove_partition(table_id, partition_id);
 | 
						|
          }
 | 
						|
        } else {
 | 
						|
          STORAGE_REDO_LOG(DEBUG, "recovery remove partition: ", "table_id", table_id, "partition_id", partition_id);
 | 
						|
        }
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      default: {
 | 
						|
        ret = OB_ERR_UNEXPECTED;
 | 
						|
        STORAGE_REDO_LOG(WARN, "Can not recognize the subcmd.");
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int SimpleObPartitionImage::parse(const int64_t subcmd, const char* buf, const int64_t len, FILE* stream)
 | 
						|
{
 | 
						|
  int ret = OB_NOT_SUPPORTED;
 | 
						|
  UNUSED(subcmd);
 | 
						|
  UNUSED(buf);
 | 
						|
  UNUSED(len);
 | 
						|
  UNUSED(stream);
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int SimpleObPartitionImage::add_partition(SimpleObPartition& partition)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  int64_t subcmd = ObIRedoModule::gen_subcmd(OB_REDO_LOG_PARTITION, ADD_PARTITION);
 | 
						|
  int64_t log_seq_num = 0;
 | 
						|
 | 
						|
  if (!inited_) {
 | 
						|
    ret = OB_NOT_INIT;
 | 
						|
    STORAGE_REDO_LOG(WARN, "Not init.");
 | 
						|
  } else {
 | 
						|
    int64_t pos = hash(partition.table_id_, partition.partition_id_) % BUCKET_NUM;
 | 
						|
    const ObStorageLogAttribute log_attr(OB_SYS_TENANT_ID, OB_VIRTUAL_DATA_FILE_ID);
 | 
						|
    common::ObBucketWLockGuard guard(buckets_lock_, pos);
 | 
						|
 | 
						|
    if (!exist_partition(partition.table_id_, partition.partition_id_)) {
 | 
						|
      if (OB_SUCCESS != (ret = redo_log_->begin(OB_LOG_CS_CREATE_PARTITION))) {
 | 
						|
        STORAGE_REDO_LOG(WARN, "Fail to start transaction, ", "ret", ret);
 | 
						|
      } else if (OB_SUCCESS != (ret = redo_log_->write_log(subcmd, log_attr, partition))) {
 | 
						|
        STORAGE_REDO_LOG(WARN, "Fail to write log, ", "ret", ret);
 | 
						|
      } else if (OB_SUCCESS != (ret = redo_log_->commit(log_seq_num))) {
 | 
						|
        STORAGE_REDO_LOG(WARN, "Fail to commit, ", "ret", ret);
 | 
						|
      } else {
 | 
						|
        partition.log_seq_num_ = log_seq_num;
 | 
						|
        if (OB_SUCCESS != (ret = do_add_partition(partition))) {
 | 
						|
          STORAGE_REDO_LOG(ERROR, "Fail to add partition, ", "ret", ret);
 | 
						|
        }
 | 
						|
      }
 | 
						|
 | 
						|
      if (OB_FAIL(ret)) {
 | 
						|
        redo_log_->abort();
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int SimpleObPartitionImage::add_partition_no_commit(SimpleObPartition& partition)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  int64_t subcmd = ObIRedoModule::gen_subcmd(OB_REDO_LOG_PARTITION, ADD_PARTITION);
 | 
						|
  if (!inited_) {
 | 
						|
    ret = OB_NOT_INIT;
 | 
						|
    STORAGE_REDO_LOG(WARN, "Not init.");
 | 
						|
  } else {
 | 
						|
    const ObStorageLogAttribute log_attr(OB_SYS_TENANT_ID, OB_VIRTUAL_DATA_FILE_ID);
 | 
						|
    if (OB_SUCCESS != (ret = redo_log_->write_log(subcmd, log_attr, partition))) {
 | 
						|
      STORAGE_REDO_LOG(WARN, "Fail to write log, ", "ret", ret);
 | 
						|
    } else {
 | 
						|
      if (OB_SUCCESS != (ret = do_add_partition(partition))) {
 | 
						|
        STORAGE_REDO_LOG(WARN, "Fail to add partition, ", "ret", ret);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int SimpleObPartitionImage::remove_partition(const int64_t table_id, const int64_t partition_id)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  int64_t subcmd = ObIRedoModule::gen_subcmd(OB_REDO_LOG_PARTITION, REMOVE_PARTITION);
 | 
						|
 | 
						|
  int64_t log_seq_num = 0;
 | 
						|
 | 
						|
  if (!inited_) {
 | 
						|
    ret = OB_NOT_INIT;
 | 
						|
    STORAGE_REDO_LOG(WARN, "Not init.");
 | 
						|
  } else {
 | 
						|
    SimpleObPartition partition;
 | 
						|
    SimpleObPartition::build(table_id, partition_id, 1, partition);
 | 
						|
    const ObStorageLogAttribute log_attr(OB_SYS_TENANT_ID, OB_VIRTUAL_DATA_FILE_ID);
 | 
						|
    if (OB_SUCCESS != (ret = redo_log_->begin(OB_LOG_CS_DEL_PARTITION))) {
 | 
						|
      STORAGE_REDO_LOG(WARN, "Fail to start transaction, ", "ret", ret);
 | 
						|
    } else if (OB_SUCCESS != (ret = redo_log_->write_log(subcmd, log_attr, partition))) {
 | 
						|
      STORAGE_REDO_LOG(WARN, "Fail to write log, ", "ret", ret);
 | 
						|
    } else {
 | 
						|
      int64_t pos = hash(table_id, partition_id) % BUCKET_NUM;
 | 
						|
      common::ObBucketWLockGuard guard(buckets_lock_, pos);
 | 
						|
 | 
						|
      if (!exist_partition(table_id, partition_id)) {
 | 
						|
        ret = OB_ENTRY_NOT_EXIST;
 | 
						|
      } else if (OB_SUCCESS != (ret = redo_log_->commit(log_seq_num))) {
 | 
						|
        STORAGE_REDO_LOG(WARN, "Fail to commit trans, ", "ret", ret);
 | 
						|
      } else {
 | 
						|
        do_remove_partition(table_id, partition_id);
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    if (OB_FAIL(ret)) {
 | 
						|
      redo_log_->abort();
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
bool SimpleObPartitionImage::exist_partition(const int64_t table_id, const int64_t partition_id, int64_t* log_seq_num)
 | 
						|
{
 | 
						|
  bool bret = false;
 | 
						|
  int64_t pos = hash(table_id, partition_id) % BUCKET_NUM;
 | 
						|
  SimpleObPartitionNode* iter = NULL;
 | 
						|
 | 
						|
  for (iter = buckets_[pos]; iter != NULL; iter = iter->next_) {
 | 
						|
    if (iter->partition_.table_id_ == table_id && iter->partition_.partition_id_ == partition_id) {
 | 
						|
      bret = true;
 | 
						|
      if (NULL != log_seq_num) {
 | 
						|
        *log_seq_num = iter->partition_.log_seq_num_;
 | 
						|
      }
 | 
						|
      break;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return bret;
 | 
						|
}
 | 
						|
 | 
						|
bool SimpleObPartitionImage::operator==(SimpleObPartitionImage& image)
 | 
						|
{
 | 
						|
  bool bret = true;
 | 
						|
  int64_t i = 0;
 | 
						|
  SimpleObPartitionNode* iter1 = NULL;
 | 
						|
  SimpleObPartitionNode* iter2 = NULL;
 | 
						|
 | 
						|
  for (i = 0; bret && i < BUCKET_NUM; ++i) {
 | 
						|
    for (iter1 = buckets_[i], iter2 = image.buckets_[i]; bret && iter1 != NULL && iter2 != NULL;
 | 
						|
         iter1 = iter1->next_, iter2 = iter2->next_) {
 | 
						|
      if ((iter1->partition_) != (iter2->partition_)) {
 | 
						|
        bret = false;
 | 
						|
        STORAGE_LOG(ERROR,
 | 
						|
            "iter different, ",
 | 
						|
            K(iter1->partition_.partition_id_),
 | 
						|
            K(iter2->partition_.partition_id_),
 | 
						|
            K(iter1->partition_.macro_block_cnt_),
 | 
						|
            K(iter2->partition_.macro_block_cnt_));
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    if (iter1 != NULL || iter2 != NULL) {
 | 
						|
      bret = false;
 | 
						|
    }
 | 
						|
    OB_ASSERT(bret);
 | 
						|
  }
 | 
						|
 | 
						|
  return bret;
 | 
						|
}
 | 
						|
 | 
						|
bool SimpleObPartitionImage::operator!=(SimpleObPartitionImage& image)
 | 
						|
{
 | 
						|
  return !(*this == image);
 | 
						|
}
 | 
						|
 | 
						|
int64_t SimpleObPartitionImage::hash(const int64_t table_id, const int64_t partition_id)
 | 
						|
{
 | 
						|
  return table_id + (partition_id << 16);
 | 
						|
}
 | 
						|
 | 
						|
int SimpleObPartitionImage::do_add_partition(const SimpleObPartition& partition)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  int64_t pos = hash(partition.table_id_, partition.partition_id_) % BUCKET_NUM;
 | 
						|
  SimpleObPartitionNode* iter = NULL;
 | 
						|
  SimpleObPartitionNode* node = NULL;
 | 
						|
  void* tmp = NULL;
 | 
						|
 | 
						|
  for (iter = buckets_[pos]; iter != NULL; iter = iter->next_) {
 | 
						|
    if (iter->partition_.table_id_ == partition.table_id_ &&
 | 
						|
        iter->partition_.partition_id_ == partition.partition_id_) {
 | 
						|
      break;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (iter != NULL) {
 | 
						|
    ret = OB_ENTRY_EXIST;
 | 
						|
  } else if (NULL == (tmp = pt_allocator_.alloc(sizeof(SimpleObPartitionNode)))) {
 | 
						|
    ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
						|
    STORAGE_REDO_LOG(WARN, "Fail to allocate memory, ", K(ret));
 | 
						|
  } else {
 | 
						|
    node = new (tmp) SimpleObPartitionNode();
 | 
						|
    node->partition_ = partition;
 | 
						|
    node->next_ = NULL;
 | 
						|
    for (iter = buckets_[pos]; iter != NULL && iter->next_ != NULL; iter = iter->next_) {}
 | 
						|
    if (NULL == iter) {
 | 
						|
      buckets_[pos] = node;
 | 
						|
    } else {
 | 
						|
      iter->next_ = node;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
void SimpleObPartitionImage::do_remove_partition(const int64_t table_id, const int64_t partition_id)
 | 
						|
{
 | 
						|
  int64_t pos = hash(table_id, partition_id) % BUCKET_NUM;
 | 
						|
  SimpleObPartitionNode* iter = NULL;
 | 
						|
  SimpleObPartitionNode* prev = NULL;
 | 
						|
 | 
						|
  for (iter = buckets_[pos]; iter != NULL; iter = iter->next_) {
 | 
						|
    if (iter->partition_.table_id_ == table_id && iter->partition_.partition_id_ == partition_id) {
 | 
						|
      break;
 | 
						|
    } else {
 | 
						|
      prev = iter;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (iter != NULL) {
 | 
						|
    if (NULL == prev) {
 | 
						|
      buckets_[pos] = iter->next_;
 | 
						|
    } else {
 | 
						|
      prev->next_ = iter->next_;
 | 
						|
    }
 | 
						|
    pt_allocator_.free(iter);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace blocksstable
 | 
						|
}  // end namespace oceanbase
 | 
						|
 | 
						|
#endif  // OCEANBASE_SLOG_SIMPLE_OB_PARTITION_IMAGE_H_
 |