fix meta cache and fix problem about table store serialize size exceeds 2MB

This commit is contained in:
obdev
2024-02-07 04:22:59 +00:00
committed by ob-robot
parent c8d8bb4c25
commit 18ec56fc76
13 changed files with 438 additions and 271 deletions

View File

@ -61,17 +61,17 @@ int ObSSTableMetaHandle::get_sstable_meta(const ObSSTableMeta *&sstable_meta) co
ObSSTableMetaCache::ObSSTableMetaCache()
: version_(0),
upper_trans_version_(0),
max_merged_trans_version_(0),
: header_(0),
data_macro_block_count_(0),
row_count_(0),
occupy_size_(0),
data_checksum_(0),
total_macro_block_count_(0),
reuse_macro_block_count_(0),
nested_size_(0),
nested_offset_(0),
total_macro_block_count_(0),
total_use_old_macro_block_count_(0),
row_count_(0),
occupy_size_(0),
max_merged_trans_version_(0),
data_checksum_(0),
upper_trans_version_(0),
filled_tx_scn_(share::SCN::min_scn()),
contain_uncommitted_row_(false)
{
@ -79,22 +79,24 @@ ObSSTableMetaCache::ObSSTableMetaCache()
void ObSSTableMetaCache::reset()
{
version_ = 0;
upper_trans_version_ = 0;
max_merged_trans_version_ = 0;
header_ = 0;
data_macro_block_count_ = 0;
row_count_ = 0;
occupy_size_ = 0;
data_checksum_ = 0;
total_macro_block_count_ = 0;
reuse_macro_block_count_ = 0;
nested_size_ = 0;
nested_offset_ = 0;
filled_tx_scn_ = share::SCN::min_scn();
total_macro_block_count_ = 0;
total_use_old_macro_block_count_ = 0;
row_count_ = 0;
occupy_size_ = 0;
max_merged_trans_version_ = 0;
data_checksum_ = 0;
upper_trans_version_ = 0;
filled_tx_scn_.set_min();
contain_uncommitted_row_ = false;
}
int ObSSTableMetaCache::init(blocksstable::ObSSTableMeta *meta)
int ObSSTableMetaCache::init(
const blocksstable::ObSSTableMeta *meta,
const bool has_multi_version_row)
{
int ret = OB_SUCCESS;
@ -102,19 +104,23 @@ int ObSSTableMetaCache::init(blocksstable::ObSSTableMeta *meta)
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid argument", K(ret), KPC(meta));
} else {
reset();
version_ = SSTABLE_META_CACHE_VERSION;
upper_trans_version_ = meta->get_upper_trans_version();
max_merged_trans_version_ = meta->get_max_merged_trans_version();
data_macro_block_count_ = meta->get_data_macro_block_count();
has_multi_version_row_ = has_multi_version_row;
status_ = NORMAL;
data_macro_block_count_ = (int32_t) meta->get_data_macro_block_count();
nested_size_ = (int32_t) meta->get_macro_info().get_nested_size();
nested_offset_ = (int32_t) meta->get_macro_info().get_nested_offset();
total_macro_block_count_ = (int32_t) meta->get_total_macro_block_count();
total_use_old_macro_block_count_ = (int32_t) meta->get_total_use_old_macro_block_count();
row_count_ = meta->get_row_count();
occupy_size_ = meta->get_occupy_size();
max_merged_trans_version_ = meta->get_max_merged_trans_version();
data_checksum_ = meta->get_data_checksum();
total_macro_block_count_ = (int32_t) meta->get_total_macro_block_count();
reuse_macro_block_count_ = (int32_t) meta->get_total_use_old_macro_block_count();
contain_uncommitted_row_ = meta->contain_uncommitted_row();
nested_size_ = meta->get_macro_info().get_nested_size();
nested_offset_ = meta->get_macro_info().get_nested_offset();
upper_trans_version_ = meta->get_upper_trans_version();
filled_tx_scn_ = meta->get_filled_tx_scn();
contain_uncommitted_row_ = meta->contain_uncommitted_row();
}
return ret;
}
@ -123,19 +129,24 @@ OB_DEF_SERIALIZE_SIMPLE(ObSSTableMetaCache)
{
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_ENCODE,
version_,
upper_trans_version_,
max_merged_trans_version_,
data_macro_block_count_,
row_count_,
occupy_size_,
data_checksum_,
total_macro_block_count_,
reuse_macro_block_count_,
nested_size_,
nested_offset_,
filled_tx_scn_,
contain_uncommitted_row_);
header_,
data_macro_block_count_,
nested_size_,
nested_offset_,
total_macro_block_count_,
total_use_old_macro_block_count_,
row_count_,
occupy_size_,
max_merged_trans_version_);
if (has_multi_version_row_) {
LST_DO_CODE(OB_UNIS_ENCODE,
upper_trans_version_,
filled_tx_scn_,
contain_uncommitted_row_);
} else {
LST_DO_CODE(OB_UNIS_ENCODE, data_checksum_);
}
return ret;
}
@ -143,19 +154,24 @@ OB_DEF_DESERIALIZE_SIMPLE(ObSSTableMetaCache)
{
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_DECODE,
version_,
upper_trans_version_,
max_merged_trans_version_,
data_macro_block_count_,
row_count_,
occupy_size_,
data_checksum_,
total_macro_block_count_,
reuse_macro_block_count_,
nested_size_,
nested_offset_,
filled_tx_scn_,
contain_uncommitted_row_);
header_,
data_macro_block_count_,
nested_size_,
nested_offset_,
total_macro_block_count_,
total_use_old_macro_block_count_,
row_count_,
occupy_size_,
max_merged_trans_version_);
if (has_multi_version_row_) {
LST_DO_CODE(OB_UNIS_DECODE,
upper_trans_version_,
filled_tx_scn_,
contain_uncommitted_row_);
} else {
LST_DO_CODE(OB_UNIS_DECODE, data_checksum_);
}
return ret;
}
@ -163,22 +179,67 @@ OB_DEF_SERIALIZE_SIZE_SIMPLE(ObSSTableMetaCache)
{
int64_t len = 0;
LST_DO_CODE(OB_UNIS_ADD_LEN,
version_,
upper_trans_version_,
max_merged_trans_version_,
data_macro_block_count_,
row_count_,
occupy_size_,
data_checksum_,
total_macro_block_count_,
reuse_macro_block_count_,
nested_size_,
nested_offset_,
filled_tx_scn_,
contain_uncommitted_row_);
header_,
data_macro_block_count_,
nested_size_,
nested_offset_,
total_macro_block_count_,
total_use_old_macro_block_count_,
row_count_,
occupy_size_,
max_merged_trans_version_);
if (has_multi_version_row_) {
LST_DO_CODE(OB_UNIS_ADD_LEN,
upper_trans_version_,
filled_tx_scn_,
contain_uncommitted_row_);
} else {
LST_DO_CODE(OB_UNIS_ADD_LEN, data_checksum_);
}
return len;
}
int ObSSTableMetaCache::deserialize_for_compat(
const bool has_multi_version_row,
const char *buf,
const int64_t data_len,
int64_t &pos)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(nullptr == buf || data_len < 0 || data_len < pos)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else {
// SSTABLE_VERSION, need do a new ckpt to rewrite to SSTABLE_VERSION_V2
int64_t data_macro_block_count = 0;
int64_t nested_size = 0;
int64_t nested_offset = 0;
LST_DO_CODE(OB_UNIS_DECODE,
upper_trans_version_,
max_merged_trans_version_,
data_macro_block_count,
nested_size,
nested_offset,
contain_uncommitted_row_,
filled_tx_scn_);
if (OB_SUCC(ret)) {
version_ = SSTABLE_META_CACHE_VERSION;
has_multi_version_row_ = has_multi_version_row;
data_macro_block_count_ = static_cast<int32_t>(data_macro_block_count);
nested_size_ = static_cast<int32_t>(nested_size);
nested_offset_ = static_cast<int32_t>(nested_offset);
status_ = PADDING;
}
}
return ret;
}
ObSSTable::ObSSTable()
: addr_(),
@ -1161,7 +1222,7 @@ int ObSSTable::deserialize(common::ObArenaAllocator &allocator,
LOG_WARN("fail to transform root block data", K(ret));
} else if (OB_FAIL(check_valid_for_reading())) {
LOG_WARN("fail to check valid for reading", K(ret));
} else if (OB_FAIL(meta_cache_.init(meta_))) { // for compat
} else if (OB_FAIL(meta_cache_.init(meta_, is_multi_version_table()))) { // for compat
LOG_WARN("fail to init meta cache with meta", K(ret));
}
} else {
@ -1245,7 +1306,6 @@ int ObSSTable::serialize_fixed_struct(char *buf, const int64_t buf_len, int64_t
int ObSSTable::deserialize_fixed_struct(const char *buf, const int64_t data_len, int64_t &pos)
{
int ret = OB_SUCCESS;
int64_t old_pos = pos;
if (OB_ISNULL(buf) || OB_UNLIKELY(data_len < 0 || data_len < pos)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
@ -1261,22 +1321,17 @@ int ObSSTable::deserialize_fixed_struct(const char *buf, const int64_t data_len,
} else if (OB_UNLIKELY(pos + payload_size > data_len)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("semi deserialize buffer not enough", K(ret), K(pos), K(payload_size), K(data_len));
} else if (version == SSTABLE_VERSION_V2) {
LST_DO_CODE(OB_UNIS_DECODE,
addr_,
meta_cache_);
} else {
// SSTABLE_VERSION, need do a new ckpt to rewrite to SSTABLE_VERSION_V2
LST_DO_CODE(OB_UNIS_DECODE,
addr_,
meta_cache_.upper_trans_version_,
meta_cache_.max_merged_trans_version_,
meta_cache_.data_macro_block_count_,
meta_cache_.nested_size_,
meta_cache_.nested_offset_,
meta_cache_.contain_uncommitted_row_,
meta_cache_.filled_tx_scn_);
LST_DO_CODE(OB_UNIS_DECODE, addr_);
}
if (OB_FAIL(ret)) {
} else if (version == SSTABLE_VERSION_V2) {
LST_DO_CODE(OB_UNIS_DECODE, meta_cache_);
} else if (OB_FAIL(meta_cache_.deserialize_for_compat(is_multi_version_table(), buf, data_len, pos))) {
LOG_WARN("failed to deserialize meta cache for compat", K(ret));
}
if (OB_SUCC(ret)) {
valid_for_reading_ = key_.is_valid();
}
@ -1883,7 +1938,7 @@ int ObSSTable::init_sstable_meta(
LOG_WARN("fail to init sstable meta", K(ret));
} else if (OB_FAIL(meta_->transform_root_block_extra_buf(*allocator))) {
LOG_WARN("fail to transform root block data", K(ret));
} else if (OB_FAIL(meta_cache_.init(meta_))) {
} else if (OB_FAIL(meta_cache_.init(meta_, is_multi_version_table()))) {
LOG_WARN("fail to init meta cache with meta", K(ret));
}
}

View File

@ -13,6 +13,7 @@
#ifndef OCEANBASE_STORAGE_BLOCKSSTABLE_OB_SSTABLE_H
#define OCEANBASE_STORAGE_BLOCKSSTABLE_OB_SSTABLE_H
#include "lib/oblog/ob_log_module.h"
#include "storage/meta_mem/ob_storage_meta_cache.h"
#include "storage/blocksstable/ob_sstable_meta.h"
#include "storage/blocksstable/ob_macro_block_meta.h"
@ -49,15 +50,15 @@ public:
void reset();
int get_sstable_meta(const ObSSTableMeta *&sstable_meta) const;
OB_INLINE bool is_valid() { return nullptr != meta_ && meta_->is_valid(); }
OB_INLINE bool is_valid() const { return nullptr != meta_ && meta_->is_valid(); }
OB_INLINE const ObSSTableMeta &get_sstable_meta() const
{
OB_ASSERT(nullptr != meta_);
return *meta_;
}
OB_INLINE const ObStorageMetaHandle &get_storage_handle() const { return handle_; }
TO_STRING_KV(K_(handle), KPC_(meta));
private:
friend class ObSSTable;
public:
ObStorageMetaHandle handle_;
const ObSSTableMeta *meta_;
};
@ -67,31 +68,48 @@ private:
struct ObSSTableMetaCache
{
public:
enum SSTableCacheStatus: uint8_t {
INVALID = 0,
PADDING = 1,
NORMAL = 2
};
static const int SSTABLE_META_CACHE_VERSION = 1;
ObSSTableMetaCache();
~ObSSTableMetaCache() = default;
void reset();
int init(blocksstable::ObSSTableMeta *meta);
int init(const blocksstable::ObSSTableMeta *meta, const bool has_multi_version_row = false);
bool is_valid() const { return version_ >= SSTABLE_META_CACHE_VERSION; }
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const;
int deserialize(const char *buf, const int64_t data_len, int64_t &pos);
int deserialize_for_compat(const bool has_multi_version_row, const char *buf, const int64_t data_len, int64_t &pos);
int64_t get_serialize_size() const;
TO_STRING_KV(K_(upper_trans_version), K_(max_merged_trans_version), K_(data_macro_block_count),
K_(row_count), K_(occupy_size), K_(total_macro_block_count), K_(reuse_macro_block_count),
K_(data_checksum), K_(filled_tx_scn), K_(contain_uncommitted_row));
TO_STRING_KV(K_(version), K_(has_multi_version_row), K_(status), K_(data_macro_block_count), K_(nested_size), K_(nested_offset),
K_(total_macro_block_count), K_(total_use_old_macro_block_count), K_(row_count), K_(occupy_size), K_(data_checksum),
K_(max_merged_trans_version), K_(upper_trans_version), K_(filled_tx_scn), K_(contain_uncommitted_row));
public:
int64_t version_; // maybe we could use version to help use to see how many members could be accessed
int64_t upper_trans_version_;
int64_t max_merged_trans_version_;
int64_t data_macro_block_count_;
union {
uint32_t header_;
struct {
uint32_t version_ : 8;
uint32_t has_multi_version_row_ : 1;
uint32_t status_ : 2;
uint32_t reserved_ : 21;
};
};
int32_t data_macro_block_count_;
int32_t nested_size_;
int32_t nested_offset_;
int32_t total_macro_block_count_;
int32_t total_use_old_macro_block_count_;
int64_t row_count_;
int64_t occupy_size_;
int64_t max_merged_trans_version_;
// major sstable fields
int64_t data_checksum_;
int32_t total_macro_block_count_;
int32_t reuse_macro_block_count_;
int64_t nested_size_;
int64_t nested_offset_;
// mini sstable fields
int64_t upper_trans_version_;
share::SCN filled_tx_scn_;
bool contain_uncommitted_row_;
};
@ -206,13 +224,24 @@ public:
int set_addr(const ObMetaDiskAddr &addr);
OB_INLINE const ObMetaDiskAddr &get_addr() const { return addr_; }
OB_INLINE int64_t get_data_macro_block_count() const { return meta_cache_.data_macro_block_count_; }
OB_INLINE int64_t get_row_count() const { return meta_cache_.row_count_; }
OB_INLINE int64_t get_occupy_size() const { return meta_cache_.occupy_size_; }
OB_INLINE int64_t get_data_checksum() const { return meta_cache_.data_checksum_; }
OB_INLINE int64_t get_total_macro_block_count() const { return meta_cache_.total_macro_block_count_; }
OB_INLINE int64_t get_total_use_old_macro_block_count() const { return meta_cache_.reuse_macro_block_count_; }
OB_INLINE int64_t get_macro_offset() const { return meta_cache_.nested_offset_; }
OB_INLINE int64_t get_macro_read_size() const { return meta_cache_.nested_size_; }
#define GET_SSTABLE_META_DEFINE_FUNC(var_type, var_name) \
var_type get_##var_name() const { \
var_type val = meta_cache_. var_name##_; \
if (OB_UNLIKELY(ObSSTableMetaCache::NORMAL != meta_cache_.status_)) { \
COMMON_LOG_RET(ERROR, OB_ERR_UNEXPECTED, \
"sstable meta cache not valid", K(meta_cache_), KPC(this)); \
} \
return val; \
}
GET_SSTABLE_META_DEFINE_FUNC(int64_t, row_count);
GET_SSTABLE_META_DEFINE_FUNC(int64_t, occupy_size);
GET_SSTABLE_META_DEFINE_FUNC(int64_t, data_checksum);
GET_SSTABLE_META_DEFINE_FUNC(int64_t, total_macro_block_count);
GET_SSTABLE_META_DEFINE_FUNC(int64_t, total_use_old_macro_block_count);
OB_INLINE bool is_small_sstable() const
{
return OB_DEFAULT_MACRO_BLOCK_SIZE != meta_cache_.nested_size_ && 0 < meta_cache_.nested_offset_;

View File

@ -379,6 +379,7 @@ ObSSTableMeta::ObSSTableMeta()
: basic_meta_(),
data_root_info_(),
macro_info_(),
cg_sstables_(),
column_checksums_(nullptr),
column_checksum_count_(0),
is_inited_(false)
@ -536,7 +537,15 @@ int ObSSTableMeta::init(
} else if (OB_UNLIKELY(!check_meta())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to check meta", K(ret), K(*this));
} else {
} else if (param.table_key_.is_co_sstable()) {
if (param.is_empty_co_table_) {
// empty co sstable no need to init cg
} else if (OB_FAIL(cg_sstables_.init_empty_array_for_cg(allocator, param.column_group_cnt_ - 1/*exclude basic cg*/))) {
LOG_WARN("failed to alloc memory for cg sstable array", K(ret), K(param));
}
}
if (OB_SUCC(ret)) {
if (param.is_ready_for_read_) {
basic_meta_.status_ = SSTABLE_READY_FOR_READ;
}
@ -548,6 +557,20 @@ int ObSSTableMeta::init(
return ret;
}
int ObSSTableMeta::fill_cg_sstables(
common::ObArenaAllocator &allocator,
const common::ObIArray<ObITable *> &cg_tables)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(cg_sstables_.count() != cg_tables.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cg table count unexpected not match", K(ret), K(cg_sstables_), K(cg_tables.count()));
} else if (OB_FAIL(cg_sstables_.add_tables_for_cg(allocator, cg_tables))) {
LOG_WARN("failed to add cg sstables", K(ret), K(cg_tables));
}
return ret;
}
int ObSSTableMeta::serialize(char *buf, const int64_t buf_len, int64_t &pos) const
{
int ret = OB_SUCCESS;
@ -584,6 +607,8 @@ int ObSSTableMeta::serialize_(char *buf, const int64_t buf_len, int64_t &pos) co
LOG_WARN("fail to serialize data root info", K(ret), K(buf_len), K(pos), K(data_root_info_));
} else if (OB_FAIL(macro_info_.serialize(buf, buf_len, pos))) {
LOG_WARN("fail to serialize macro info", K(ret), K(buf_len), K(pos), K(macro_info_));
} else if (OB_FAIL(cg_sstables_.serialize(buf, buf_len, pos))) {
LOG_WARN("fail to serialize cg sstables", K(ret), K(buf_len), K(pos), K(cg_sstables_));
}
}
return ret;
@ -612,7 +637,7 @@ int ObSSTableMeta::deserialize(
} else if (OB_UNLIKELY(version != SSTABLE_META_VERSION)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("object version mismatch", K(ret), K(version));
} else if (OB_FAIL(deserialize_(allocator, buf + pos, data_len, tmp_pos))) {
} else if (OB_FAIL(deserialize_(allocator, buf + pos, len, tmp_pos))) {
LOG_WARN("fail to deserialize_", K(ret), K(data_len), K(tmp_pos), K(pos));
} else if (OB_UNLIKELY(len != tmp_pos)) {
ret = OB_ERR_UNEXPECTED;
@ -657,6 +682,8 @@ int ObSSTableMeta::deserialize_(
LOG_WARN("fail to deserialize data root info", K(ret), K(data_len), K(pos), K(des_meta));
} else if (OB_FAIL(macro_info_.deserialize(allocator, des_meta, buf, data_len, pos))) {
LOG_WARN("fail to deserialize macro info", K(ret), K(data_len), K(pos), K(des_meta));
} else if (pos < data_len && OB_FAIL(cg_sstables_.deserialize(allocator, buf, data_len, pos))) {
LOG_WARN("fail to deserialize cg sstables", K(ret), K(data_len), K(pos));
}
}
return ret;
@ -680,6 +707,7 @@ int64_t ObSSTableMeta::get_serialize_size_() const
OB_UNIS_ADD_LEN_ARRAY(column_checksums_, column_checksum_count_);
len += data_root_info_.get_serialize_size();
len += macro_info_.get_serialize_size();
len += cg_sstables_.get_serialize_size();
return len;
}
@ -687,7 +715,8 @@ int64_t ObSSTableMeta::get_variable_size() const
{
return sizeof(int64_t) * column_checksum_count_ // column checksums
+ data_root_info_.get_variable_size()
+ macro_info_.get_variable_size();
+ macro_info_.get_variable_size()
+ cg_sstables_.get_deep_copy_size();
}
int ObSSTableMeta::deep_copy(
@ -715,6 +744,8 @@ int ObSSTableMeta::deep_copy(
LOG_WARN("fail to deep copy data root info", K(ret), KP(buf), K(buf_len), K(pos), K(data_root_info_));
} else if (OB_FAIL(macro_info_.deep_copy(buf, buf_len, pos, dest->macro_info_))) {
LOG_WARN("fail to deep copy macro info", K(ret), KP(buf), K(buf_len), K(pos), K(macro_info_));
} else if (OB_FAIL(cg_sstables_.deep_copy(buf, buf_len, pos, dest->cg_sstables_))) {
LOG_WARN("fail to deep copy cg sstables", K(ret), KP(buf), K(buf_len), K(pos), K(cg_sstables_));
} else {
dest->is_inited_ = is_inited_;
}

View File

@ -18,6 +18,7 @@
#include "storage/ob_i_table.h"
#include "storage/blocksstable/index_block/ob_sstable_meta_info.h"
#include "share/scn.h"
#include "storage/tablet/ob_table_store_util.h"
namespace oceanbase
{
@ -140,9 +141,12 @@ public:
ObSSTableMeta();
~ObSSTableMeta();
int init(const storage::ObTabletCreateSSTableParam &param, common::ObArenaAllocator &allocator);
int fill_cg_sstables(common::ObArenaAllocator &allocator, const common::ObIArray<ObITable *> &cg_tables);
void reset();
OB_INLINE bool is_valid() const { return is_inited_; }
OB_INLINE bool contain_uncommitted_row() const { return basic_meta_.contain_uncommitted_row_; }
OB_INLINE ObSSTableArray &get_cg_sstables() { return cg_sstables_; }
OB_INLINE const ObSSTableArray &get_cg_sstables() const { return cg_sstables_; }
OB_INLINE bool is_empty() const {
return 0 == basic_meta_.data_macro_block_count_;
}
@ -238,7 +242,7 @@ public:
const int64_t buf_len,
int64_t &pos,
ObSSTableMeta *&dest) const;
TO_STRING_KV(K_(basic_meta), KP_(column_checksums), K_(column_checksum_count), K_(data_root_info), K_(macro_info));
TO_STRING_KV(K_(basic_meta), KP_(column_checksums), K_(column_checksum_count), K_(data_root_info), K_(macro_info), K_(cg_sstables));
private:
bool check_meta() const;
int init_base_meta(const ObTabletCreateSSTableParam &param, common::ObArenaAllocator &allocator);
@ -262,6 +266,7 @@ private:
ObSSTableBasicMeta basic_meta_;
ObRootBlockInfo data_root_info_;
ObSSTableMacroInfo macro_info_;
ObSSTableArray cg_sstables_;
int64_t *column_checksums_;
int64_t column_checksum_count_;
// The following fields don't to persist

View File

@ -147,7 +147,6 @@ int ObCOSSTableMeta::deserialize(const char *buf, const int64_t data_len, int64_
/************************************* ObCOSSTableV2 *************************************/
ObCOSSTableV2::ObCOSSTableV2()
: ObSSTable(),
cg_sstables_(),
cs_meta_(),
base_type_(ObCOSSTableBaseType::INVALID_TYPE),
is_empty_co_(false),
@ -165,7 +164,6 @@ void ObCOSSTableV2::reset()
{
ObSSTable::reset();
cs_meta_.reset();
cg_sstables_.reset();
valid_for_cs_reading_ = false;
tmp_allocator_.reset();
}
@ -185,13 +183,9 @@ int ObCOSSTableV2::init(
LOG_WARN("get invalid arguments", K(ret), K(param), K(allocator));
} else if (OB_FAIL(ObSSTable::init(param, allocator))) {
LOG_WARN("failed to init basic ObSSTable", K(ret), K(param));
// TODO(@zhouxinlan.zxl) use allocator in param
} else if (param.is_empty_co_table_) {
// current co sstable is empty, no need to init cg sstable
cs_meta_.column_group_cnt_ = param.column_group_cnt_;
} else if (OB_FAIL(cg_sstables_.init_empty_array_for_cg(
tmp_allocator_, param.column_group_cnt_ - 1/*should reduce the basic cg*/))) {
LOG_WARN("failed to alloc memory for cg sstable array", K(ret), K(param));
// empty co sstable, no need to init cg sstable
cs_meta_.column_group_cnt_ = param.column_group_cnt_; // other cs meta is zero.
}
if (OB_SUCC(ret)) {
@ -209,20 +203,20 @@ int ObCOSSTableV2::fill_cg_sstables(const common::ObIArray<ObITable *> &cg_table
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!valid_for_reading_ || valid_for_cs_reading_ || 0 == cg_sstables_.count() || is_empty_co_)) {
if (OB_UNLIKELY(!valid_for_reading_ || valid_for_cs_reading_ || is_empty_co_)) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("this co sstable can't init cg sstables", K(ret),
K(valid_for_reading_), K(valid_for_cs_reading_), K(cg_sstables_), K(is_empty_co_));
} else if (OB_UNLIKELY(cg_sstables_.count() != cg_tables.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arguments", K(ret), K(cg_sstables_), K(cg_tables.count()));
} else if (OB_FAIL(cg_sstables_.add_tables_for_cg(tmp_allocator_, cg_tables))) {
LOG_WARN("failed to add cg sstables", K(ret), K(cg_tables));
K(valid_for_reading_), K(valid_for_cs_reading_), K(is_empty_co_));
} else if (OB_UNLIKELY(!is_loaded())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("co sstable must be loaded before persist", K(ret), KPC(this));
} else if (OB_FAIL(meta_->fill_cg_sstables(tmp_allocator_, cg_tables))) {
LOG_WARN("failed to fill cg sstables to sstable meta", K(ret), KPC(this), KPC(meta_));
} else if (OB_FAIL(build_cs_meta())) {
LOG_WARN("failed to build cs meta", K(ret), KPC(this));
} else {
valid_for_cs_reading_ = true;
FLOG_INFO("success to init co sstable", K(ret), K_(cs_meta), K_(cg_sstables), KPC(this)); // tmp debug code
FLOG_INFO("success to init co sstable", K(ret), K_(cs_meta), KPC(this)); // tmp debug code
}
return ret;
}
@ -230,20 +224,20 @@ int ObCOSSTableV2::fill_cg_sstables(const common::ObIArray<ObITable *> &cg_table
int ObCOSSTableV2::build_cs_meta()
{
int ret = OB_SUCCESS;
ObSSTableMetaHandle co_meta_handle;
blocksstable::ObSSTableMeta *meta = nullptr;
const int64_t cg_table_cnt = cg_sstables_.count() + 1/*base_cg_table*/;
cs_meta_.column_group_cnt_ = cg_table_cnt;
if (OB_UNLIKELY(is_empty_co_)) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("no need to build cs meta for empty co table", K(ret), KPC(this));
} else if (OB_FAIL(get_meta(co_meta_handle))) {
LOG_WARN("Failed to get co sstable meta", K(ret), KPC(this));
} else if (OB_UNLIKELY(!is_loaded())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("co table is unexpected not loaded", K(ret), KPC(this));
} else {
const blocksstable::ObSSTableMeta &meta = co_meta_handle.get_sstable_meta();
const ObSSTableArray &cg_sstables = meta_->get_cg_sstables();
const int64_t cg_table_cnt = cg_sstables.count() + 1/*base_cg_table*/;
cs_meta_.column_group_cnt_ = cg_table_cnt;
for (int64_t idx = 0; OB_SUCC(ret) && idx < cg_table_cnt; ++idx) {
ObSSTable *cg_sstable = (cg_table_cnt - 1 == idx) ? this : cg_sstables_[idx];
ObSSTable *cg_sstable = (cg_table_cnt - 1 == idx) ? this : cg_sstables[idx];
ObSSTableMetaHandle cg_meta_handle;
if (OB_ISNULL(cg_sstable)) {
ret = OB_ERR_UNEXPECTED;
@ -257,12 +251,12 @@ int ObCOSSTableV2::build_cs_meta()
LOG_WARN("the snapshot version of cg sstables must be equal", K(ret));
} else if (OB_FAIL(cg_sstable->get_meta(cg_meta_handle))) {
LOG_WARN("Failed to get cg sstable meta", K(ret), KPC(cg_sstable));
} else if (OB_UNLIKELY(cg_meta_handle.get_sstable_meta().get_schema_version() != meta.get_schema_version())) {
} else if (OB_UNLIKELY(cg_meta_handle.get_sstable_meta().get_schema_version() != meta_->get_schema_version())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the schema version of cg sstables must be equal", K(ret), K(meta), K(cg_meta_handle));
} else if (OB_UNLIKELY(cg_meta_handle.get_sstable_meta().get_row_count() != meta.get_row_count())) {
LOG_WARN("the schema version of cg sstables must be equal", K(ret), KPC(meta_), K(cg_meta_handle));
} else if (OB_UNLIKELY(cg_meta_handle.get_sstable_meta().get_row_count() != meta_->get_row_count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the row count of cg sstables must be equal", K(ret), KPC(cg_sstable), K(meta), K(cg_meta_handle));
LOG_WARN("the row count of cg sstables must be equal", K(ret), KPC(cg_sstable), KPC(meta_), K(cg_meta_handle));
} else {
cs_meta_.data_macro_block_cnt_ += cg_meta_handle.get_sstable_meta().get_basic_meta().data_macro_block_count_;
cs_meta_.use_old_macro_block_cnt_ += cg_meta_handle.get_sstable_meta().get_basic_meta().use_old_macro_block_count_;
@ -284,9 +278,6 @@ int64_t ObCOSSTableV2::get_serialize_size() const
len += serialization::encoded_length_i32(base_type_);
len += serialization::encoded_length_bool(is_empty_co_);
len += cs_meta_.get_serialize_size();
if (!is_empty_co_) {
len += cg_sstables_.get_serialize_size();
}
return len;
}
@ -309,10 +300,6 @@ int ObCOSSTableV2::serialize(char *buf, const int64_t buf_len, int64_t &pos) con
LOG_WARN("failed to serialize is empty co", K(ret), KP(buf), K(buf_len), K(pos));
} else if (OB_FAIL(cs_meta_.serialize(buf, buf_len, pos))) {
LOG_WARN("failed to serialize cs meta", K(ret), KP(buf), K(buf_len), K(pos));
} else if (is_empty_co_) {
// no need to serialize cg sstable
} else if (OB_FAIL(cg_sstables_.serialize(buf, buf_len, pos))) {
LOG_WARN("failed to serialize cg sstables", K(ret), KP(buf), K(buf_len), K(pos));
}
FLOG_INFO("chaser debug serialize co sstable", K(ret), KPC(this), K(buf_len), K(old_pos), K(pos)); // tmp debug code
return ret;
@ -341,13 +328,7 @@ int ObCOSSTableV2::deserialize(
LOG_WARN("failed to decode is empty co", K(ret), KP(buf),K(data_len), K(pos));
} else if (OB_FAIL(cs_meta_.deserialize(buf, data_len, pos))) {
LOG_WARN("failed to deserialize cs meta", K(ret), KP(buf), K(data_len), K(pos));
} else if (is_empty_co_) {
// no need to deserialize cg sstable
} else if (OB_FAIL(cg_sstables_.deserialize(allocator, buf, data_len, pos))) {
LOG_WARN("failed to deserialize cg sstable", K(ret), KP(buf), K(data_len), K(pos));
}
if (OB_SUCC(ret)) {
} else {
valid_for_cs_reading_ = true;
FLOG_INFO("success to deserialize co sstable", K(ret), KPC(this), K(data_len), K(pos), K(old_pos)); // tmp debug code
}
@ -373,10 +354,6 @@ int ObCOSSTableV2::serialize_full_table(char *buf, const int64_t buf_len, int64_
LOG_WARN("failed to serialize is empty co", K(ret), KP(buf), K(buf_len), K(pos));
} else if (OB_FAIL(cs_meta_.serialize(buf, buf_len, pos))) {
LOG_WARN("failed to deserialize cs meta", K(ret), KP(buf), K(buf_len), K(pos));
} else if (is_empty_co_) {
// no need to serialize cg sstable
} else if (OB_FAIL(cg_sstables_.serialize(buf, buf_len, pos))) {
LOG_WARN("failed to serialize cg sstables", K(ret), KP(buf), K(buf_len), K(pos));
}
FLOG_INFO("chaser debug serialize co sstable", K(ret), KPC(this), K(buf_len), K(old_pos), K(pos)); // tmp debug code
return ret;
@ -390,9 +367,6 @@ int64_t ObCOSSTableV2::get_full_serialize_size() const
len += serialization::encoded_length_i32(base_type_);
len += serialization::encoded_length_bool(is_empty_co_);
len += cs_meta_.get_serialize_size();
if (!is_empty_co_) {
len += cg_sstables_.get_serialize_size();
}
}
return len;
}
@ -420,10 +394,7 @@ int ObCOSSTableV2::deep_copy(char *buf, const int64_t buf_len, ObIStorageMetaObj
}
}
if (OB_FAIL(ret)) {
} else if (!is_empty_co_ && OB_FAIL(cg_sstables_.deep_copy(buf, buf_len, pos, new_co_table->cg_sstables_))) {
LOG_WARN("failed to deep copy cg sstables", K(ret), KP(buf), K(buf_len), K(pos));
} else {
if (OB_SUCC(ret)) {
MEMCPY(&new_co_table->cs_meta_, &cs_meta_, sizeof(ObCOSSTableMeta));
new_co_table->is_empty_co_ = is_empty_co_;
new_co_table->base_type_ = base_type_;
@ -435,6 +406,7 @@ int ObCOSSTableV2::deep_copy(char *buf, const int64_t buf_len, ObIStorageMetaObj
return ret;
}
// co sstable must be full memory
int ObCOSSTableV2::deep_copy(
common::ObArenaAllocator &allocator,
const common::ObIArray<ObMetaDiskAddr> &cg_addrs,
@ -445,13 +417,14 @@ int ObCOSSTableV2::deep_copy(
char *buf = nullptr;
ObCOSSTableV2 *new_co_table = nullptr;
ObIStorageMetaObj *meta_obj = nullptr;
ObSSTableArray &cg_sstables = meta_->get_cg_sstables();
if (OB_UNLIKELY(!valid_for_cs_reading_ || !cg_sstables_.is_valid())) {
if (OB_UNLIKELY(!valid_for_cs_reading_ || !cg_sstables.is_valid())) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("this co sstable can't set cg table addr", K(ret), K_(valid_for_cs_reading), K_(cg_sstables));
} else if (OB_UNLIKELY(cg_addrs.count() != cg_sstables_.count())) {
LOG_WARN("this co sstable can't set cg table addr", K(ret), K_(valid_for_cs_reading), K(cg_sstables));
} else if (OB_UNLIKELY(cg_addrs.count() != cg_sstables.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arguments", K(ret), K(cg_addrs.count()), K_(cg_sstables));
LOG_WARN("get invalid arguments", K(ret), K(cg_addrs.count()), K(cg_sstables));
} else if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(deep_copy_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for deep copy sstable", K(ret), K(deep_copy_size));
@ -462,8 +435,9 @@ int ObCOSSTableV2::deep_copy(
}
// set cg sstable addr
for (int64_t idx = 0; OB_SUCC(ret) && idx < new_co_table->cg_sstables_.count(); ++idx) {
ObSSTable *cg_table = new_co_table->cg_sstables_[idx];
ObSSTableArray &new_cg_sstables = new_co_table->meta_->get_cg_sstables();
for (int64_t idx = 0; OB_SUCC(ret) && idx < new_cg_sstables.count(); ++idx) {
ObSSTable *cg_table = new_cg_sstables[idx];
const ObMetaDiskAddr &cg_addr = cg_addrs.at(idx);
if (OB_ISNULL(cg_table)) {
ret = OB_ERR_UNEXPECTED;
@ -513,6 +487,7 @@ int ObCOSSTableV2::get_cg_sstable(
{
int ret = OB_SUCCESS;
cg_sstable = nullptr;
ObSSTableMetaHandle meta_handle;
if (OB_UNLIKELY(!is_cs_valid())) {
ret = OB_NOT_INIT;
@ -525,10 +500,13 @@ int ObCOSSTableV2::get_cg_sstable(
} else if (OB_UNLIKELY(is_empty_co_)) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("co sstable is empty, cannot fetch normal cg sstable", K(ret), K(cg_idx), KPC(this));
} else if (cg_idx < key_.column_group_idx_) {
cg_sstable = cg_sstables_[cg_idx];
} else if (OB_FAIL(get_meta(meta_handle))) {
LOG_WARN("failed to get meta handle", K(ret), KPC(this));
} else {
cg_sstable = cg_sstables_[cg_idx - 1];
const ObSSTableArray &cg_sstables = meta_handle.get_sstable_meta().get_cg_sstables();
cg_sstable = cg_idx < key_.column_group_idx_
? cg_sstables[cg_idx]
: cg_sstables[cg_idx - 1];
}
return ret;
}
@ -536,13 +514,17 @@ int ObCOSSTableV2::get_cg_sstable(
int ObCOSSTableV2::get_all_tables(common::ObIArray<ObITable *> &tables) const
{
int ret = OB_SUCCESS;
ObSSTableMetaHandle meta_handle;
if (is_empty_co_) {
if (OB_FAIL(tables.push_back(const_cast<ObCOSSTableV2 *>(this)))) {
LOG_WARN("failed to push back", K(ret), K(is_empty_co_));
}
} else if (OB_FAIL(get_meta(meta_handle))) {
LOG_WARN("failed to get meta handle", K(ret), KPC(this));
} else {
for (int64_t cg_idx = 0; OB_SUCC(ret) && cg_idx <= cg_sstables_.count(); ++cg_idx) {
const ObSSTableArray &cg_sstables = meta_handle.get_sstable_meta().get_cg_sstables();
for (int64_t cg_idx = 0; OB_SUCC(ret) && cg_idx <= cg_sstables.count(); ++cg_idx) {
ObSSTable *cg_sstable = nullptr;
if (OB_FAIL(get_cg_sstable(cg_idx, cg_sstable))) {
LOG_WARN("failed to get cg sstable", K(ret), K(cg_idx));

View File

@ -108,8 +108,6 @@ public:
bool is_empty_co_table() const { return is_empty_co_; }
int fill_cg_sstables(const common::ObIArray<ObITable *> &cg_tables);
OB_INLINE const ObCOSSTableMeta &get_cs_meta() const { return cs_meta_; }
OB_INLINE ObSSTableArray &get_cg_sstables() { return cg_sstables_; }
OB_INLINE const ObSSTableArray &get_cg_sstables() const { return cg_sstables_; }
OB_INLINE bool is_all_cg_base() const { return ObCOSSTableBaseType::ALL_CG_TYPE == base_type_; }
OB_INLINE bool is_rowkey_cg_base() const { return ObCOSSTableBaseType::ROWKEY_CG_TYPE == base_type_; }
OB_INLINE bool is_inited() const { return is_empty_co_table() || is_cs_valid(); }
@ -145,7 +143,6 @@ public:
{
int64_t size = sizeof(ObCOSSTableV2);
size += ObSSTable::get_deep_copy_size();
size += cg_sstables_.get_deep_copy_size();
return size;
}
@ -176,13 +173,11 @@ public:
ObTableAccessContext &context,
const common::ObIArray<blocksstable::ObDatumRowkey> &rowkeys,
ObStoreRowIterator *&row_iter) override;
INHERIT_TO_STRING_KV("ObSSTable", ObSSTable, KP(this), K_(cs_meta), K_(cg_sstables),
INHERIT_TO_STRING_KV("ObSSTable", ObSSTable, KP(this), K_(cs_meta),
K_(base_type), K_(is_empty_co), K_(valid_for_cs_reading));
private:
int prepare_cg_sstable_array(const int64_t column_group_cnt);
int build_cs_meta();
protected:
ObSSTableArray cg_sstables_;
ObCOSSTableMeta cs_meta_;
ObCOSSTableBaseType base_type_;
bool is_empty_co_; // no need to create cg sstable when co sstable is empty

View File

@ -300,7 +300,9 @@ int ObTenantCompactionMemPool::init()
} else {
chunk_allocator_.set_tenant_id(MTL_ID());
piece_allocator_.set_tenant_id(MTL_ID());
max_block_num_ = MAX_MEMORY_LIMIT / ObCompactionBufferChunk::DEFAULT_BLOCK_SIZE;
max_block_num_ = MTL_IS_MINI_MODE()
? MAX_MEMORY_LIMIT / ObCompactionBufferChunk::DEFAULT_BLOCK_SIZE
: MAX_MEMORY_LIMIT / (ObCompactionBufferChunk::DEFAULT_BLOCK_SIZE * 2);
total_block_num_ = 0;
is_inited_ = true;
}

View File

@ -437,7 +437,6 @@ public:
OB_INLINE ObITable *get_table() { return table_; }
OB_INLINE const ObITable *get_table() const { return table_; }
OB_INLINE common::ObIAllocator *get_allocator() { return allocator_; }
OB_INLINE const ObStorageMetaHandle &get_meta_handle() { return meta_handle_; }
int get_sstable(blocksstable::ObSSTable *&sstable);
int get_sstable(const blocksstable::ObSSTable *&sstable) const;

View File

@ -1066,7 +1066,7 @@ int ObTabletPersister::convert_macro_info_map(SharedMacroMap &shared_macro_map,
int ObTabletPersister::fetch_and_persist_co_sstable(
common::ObArenaAllocator &allocator,
ObCOSSTableV2 *co_sstable,
common::ObIArray<ObSharedBlocksWriteCtx> &meta_write_ctxs,
common::ObIArray<ObSharedBlocksWriteCtx> &sstable_meta_write_ctxs,
common::ObIArray<ObMetaDiskAddr> &cg_addrs,
int64_t &total_tablet_meta_size,
ObBlockInfoSet &block_info_set,
@ -1082,6 +1082,7 @@ int ObTabletPersister::fetch_and_persist_co_sstable(
cg_write_infos.set_attr(lib::ObMemAttr(MTL_ID(), "CGWriteInfos", ctx_id));
int64_t total_size = 0;
cg_addrs.reset();
ObSSTableMetaHandle co_meta_handle;
if (OB_ISNULL(co_sstable)) {
ret = OB_INVALID_ARGUMENT;
@ -1089,8 +1090,10 @@ int ObTabletPersister::fetch_and_persist_co_sstable(
} else if (FALSE_IT(total_size = co_sstable->get_serialize_size())) {
} else if (total_size < SSTABLE_MAX_SERIALIZE_SIZE) {
// do nothing
} else if (OB_FAIL(co_sstable->get_meta(co_meta_handle))) {
LOG_WARN("failed to get co meta handle", K(ret), KPC(co_sstable));
} else {
ObSSTableArray &cg_sstables = co_sstable->get_cg_sstables();
const ObSSTableArray &cg_sstables = co_meta_handle.get_sstable_meta().get_cg_sstables();
ObSSTable *cg_sstable = nullptr;
for (int64_t idx = 0; OB_SUCC(ret) && idx < cg_sstables.count(); ++idx) {
cg_sstable = cg_sstables[idx];
@ -1104,7 +1107,7 @@ int ObTabletPersister::fetch_and_persist_co_sstable(
if (OB_FAIL(ret)) {
} else if (0 < cg_write_infos.count()
&& OB_FAIL(batch_write_sstable_info(cg_write_infos, cg_write_ctxs, cg_addrs, meta_write_ctxs, block_info_set))) {
&& OB_FAIL(batch_write_sstable_info(cg_write_infos, cg_write_ctxs, cg_addrs, sstable_meta_write_ctxs, block_info_set))) {
LOG_WARN("failed to batch write sstable", K(ret));
} else if (OB_UNLIKELY(cg_addrs.count() != cg_sstables.count())) {
ret = OB_ERR_UNEXPECTED;
@ -1124,7 +1127,7 @@ int ObTabletPersister::fetch_and_persist_co_sstable(
int ObTabletPersister::fetch_and_persist_sstable(
ObTableStoreIterator &table_iter,
ObTabletTableStore &new_table_store,
common::ObIArray<ObSharedBlocksWriteCtx> &meta_write_ctxs,
common::ObIArray<ObSharedBlocksWriteCtx> &sstable_meta_write_ctxs,
int64_t &total_tablet_meta_size,
ObBlockInfoSet &block_info_set)
{
@ -1164,19 +1167,22 @@ int ObTabletPersister::fetch_and_persist_sstable(
} else if (table->is_co_sstable() && table->get_serialize_size() > SSTABLE_MAX_SERIALIZE_SIZE) {
large_co_sstable_cnt++;
ObCOSSTableV2 *co_sstable = static_cast<ObCOSSTableV2 *>(table);
cg_sstable_cnt += co_sstable->get_cg_sstables().count();
// serialize full co sstable and shell cg sstables when the serialize size of CO reached the limit.
FLOG_INFO("cannot full serialize CO within 2MB buffer, should serialize CO with Shell CG", K(ret), KPC(table));
cg_addrs.reset();
tmp_allocator.reuse();
ObCOSSTableV2 *tmp_co_sstable = nullptr;
ObSSTableMetaHandle co_meta_handle;
if (OB_FAIL(fetch_and_persist_co_sstable(
allocator_, co_sstable, meta_write_ctxs, cg_addrs, total_tablet_meta_size, block_info_set, shared_macro_map))) {
if (OB_FAIL(co_sstable->get_meta(co_meta_handle))) {
LOG_WARN("failed to get co meta handle", K(ret), KPC(co_sstable));
} else if (OB_FAIL(fetch_and_persist_co_sstable(
allocator_, co_sstable, sstable_meta_write_ctxs, cg_addrs, total_tablet_meta_size, block_info_set, shared_macro_map))) {
LOG_WARN("fail to persist co sstable", K(ret));
} else if (OB_FAIL(co_sstable->deep_copy(tmp_allocator, cg_addrs, tmp_co_sstable))) {
LOG_WARN("failed to deep copy co sstable", K(ret), KPC(co_sstable));
} else {
cg_sstable_cnt += co_meta_handle.get_sstable_meta().get_cg_sstables().count();
ObSSTablePersistWrapper wrapper(tmp_co_sstable);
if (OB_FAIL(fill_write_info(allocator_, &wrapper, write_infos))) {
LOG_WARN("failed to fill sstable write info", K(ret));
@ -1192,22 +1198,29 @@ int ObTabletPersister::fetch_and_persist_sstable(
if (table->is_co_sstable()) {
small_co_sstable_cnt++;
ObCOSSTableV2 *co_sstable = static_cast<ObCOSSTableV2 *>(table);
ObSSTableArray &cg_sstables = co_sstable->get_cg_sstables();
cg_sstable_cnt += cg_sstables.count();
ObSSTable *cg_sstable = nullptr;
for (int64_t idx = 0; OB_SUCC(ret) && idx < cg_sstables.count(); ++idx) {
cg_sstable = cg_sstables[idx];
const ObMetaDiskAddr &sstable_addr = cg_sstable->get_addr();
if (OB_FAIL(copy_sstable_macro_info(*cg_sstable, shared_macro_map, block_info_set))) {
LOG_WARN("fail to call sstable macro info", K(ret));
} else if (sstable_addr.is_block()) {
// this cg sstable has been persisted before
cg_sstable_meta_size += sstable_addr.size();
if (OB_FAIL(block_info_set.shared_meta_block_info_set_.set_refactored(sstable_addr.block_id(), 0 /*whether to overwrite*/))) {
if (OB_HASH_EXIST != ret) {
LOG_WARN("fail to push macro id into set", K(ret), K(sstable_addr));
} else {
ret = OB_SUCCESS;
ObSSTableMetaHandle co_meta_handle;
if (OB_FAIL(co_sstable->get_meta(co_meta_handle))) {
LOG_WARN("failed to get co meta handle", K(ret), KPC(co_sstable));
} else {
const ObSSTableArray &cg_sstables = co_meta_handle.get_sstable_meta().get_cg_sstables();
cg_sstable_cnt += cg_sstables.count();
ObSSTable *cg_sstable = nullptr;
for (int64_t idx = 0; OB_SUCC(ret) && idx < cg_sstables.count(); ++idx) {
cg_sstable = cg_sstables[idx];
const ObMetaDiskAddr &sstable_addr = cg_sstable->get_addr();
if (OB_FAIL(copy_sstable_macro_info(*cg_sstable, shared_macro_map, block_info_set))) {
LOG_WARN("fail to call sstable macro info", K(ret));
} else if (sstable_addr.is_block()) {
// this cg sstable has been persisted before
cg_sstable_meta_size += sstable_addr.size();
if (OB_FAIL(block_info_set.shared_meta_block_info_set_.set_refactored(sstable_addr.block_id(), 0 /*whether to overwrite*/))) {
if (OB_HASH_EXIST != ret) {
LOG_WARN("fail to push macro id into set", K(ret), K(sstable_addr));
} else {
ret = OB_SUCCESS;
}
}
}
}
@ -1228,12 +1241,12 @@ int ObTabletPersister::fetch_and_persist_sstable(
}
}
}
}
} // end while
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(time_stats->set_extra_info("%s:%ld,%s:%ld,%s:%ld,%s:%ld",
if (FAILEDx(time_stats->set_extra_info("%s:%ld,%s:%ld,%s:%ld,%s:%ld",
"large_co_sst_cnt", large_co_sstable_cnt, "small_co_sst_cnt", small_co_sstable_cnt,
"normal_sst_cnt", normal_sstable_cnt, "cg_sst_cnt", cg_sstable_cnt))) {
LOG_WARN("fail to set time stats extra info", K(ret));
@ -1241,7 +1254,7 @@ int ObTabletPersister::fetch_and_persist_sstable(
} else if (OB_FAIL(convert_macro_info_map(shared_macro_map, block_info_set.shared_data_block_info_map_))) {
LOG_WARN("fail to convert shared data block info map", K(ret));
} else if (write_infos.count() > 0
&& OB_FAIL(batch_write_sstable_info(write_infos, write_ctxs, addrs, meta_write_ctxs, block_info_set))) {
&& OB_FAIL(batch_write_sstable_info(write_infos, write_ctxs, addrs, sstable_meta_write_ctxs, block_info_set))) {
LOG_WARN("failed to batch write sstable", K(ret));
} else if (FALSE_IT(time_stats->click("batch_write_sstable_info"))) {
} else if (OB_FAIL(new_table_store.init(allocator_, tables, addrs))) {
@ -1671,7 +1684,7 @@ int ObTabletPersister::fetch_table_store_and_write_info(
const ObTablet &tablet,
ObTabletMemberWrapper<ObTabletTableStore> &wrapper,
common::ObIArray<ObSharedBlockWriteInfo> &write_infos,
common::ObIArray<ObSharedBlocksWriteCtx> &meta_write_ctxs,
common::ObIArray<ObSharedBlocksWriteCtx> &sstable_meta_write_ctxs,
int64_t &total_tablet_meta_size,
ObBlockInfoSet &block_info_set)
{
@ -1691,7 +1704,7 @@ int ObTabletPersister::fetch_table_store_and_write_info(
LOG_WARN("fail to get all sstable iterator", K(ret), KPC(table_store));
} else if (FALSE_IT(time_stats->click("get_all_sstable"))) {
} else if (OB_FAIL(fetch_and_persist_sstable(
table_iter, new_table_store, meta_write_ctxs, total_tablet_meta_size, block_info_set))) {
table_iter, new_table_store, sstable_meta_write_ctxs, total_tablet_meta_size, block_info_set))) {
LOG_WARN("fail to fetch and persist sstable", K(ret), K(table_iter));
} else if (FALSE_IT(time_stats->click("fetch_and_persist_sstable"))) {
} else if (OB_FAIL(fill_write_info(allocator_, &new_table_store, write_infos))) {

View File

@ -1743,7 +1743,8 @@ int ObTabletTableStore::get_need_to_cache_sstables(
if (OB_ISNULL(sstable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, sstable is nullptr", K(ret), KP(sstable));
} else if (sstable->is_loaded()) { // sstable is already loaded to memory
} else if (sstable->is_loaded()) {
// sstable is already loaded to memory, do nothing
} else {
ObStorageMetaValue::MetaType meta_type = sstable->is_co_sstable()
? ObStorageMetaValue::CO_SSTABLE
@ -1770,57 +1771,32 @@ int ObTabletTableStore::batch_cache_sstable_meta(
{
int ret = OB_SUCCESS;
int64_t remain_size = limit_size;
common::ObSEArray<ObSSTableMetaHandle, BASIC_MEMSTORE_CNT> cg_meta_hdls;
if (OB_UNLIKELY(limit_size <= 0 || sstables.count() != handles.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(remain_size), K(sstables), K(handles));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < sstables.count(); ++i) {
ObStorageMetaHandle &handle = handles.at(i);
ObSSTableMetaHandle sst_meta_hdl;
blocksstable::ObSSTable *sstable = sstables.at(i);
blocksstable::ObSSTable *tmp_sstable = nullptr;
int64_t deep_copy_size = 0;
}
if (OB_ISNULL(sstable) || OB_UNLIKELY(!handle.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(sstable), K(handle));
} else if (OB_FAIL(handle.get_sstable(tmp_sstable))) {
LOG_WARN("fail to get sstable", K(ret), K(handle));
} else if (OB_FAIL(tmp_sstable->get_meta(sst_meta_hdl))) {
LOG_WARN("fail to get sstable meta", K(ret));
} else if (tmp_sstable->is_cg_sstable()) {
if (OB_FAIL(cg_meta_hdls.push_back(sst_meta_hdl))) {
LOG_WARN("fail to add cg sstable meta handle", K(ret), KPC(tmp_sstable));
}
continue;
} else if (FALSE_IT(deep_copy_size = sst_meta_hdl.get_sstable_meta().get_deep_copy_size())) {
} else if (0 > remain_size - deep_copy_size) {
break;
} else if (OB_FAIL(cache_sstable_meta(allocator, sst_meta_hdl, sstable, remain_size))) {
LOG_WARN("failed to cache sstable meta", K(ret), KPC(sstable), K(sst_meta_hdl));
} else if (!sstable->is_co_sstable() || 0 == cg_meta_hdls.count()) {
continue;
} else if (OB_UNLIKELY(cg_meta_hdls.count() != static_cast<ObCOSSTableV2 *>(sstable)->get_cg_sstables().count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cg meta handle count is unexpected not equal with cg sstable count",
K(ret), K(cg_meta_hdls.count()), KPC(sstable));
} else {
ObCOSSTableV2 *co_sstable = static_cast<ObCOSSTableV2 *>(sstable);
ObSSTableArray &cg_sstables = co_sstable->get_cg_sstables();
for (int64_t i = 0; OB_SUCC(ret) && i < sstables.count(); ++i) {
ObStorageMetaHandle &handle = handles.at(i);
ObSSTableMetaHandle sst_meta_hdl;
blocksstable::ObSSTable *sstable = sstables.at(i);
blocksstable::ObSSTable *tmp_sstable = nullptr;
int64_t deep_copy_size = 0;
for (int64_t cg_idx = 0; OB_SUCC(ret) && cg_idx < cg_sstables.count(); ++cg_idx) {
int64_t cg_deep_copy_size = cg_meta_hdls.at(cg_idx).get_sstable_meta().get_deep_copy_size();
if (0 < remain_size - cg_deep_copy_size) {
break;
} else if (OB_FAIL(cache_sstable_meta(allocator, cg_meta_hdls.at(cg_idx), cg_sstables[cg_idx], remain_size))) {
LOG_WARN("failed to cache sstable meta", K(ret), KPC(cg_sstables[cg_idx]));
}
}
cg_meta_hdls.reset();
}
} // end for
if (OB_ISNULL(sstable) || OB_UNLIKELY(!handle.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(sstable), K(handle));
} else if (OB_FAIL(handle.get_sstable(tmp_sstable))) {
LOG_WARN("fail to get sstable", K(ret), K(handle));
} else if (OB_FAIL(tmp_sstable->get_meta(sst_meta_hdl))) {
LOG_WARN("fail to get sstable meta", K(ret));
} else if (FALSE_IT(deep_copy_size = sst_meta_hdl.get_sstable_meta().get_deep_copy_size())) {
} else if (0 > remain_size - deep_copy_size) {
break;
} else if (OB_FAIL(cache_sstable_meta(allocator, sst_meta_hdl, sstable, remain_size))) {
LOG_WARN("failed to cache sstable meta", K(ret), KPC(sstable), K(sst_meta_hdl));
}
}
return ret;
}

View File

@ -222,28 +222,38 @@ int ObTableStoreIterator::add_table(ObITable *table)
// lifetime guaranteed by tablet_handle_
} else if (static_cast<ObSSTable *>(table)->is_loaded() || !need_load_sstable_) {
// lifetime guaranteed by table_store_handle_
} else {
ObStorageMetaHandle sstable_meta_hdl;
if (OB_FAIL(ObTabletTableStore::load_sstable(static_cast<ObSSTable *>(table)->get_addr(),
table->is_co_sstable(), sstable_meta_hdl))) {
LOG_WARN("fail to load sstable", K(ret));
} else if (OB_FAIL(sstable_handle_array_.push_back(sstable_meta_hdl))) {
LOG_WARN("fail to push sstable meta handle", K(ret), K(sstable_meta_hdl));
} else if (OB_FAIL(sstable_meta_hdl.get_sstable(sstable))) {
LOG_WARN("fail to get sstable from meta handle", K(ret), K(sstable_meta_hdl), KPC(table));
} else {
table_ptr.table_ = sstable;
table_ptr.hdl_idx_ = sstable_handle_array_.count() - 1;
}
} else if (OB_FAIL(get_table_ptr_with_meta_handle(static_cast<ObSSTable *>(table), table_ptr))) {
LOG_WARN("fail to get table ptr with meta handle", K(ret), KPC(table));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(table_ptr_array_.push_back(table_ptr))) {
if (FAILEDx(table_ptr_array_.push_back(table_ptr))) {
LOG_WARN("fail to push table handle into array", K(ret));
}
return ret;
}
int ObTableStoreIterator::get_table_ptr_with_meta_handle(
const ObSSTable *table,
TablePtr &table_ptr)
{
int ret = OB_SUCCESS;
ObStorageMetaHandle sstable_meta_hdl;
ObSSTable *sstable = nullptr;
if (OB_FAIL(ObTabletTableStore::load_sstable(table->get_addr(),
table->is_co_sstable(), sstable_meta_hdl))) {
LOG_WARN("fail to load sstable", K(ret));
} else if (OB_FAIL(sstable_handle_array_.push_back(sstable_meta_hdl))) {
LOG_WARN("fail to push sstable meta handle", K(ret), K(sstable_meta_hdl));
} else if (OB_FAIL(sstable_meta_hdl.get_sstable(sstable))) {
LOG_WARN("fail to get sstable from meta handle", K(ret), K(sstable_meta_hdl), KPC(table));
} else {
table_ptr.table_ = sstable;
table_ptr.hdl_idx_ = sstable_handle_array_.count() - 1;
}
return ret;
}
int ObTableStoreIterator::inner_move_idx_to_next()
{
int ret = OB_SUCCESS;
@ -275,11 +285,16 @@ int ObTableStoreIterator::add_tables(
LOG_WARN("fail to add sstable to iterator", K(ret), K(i));
} else if (sstable_array[i]->is_co_sstable() && unpack_co_table) {
ObCOSSTableV2 *co_table = static_cast<ObCOSSTableV2 *>(sstable_array[i]);
ObSSTableArray &cg_sstables = co_table->get_cg_sstables();
ObSSTableMetaHandle meta_handle;
if (co_table->is_empty_co_table()) {
// empty co table, no need to call this func recursively
} else if (OB_FAIL(add_tables(cg_sstables, 0, cg_sstables.count(), false))) {
LOG_WARN("fail to add cg table to iterator", K(ret), KPC(co_table));
} else if (OB_FAIL(co_table->get_meta(meta_handle))) {
LOG_WARN("failed to get co meta handle", K(ret), KPC(co_table));
} else {
const ObSSTableArray &cg_sstables = meta_handle.get_sstable_meta().get_cg_sstables();
if (OB_FAIL(add_cg_tables(cg_sstables, co_table->is_loaded(), meta_handle))) {
LOG_WARN("fail to add cg table to iterator", K(ret), KPC(co_table));
}
}
}
}
@ -287,6 +302,55 @@ int ObTableStoreIterator::add_tables(
return ret;
}
/*
* cg sstable should be added carefully:
* if cg is not loaded, its lifetime guranteed by cg meta handle and co meta handle
* if cg is loaded, its lifetime guranteed by co meta handle
*/
int ObTableStoreIterator::add_cg_tables(
const ObSSTableArray &cg_sstables,
const bool is_loaded_co_table,
const ObSSTableMetaHandle &co_meta_handle)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!cg_sstables.is_valid() || !co_meta_handle.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(cg_sstables), K(co_meta_handle));
}
for (int64_t i = 0; OB_SUCC(ret) && i < cg_sstables.count(); ++i) {
ObSSTable *cg_table = cg_sstables[i];
TablePtr table_ptr;
ObSSTableMetaHandle cg_meta_handle;
if (OB_UNLIKELY(nullptr == (cg_table = cg_sstables[i]) || !cg_table->is_cg_sstable())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected cg table", K(ret), KPC(cg_table));
} else if (is_loaded_co_table && cg_table->is_loaded()) {
// lifetime guranteed by loaded co table
table_ptr.table_ = cg_table;
} else if (!cg_table->is_loaded()) {
// cg table is shell, lifetime guranteed by cg meta handle
if (OB_FAIL(get_table_ptr_with_meta_handle(cg_table, table_ptr))) {
LOG_WARN("fail to get table ptr with meta handle", K(ret), KPC(cg_table));
}
} else {
// cg table is loaded, lifetime guranteed by co meta handle
if (OB_FAIL(sstable_handle_array_.push_back(co_meta_handle.get_storage_handle()))) {
LOG_WARN("fail to push sstable meta handle", K(ret), KPC(cg_table));
} else {
table_ptr.table_ = cg_table;
table_ptr.hdl_idx_ = sstable_handle_array_.count() - 1;
}
}
if (FAILEDx(table_ptr_array_.push_back(table_ptr))) {
LOG_WARN("fail to push table handle into array", K(ret));
}
}
return ret;
}
int ObTableStoreIterator::add_tables(
const ObMemtableArray &memtable_array,
const int64_t start_pos)
@ -328,6 +392,9 @@ int ObTableStoreIterator::get_ith_table(const int64_t pos, ObITable *&table)
LOG_WARN("unexpected handle idx for loaded sstable", K(ret), K(hdl_idx), KPC(tmp_table), KPC(this));
} else if (OB_FAIL(sstable_handle_array_.at(hdl_idx).get_sstable(sstable))) {
LOG_WARN("fail to get sstable value", K(ret), K(hdl_idx), K(sstable_handle_array_));
} else if (sstable->is_co_sstable() && tmp_table->is_cg_sstable()) {
// cg sstable's lifetime guranteed by co meta handle
table = tmp_table;
} else {
table = sstable;
table_ptr_array_.at(pos).table_ = sstable;

View File

@ -18,6 +18,12 @@
namespace oceanbase
{
namespace blocksstable
{
class ObSSTableMetaHandle;
class ObSSTable;
}
namespace storage
{
@ -82,7 +88,14 @@ public:
K_(need_load_sstable), K_(table_store_handle), KPC_(transfer_src_table_store_handle));
private:
int inner_move_idx_to_next();
int get_table_ptr_with_meta_handle(
const blocksstable::ObSSTable *table,
TablePtr &table_ptr);
int add_tables(const ObMemtableArray &memtable_array, const int64_t start_pos = 0);
int add_cg_tables(
const ObSSTableArray &sstable_array,
const bool is_loaded_co_table,
const blocksstable::ObSSTableMetaHandle &co_meta_handle);
int get_ith_table(const int64_t pos, ObITable *&table);
private:
friend class ObTablet; // TODO: remove this friend class when possible

View File

@ -405,7 +405,7 @@ TEST_F(TestCOSSTable, empty_co_table_test)
EXPECT_EQ(OB_SUCCESS, ret);
ObCOSSTableV2 *co_table = static_cast<ObCOSSTableV2 *>(co_table_handle.get_table());
EXPECT_EQ(0, co_table->cg_sstables_.count());
EXPECT_EQ(0, co_table->meta_->cg_sstables_.count());
EXPECT_EQ(true, co_table->is_empty_co_table());
EXPECT_EQ(storage_schema.get_column_group_count(), co_table->get_cs_meta().column_group_cnt_);
}