Optimize ObTxContext memory

This commit is contained in:
JiahuaChen
2024-05-15 07:46:08 +00:00
committed by ob-robot
parent 3bfa0f6a5d
commit a8c76f3652
3 changed files with 300 additions and 70 deletions

View File

@ -374,6 +374,182 @@ int ObSSTableBasicMeta::set_upper_trans_version(const int64_t upper_trans_versio
return ret;
}
//================================== ObTxDesc & ObTxContext ==================================
int ObTxContext::ObTxDesc::serialize(char *buf, const int64_t buf_len, int64_t &pos) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(serialization::encode_i64(buf, buf_len,pos, tx_id_))) {
LOG_WARN("fail to encode length", K(ret), K(buf_len), K(pos));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, row_count_))) {
LOG_WARN("fail to encode length", K(ret), K(buf_len), K(pos));
}
return ret;
}
int ObTxContext::ObTxDesc::deserialize(const char *buf, const int64_t buf_len, int64_t &pos)
{
int ret = OB_SUCCESS;
if (OB_FAIL(serialization::decode_i64(buf, buf_len, pos, &tx_id_))) {
LOG_WARN("fail to encode length", K(ret), K(buf_len), K(pos));
} else if (OB_FAIL(serialization::decode_i64(buf, buf_len, pos, &row_count_))) {
LOG_WARN("fail to encode length", K(ret), K(buf_len), K(pos));
}
return ret;
}
int64_t ObTxContext::ObTxDesc::get_serialize_size() const
{
return serialization::encoded_length_i64(tx_id_) + serialization::encoded_length_i64(row_count_);
}
int ObTxContext::serialize(char *buf, const int64_t buf_len, int64_t &pos) const
{
int ret = OB_SUCCESS;
const int64_t tmp_pos = pos;
const_cast<ObTxContext *>(this)->len_ = get_serialize_size();
if (OB_FAIL(serialization::encode_i32(buf, buf_len,pos, len_))) {
LOG_WARN("fail to encode length", K(ret), K(buf_len), K(pos));
} else if (OB_FAIL(serialization::encode_vi64(buf, buf_len, pos, count_))) {
LOG_WARN("fail to encode count", K(ret), K(buf_len), K(pos));
}
for (int64_t i = 0; OB_SUCC(ret) && i < count_; i ++) {
if (OB_FAIL(serialization::encode(buf, buf_len, pos, tx_descs_[i]))) {
LOG_WARN("fail to encode item", K(i), K(ret), K(buf_len), K(pos));
}
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(pos - tmp_pos != len_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected len_", K(ret), K(len_), K(tmp_pos), K(pos));
}
return ret;
}
int64_t ObTxContext::get_serialize_size() const
{
int64_t size = serialization::encoded_length_i32(len_);
size += serialization::encoded_length_vi64(count_);
for (int64_t i = 0; i < count_; i++) {
size += serialization::encoded_length(tx_descs_[i]);
}
return size;
}
int ObTxContext::deserialize(
common::ObArenaAllocator &allocator,
const char *buf,
const int64_t buf_len,
int64_t &pos)
{
int ret = OB_SUCCESS;
const int64_t tmp_pos = pos;
if (OB_FAIL(serialization::decode_i32(buf, buf_len, pos, &len_))) {
LOG_WARN("fail to encode length", K(ret), K(buf_len), K(pos));
} else if (OB_FAIL(serialization::decode_vi64(buf, buf_len, pos, &count_))) {
LOG_WARN("fail to decode ob array count", K(ret));
} else if (count_ > 0) {
if (OB_ISNULL(tx_descs_ = static_cast<ObTxDesc *>(allocator.alloc(sizeof(ObTxDesc) * count_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate tx context", K(ret), K(count_));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < count_; i ++) {
ObTxDesc &item = tx_descs_[i];
if (OB_FAIL(serialization::decode(buf, buf_len, pos, item))) {
LOG_WARN("fail to decode array item", K(ret), K(i), K(count_));
}
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(pos - tmp_pos != len_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected len_", K(ret), K(len_), K(tmp_pos), K(pos));
}
return ret;
}
int ObTxContext::deep_copy(
char *buf,
const int64_t buf_len,
int64_t &pos,
ObTxContext &dest) const
{
int ret = OB_SUCCESS;
const int64_t variable_size = get_variable_size();
if (this == &dest) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("can't deep copy self", K(ret), K(*this));
} else if (pos + variable_size > buf_len) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("buf not enough", K(ret), K(pos), K(buf_len), K(*this));
} else {
dest.len_ = len_;
dest.count_ = count_;
if (0 == count_) {
dest.tx_descs_ = nullptr;
} else {
dest.tx_descs_ = reinterpret_cast<ObTxDesc *>(buf + pos);
for (int64_t i = 0; i < count_; i++) {
dest.tx_descs_[i] = tx_descs_[i];
}
}
pos += variable_size;
}
return ret;
}
int ObTxContext::init(const common::ObIArray<ObTxDesc> &tx_descs, common::ObArenaAllocator &allocator)
{
int ret = OB_SUCCESS;
const int64_t cnt = tx_descs.count();
if (nullptr != tx_descs_ || count_ > 0) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret), K(*this));
} else if (OB_UNLIKELY(MAX_TX_IDS_COUNT < cnt)) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("too many tx desc", K(ret), K(cnt));
} else if (0 == cnt) {
reset();
} else if (OB_ISNULL(tx_descs_ = static_cast<ObTxContext::ObTxDesc *>(allocator.alloc(sizeof(ObTxContext::ObTxDesc) * cnt)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate tx context", K(ret), KP(tx_descs_));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < cnt; i++) {
if (OB_FAIL(push_back(tx_descs.at(i)))) {
LOG_WARN("failed to alloc memory for tx_ids_", K(ret), K(i), K(tx_descs.at(i)));
}
}
}
return ret;
}
int64_t ObTxContext::get_tx_id(const int64_t idx) const
{
OB_ASSERT(idx >=0 && idx < count_);
return tx_descs_[idx].tx_id_;
}
int ObTxContext::push_back(const ObTxDesc &desc)
{
int ret = OB_SUCCESS;
if (count_ >= MAX_TX_IDS_COUNT) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("tx desc array overflow", K(ret), K(count_));
} else if (OB_ISNULL(tx_descs_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tx desc array is null", K(ret), K(count_), KP(tx_descs_));
} else {
tx_descs_[count_++] = desc;
}
return ret;
}
int64_t ObTxContext::get_variable_size() const
{
return count_ * sizeof(ObTxDesc);
}
//================================== ObSSTableMeta ==================================
ObSSTableMeta::ObSSTableMeta()
: basic_meta_(),
@ -507,6 +683,20 @@ int ObSSTableMeta::prepare_column_checksum(
return ret;
}
int ObSSTableMeta::prepare_tx_context(
const ObTxContext::ObTxDesc &tx_desc,
common::ObArenaAllocator &allocator)
{
int ret = OB_SUCCESS;
ObSEArray<ObTxContext::ObTxDesc, 1> tx_desc_arr;
if (OB_FAIL(tx_desc_arr.push_back(tx_desc))) {
LOG_WARN("push back tx desc fail", K(ret), K(tx_desc));
} else if (OB_FAIL(tx_ctx_.init(tx_desc_arr, allocator))) {
LOG_WARN("failed to alloc memory for tx_ids_", K(ret), K(tx_desc));
}
return ret;
}
bool ObSSTableMeta::check_meta() const
{
return basic_meta_.is_valid()
@ -548,7 +738,7 @@ int ObSSTableMeta::init(
}
if (OB_SUCC(ret) && transaction::ObTransID(param.uncommitted_tx_id_).is_valid()) {
if (OB_FAIL(tx_ctx_.tx_descs_.push_back({param.uncommitted_tx_id_, 0}))) {
if (OB_FAIL(prepare_tx_context({param.uncommitted_tx_id_, 0}, allocator))) {
LOG_WARN("failed to alloc memory for tx_ids_", K(ret), K(param));
}
}
@ -694,7 +884,7 @@ int ObSSTableMeta::deserialize_(
LOG_WARN("fail to deserialize macro info", K(ret), K(data_len), K(pos), K(des_meta));
} else if (pos < data_len && OB_FAIL(cg_sstables_.deserialize(allocator, buf, data_len, pos))) {
LOG_WARN("fail to deserialize cg sstables", K(ret), K(data_len), K(pos));
} else if (pos < data_len && OB_FAIL(tx_ctx_.deserialize(buf, data_len, pos))) {
} else if (pos < data_len && OB_FAIL(tx_ctx_.deserialize(allocator, buf, data_len, pos))) {
LOG_WARN("fail to deserialize tx ids", K(ret), K(data_len), K(pos));
}
}
@ -729,7 +919,8 @@ int64_t ObSSTableMeta::get_variable_size() const
return sizeof(int64_t) * column_checksum_count_ // column checksums
+ data_root_info_.get_variable_size()
+ macro_info_.get_variable_size()
+ cg_sstables_.get_deep_copy_size();
+ cg_sstables_.get_deep_copy_size()
+ tx_ctx_.get_variable_size();
}
int ObSSTableMeta::deep_copy(
@ -759,8 +950,11 @@ int ObSSTableMeta::deep_copy(
LOG_WARN("fail to deep copy macro info", K(ret), KP(buf), K(buf_len), K(pos), K(macro_info_));
} else if (OB_FAIL(cg_sstables_.deep_copy(buf, buf_len, pos, dest->cg_sstables_))) {
LOG_WARN("fail to deep copy cg sstables", K(ret), KP(buf), K(buf_len), K(pos), K(cg_sstables_));
} else if (OB_FAIL(dest->tx_ctx_.assign(tx_ctx_))) {
} else if (OB_FAIL(tx_ctx_.deep_copy(buf, buf_len, pos, dest->tx_ctx_))) {
LOG_WARN("fail to deep copy cg sstables", K(ret), K(tx_ctx_));
} else if (deep_size != pos - tmp_pos) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("deep copy size miss match", K(ret), K(*this), KPC(dest), K(deep_size), K(tmp_pos), K(pos));
} else {
dest->is_inited_ = is_inited_;
}

View File

@ -13,6 +13,7 @@
#ifndef OCEANBASE_STORAGE_BLOCKSSTABLE_OB_SSTABLE_META_H
#define OCEANBASE_STORAGE_BLOCKSSTABLE_OB_SSTABLE_META_H
#include "lib/container/ob_iarray.h"
#include "share/schema/ob_table_schema.h"
#include "storage/ob_storage_schema.h"
#include "storage/ob_i_table.h"
@ -29,80 +30,49 @@ struct ObTabletCreateSSTableParam;
}
namespace blocksstable
{
struct ObTxContext
class ObTxContext final
{
public:
struct ObTxDesc final
{
struct ObTxDesc{
int64_t tx_id_;
int64_t row_count_;
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(serialization::encode_i64(buf, buf_len,pos, tx_id_))) {
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, row_count_))) {
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
}
return ret;
}
int deserialize(const char *buf, const int64_t buf_len, int64_t &pos)
{
int ret = OB_SUCCESS;
if (OB_FAIL(serialization::decode_i64(buf, buf_len, pos, &tx_id_))) {
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
} else if (OB_FAIL(serialization::decode_i64(buf, buf_len, pos, &row_count_))) {
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
}
return ret;
}
int64_t get_serialize_size() const {
return serialization::encoded_length_i64(tx_id_) + serialization::encoded_length_i64(row_count_);
}
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const;
int deserialize(const char *buf, const int64_t buf_len, int64_t &pos);
int64_t get_serialize_size() const;
TO_STRING_KV(K(tx_id_), K(row_count_));
};
ObTxContext() : len_(0), count_(0), tx_descs_(nullptr) {};
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const
int init(const common::ObIArray<ObTxDesc> &tx_descs, common::ObArenaAllocator &allocator);
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const;
int64_t get_serialize_size() const;
int deserialize(common::ObArenaAllocator &allocator, const char *buf, const int64_t buf_len, int64_t &pos);
int deep_copy(
char *buf,
const int64_t buf_len,
int64_t &pos,
ObTxContext &dest) const;
int64_t get_variable_size() const;
OB_INLINE int64_t get_count() const { return count_; }
int64_t get_tx_id(const int64_t idx) const;
void reset()
{
int ret = OB_SUCCESS;
const_cast<ObTxContext *>(this)->len_ = get_serialize_size();
if (OB_FAIL(serialization::encode_i32(buf, buf_len,pos, len_))) {
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
} else if (OB_FAIL(tx_descs_.serialize(buf, buf_len, pos))) {
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
}
return ret;
}
int64_t get_serialize_size() const {
return serialization::encoded_length_i32(len_) + tx_descs_.get_serialize_size();
}
int deserialize(const char *buf, const int64_t buf_len, int64_t &pos)
{
int ret = OB_SUCCESS;
const int64_t tmp_pos = pos;
if (OB_FAIL(serialization::decode_i32(buf, buf_len, pos, &len_))) {
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
} else if (OB_FAIL(tx_descs_.deserialize(buf, buf_len, pos))) {
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
} else if (OB_UNLIKELY(pos - tmp_pos != len_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected len_", K(ret), K(len_), K(tmp_pos), K(pos));
}
return ret;
}
int assign(const ObTxContext &tx_ctx) {
len_ = tx_ctx.len_;
return tx_descs_.assign(tx_ctx.tx_descs_);
}
void reset() {
len_ = 0;
tx_descs_.reset();
count_ = 0;
tx_descs_ = nullptr;
}
TO_STRING_KV(K_(count), K(ObArrayWrap<ObTxDesc>(tx_descs_, count_)));
private:
int push_back(const ObTxDesc &desc);
private:
static const int64_t MAX_TX_IDS_COUNT = 16;
int32_t len_;
ObSEArray<ObTxDesc, MAX_TX_IDS_COUNT> tx_descs_;
TO_STRING_KV(K(tx_descs_));
int32_t len_; // for compat
int64_t count_; // actual item count
ObTxDesc *tx_descs_;
DISALLOW_COPY_AND_ASSIGN(ObTxContext);
};
//For compatibility, the variables in this struct MUST NOT be deleted or moved.
@ -230,8 +200,8 @@ public:
OB_INLINE const ObSSTableBasicMeta &get_basic_meta() const { return basic_meta_; }
OB_INLINE int64_t get_col_checksum_cnt() const { return column_checksum_count_; }
OB_INLINE int64_t *get_col_checksum() const { return column_checksums_; }
OB_INLINE int64_t get_tx_id_count() const { return tx_ctx_.tx_descs_.count(); }
OB_INLINE int64_t get_tx_ids(int64_t idx) const { return tx_ctx_.tx_descs_.at(idx).tx_id_; }
OB_INLINE int64_t get_tx_id_count() const { return tx_ctx_.get_count(); }
OB_INLINE int64_t get_tx_ids(const int64_t idx) const { return tx_ctx_.get_tx_id(idx); }
OB_INLINE int64_t get_data_checksum() const { return basic_meta_.data_checksum_; }
OB_INLINE int64_t get_rowkey_column_count() const { return basic_meta_.rowkey_column_count_; }
OB_INLINE int64_t get_column_count() const { return basic_meta_.column_cnt_; }
@ -331,6 +301,9 @@ private:
int prepare_column_checksum(
const common::ObIArray<int64_t> &column_checksums,
common::ObArenaAllocator &allocator);
int prepare_tx_context(
const ObTxContext::ObTxDesc &tx_desc,
common::ObArenaAllocator &allocator);
int serialize_(char *buf, const int64_t buf_len, int64_t &pos) const;
int deserialize_(
common::ObArenaAllocator &allocator,

View File

@ -604,6 +604,70 @@ TEST_F(TestSSTableMeta, test_sstable_deep_copy)
ASSERT_EQ(full_sstable.meta_->is_inited_, tiny_sstable->meta_->is_inited_);
}
TEST_F(TestSSTableMeta, test_sstable_meta_deep_copy)
{
int ret = OB_SUCCESS;
ObSSTableMeta src_meta;
// add salt
src_meta.basic_meta_.data_checksum_ = 20240514;
src_meta.column_checksum_count_ = 3;
src_meta.column_checksums_ = (int64_t*)ob_malloc_align(4<<10, 3 * sizeof(int64_t), ObMemAttr());
src_meta.column_checksums_[0] = 1111;
src_meta.column_checksums_[1] = 2222;
src_meta.column_checksums_[2] = 3333;
src_meta.tx_ctx_.tx_descs_ = (ObTxContext::ObTxDesc*)ob_malloc_align(4<<10, 2 * sizeof(ObTxContext::ObTxDesc), ObMemAttr());
ret = src_meta.tx_ctx_.push_back({987, 654});
ASSERT_EQ(OB_SUCCESS, ret);
ret = src_meta.tx_ctx_.push_back({123, 456});
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(2, src_meta.tx_ctx_.count_);
ASSERT_EQ(2 * sizeof(ObTxContext::ObTxDesc), src_meta.tx_ctx_.get_variable_size());
src_meta.tx_ctx_.len_ = src_meta.tx_ctx_.get_serialize_size();
// test deep copy from dynamic memory meta to flat memory meta
const int64_t buf_size = 8 << 10; //8K
int64_t pos = 0;
char *flat_buf_1 = (char*)ob_malloc(buf_size, ObMemAttr());
int64_t deep_copy_size = src_meta.get_deep_copy_size();
ObSSTableMeta *flat_meta_1;
ret = src_meta.deep_copy(flat_buf_1, deep_copy_size, pos, flat_meta_1);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(deep_copy_size, pos);
OB_LOG(INFO, "cooper", K(src_meta), K(sizeof(ObSSTableMeta)), K(deep_copy_size));
OB_LOG(INFO, "cooper", K(*flat_meta_1));
// can't use MEMCMP between dynamic memory and flat memory, because one is stack, the other is heap
ASSERT_EQ(src_meta.basic_meta_, flat_meta_1->basic_meta_);
// ASSERT_EQ(0, MEMCMP((char*)&src_meta.data_root_info_, (char*)&flat_meta_1->data_root_info_, sizeof(src_meta.data_root_info_)));
// ASSERT_EQ(0, MEMCMP((char*)&src_meta.macro_info_, (char*)&dst_meta->macro_info_, sizeof(src_meta.macro_info_)));
// ASSERT_EQ(0, MEMCMP((char*)&src_meta.cg_sstables_, (char*)&dst_meta->cg_sstables_, sizeof(src_meta.cg_sstables_)));
ASSERT_EQ(0, MEMCMP(src_meta.column_checksums_, flat_meta_1->column_checksums_, src_meta.column_checksum_count_ * sizeof(int64_t)));
ASSERT_EQ(src_meta.tx_ctx_.len_, flat_meta_1->tx_ctx_.len_);
ASSERT_EQ(src_meta.tx_ctx_.count_, flat_meta_1->tx_ctx_.count_);
ASSERT_EQ(0, MEMCMP(src_meta.tx_ctx_.tx_descs_, flat_meta_1->tx_ctx_.tx_descs_, flat_meta_1->tx_ctx_.get_variable_size()));
// test deep copy from flat memory meta to flat memory meta
pos = 0;
char *flat_buf_2 = (char*)ob_malloc_align(4<<10, buf_size, ObMemAttr());
deep_copy_size = flat_meta_1->get_deep_copy_size();
ObSSTableMeta *flat_meta_2;
ret = flat_meta_1->deep_copy(flat_buf_2, deep_copy_size, pos, flat_meta_2);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(deep_copy_size, pos);
OB_LOG(INFO, "cooper", K(*flat_meta_1));
OB_LOG(INFO, "cooper", K(*flat_meta_2));
ASSERT_EQ(0, MEMCMP((char*)&flat_meta_1->basic_meta_, (char*)&flat_meta_2->basic_meta_, sizeof(flat_meta_1->basic_meta_)));
ASSERT_EQ(0, MEMCMP(&flat_meta_1->data_root_info_, &flat_meta_2->data_root_info_, sizeof(flat_meta_1->data_root_info_)));
ASSERT_EQ(0, MEMCMP(&flat_meta_1->macro_info_, &flat_meta_2->macro_info_, sizeof(flat_meta_1->macro_info_)));
ASSERT_EQ(0, MEMCMP((char*)&flat_meta_1->cg_sstables_, (char*)&flat_meta_2->cg_sstables_, sizeof(flat_meta_1->cg_sstables_)));
ASSERT_EQ(0, MEMCMP(flat_meta_1->column_checksums_, flat_meta_2->column_checksums_, flat_meta_1->column_checksum_count_ * sizeof(int64_t)));
ASSERT_EQ(flat_meta_2->tx_ctx_.len_, flat_meta_1->tx_ctx_.len_);
ASSERT_EQ(flat_meta_2->tx_ctx_.count_, flat_meta_1->tx_ctx_.count_);
ASSERT_NE(flat_meta_1->tx_ctx_.tx_descs_, flat_meta_2->tx_ctx_.tx_descs_);
ASSERT_EQ(0, MEMCMP(flat_meta_2->tx_ctx_.tx_descs_, flat_meta_1->tx_ctx_.tx_descs_, flat_meta_1->tx_ctx_.get_variable_size()));
}
TEST_F(TestMigrationSSTableParam, test_empty_sstable_serialize_and_deserialize)
{
ObMigrationSSTableParam mig_param;
@ -698,7 +762,6 @@ TEST_F(TestMigrationSSTableParam, test_migrate_sstable)
ASSERT_TRUE(dest_sstable_param.encrypt_id_ == src_sstable_param.encrypt_id_);
}
} // end namespace unittest
} // end namespace oceanbase