diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 225999f7f..25391c9cf 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -440,6 +440,7 @@ ob_set_subtarget(ob_storage compaction compaction/ob_schedule_dag_func.cpp compaction/ob_medium_compaction_func.cpp compaction/ob_medium_compaction_mgr.cpp + compaction/ob_medium_compaction_info.cpp compaction/ob_compaction_diagnose.cpp compaction/ob_compaction_suggestion.cpp compaction/ob_sstable_merge_info_mgr.cpp diff --git a/src/storage/compaction/ob_medium_compaction_info.cpp b/src/storage/compaction/ob_medium_compaction_info.cpp new file mode 100644 index 000000000..14b320533 --- /dev/null +++ b/src/storage/compaction/ob_medium_compaction_info.cpp @@ -0,0 +1,416 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX STORAGE +#include "storage/compaction/ob_medium_compaction_info.h" + +namespace oceanbase +{ +using namespace storage; + +namespace compaction +{ + +/* + * ObParallelMergeInfo + * */ + +void ObParallelMergeInfo::destroy() +{ + if (list_size_ > 0 && nullptr != parallel_end_key_list_ && nullptr != allocator_) { + for (int i = 0; i < list_size_; ++i) { + parallel_end_key_list_[i].destroy(*allocator_); + } + list_size_ = 0; + allocator_->free(parallel_end_key_list_); + parallel_end_key_list_ = nullptr; + allocator_ = nullptr; + } + parallel_info_ = 0; +} + +int ObParallelMergeInfo::serialize(char *buf, const int64_t buf_len, int64_t &pos) const +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(nullptr == buf || buf_len <= 0 || pos < 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", K(ret), K(buf), K(buf_len), K(pos)); + } else if (0 == list_size_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("no need to serialize parallel_merge_info", K(ret), K(list_size_)); + } else { + LST_DO_CODE(OB_UNIS_ENCODE, + parallel_info_); + for (int i = 0; OB_SUCC(ret) && i < list_size_; ++i) { + if (OB_FAIL(parallel_end_key_list_[i].serialize(buf, buf_len, pos))) { + LOG_WARN("failed to encode concurrent cnt", K(ret), K(i), K(list_size_), K(parallel_end_key_list_[i])); + } + } + } + return ret; +} + +int ObParallelMergeInfo::deserialize( + common::ObIAllocator &allocator, + const char *buf, + const int64_t data_len, + int64_t &pos) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(nullptr == buf || data_len <= 0 || pos < 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", K(ret), K(buf), K(data_len), K(pos)); + } else { + LST_DO_CODE(OB_UNIS_DECODE, parallel_info_); + if (OB_FAIL(ret)) { + } else if (0 == list_size_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("list size is invalid", K(ret), K(list_size_)); + } else { + allocator_ = &allocator; + void *alloc_buf = nullptr; + if (OB_ISNULL(alloc_buf = allocator.alloc(sizeof(ObStoreRowkey) * list_size_))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc store rowkey array", K(ret), K(list_size_)); + } else { + parallel_end_key_list_ = new(alloc_buf) ObStoreRowkey[list_size_]; + } + for (int i = 0; OB_SUCC(ret) && i < list_size_; ++i) { + if (OB_FAIL(parallel_end_key_list_[i].deserialize(allocator, buf, data_len, pos))) { + LOG_WARN("failed to encode concurrent cnt", K(ret), K(i), K(list_size_), K(data_len), K(pos)); + } + } + if (OB_FAIL(ret)) { + destroy(); // free parallel_end_key_list_ in destroy + } + } + } + return ret; +} + +int64_t ObParallelMergeInfo::get_serialize_size() const +{ + int64_t len = 0; + if (list_size_ > 0) { + len += serialization::encoded_length_vi32(parallel_info_); + for (int i = 0; i < list_size_; ++i) { + len += parallel_end_key_list_[i].get_serialize_size(); + } + } + return len; +} + +int ObParallelMergeInfo::generate_from_range_array( + ObIAllocator &allocator, + ObArrayArray ¶l_range) +{ + int ret = OB_SUCCESS; + void *buf = nullptr; + if (OB_UNLIKELY(0 != list_size_ || nullptr != parallel_end_key_list_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("parallel merge info is not empty", K(ret), KPC(this)); + } else { + int64_t sum_range_cnt = 0; + for (int64_t i = 0; i < paral_range.count(); ++i) { + sum_range_cnt += paral_range.at(i).count(); + } + if (sum_range_cnt <= VALID_CONCURRENT_CNT || sum_range_cnt > UINT8_MAX) { + // do nothing + } else if (FALSE_IT(list_size_ = sum_range_cnt - 1)) { + } else if (OB_ISNULL(buf = allocator.alloc(sizeof(ObStoreRowkey) * list_size_))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate", K(ret), K(paral_range)); + } else { + allocator_ = &allocator; + parallel_end_key_list_ = new(buf) ObStoreRowkey[list_size_]; + int64_t cnt = 0; + for (int64_t i = 0; OB_SUCC(ret) && i < paral_range.count() && cnt < list_size_; ++i) { + const ObIArray &range_array = paral_range.at(i); + for (int64_t j = 0; OB_SUCC(ret) && j < range_array.count() && cnt < list_size_; ++j) { + if (OB_FAIL(range_array.at(j).get_end_key().deep_copy(parallel_end_key_list_[cnt++], allocator))) { + LOG_WARN("failed to deep copy end key", K(ret), K(i), K(range_array), K(j), K(cnt)); + } + } + } // end of loop array + } + } + LOG_DEBUG("parallel range info", K(ret), KPC(this), K(paral_range), K(paral_range.count()), K(paral_range.at(0))); + + if (OB_FAIL(ret)) { + destroy(); + } else if (get_serialize_size() > MAX_PARALLEL_RANGE_SERIALIZE_LEN) { + ret = OB_SIZE_OVERFLOW; + LOG_DEBUG("parallel range info is too large to sync", K(ret), KPC(this)); + destroy(); + } + return ret; +} + +int ObParallelMergeInfo::init( + common::ObIAllocator &allocator, + const ObParallelMergeInfo &other) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!other.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("other parallel info is invalid", K(ret), K(other)); + } else { + list_size_ = other.list_size_; + allocator_ = &allocator; + if (list_size_ > 0) { + void *buf = nullptr; + if (OB_ISNULL(buf = allocator.alloc(sizeof(ObStoreRowkey) * list_size_))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate", K(ret), K(other)); + } else { + parallel_end_key_list_ = new (buf) ObStoreRowkey[list_size_]; + for (int i = 0; OB_SUCC(ret) && i < list_size_; ++i) { + if (OB_FAIL(other.parallel_end_key_list_[i].deep_copy(parallel_end_key_list_[i], allocator))) { + LOG_WARN("failed to deep copy end key", K(ret), K(i), K(other.parallel_end_key_list_[i])); + } + } + if (OB_FAIL(ret)) { + destroy(); + } + } // else + } + } + return ret; +} + +int64_t ObParallelMergeInfo::to_string(char* buf, const int64_t buf_len) const +{ + int64_t pos = 0; + if (OB_ISNULL(buf) || buf_len <= 0) { + } else { + J_OBJ_START(); + J_KV(K_(list_size)); + J_COMMA(); + for (int i = 0; i < list_size_; ++i) { + J_KV(K(i), "key", parallel_end_key_list_[i]); + J_COMMA(); + } + J_OBJ_END(); + } + return pos; +} + +/* + * ObMediumCompactionInfo + * */ +const char *ObMediumCompactionInfo::ObCompactionTypeStr[] = { + "MEDIUM_COMPACTION", + "MAJOR_COMPACTION", +}; + +const char *ObMediumCompactionInfo::get_compaction_type_str(enum ObCompactionType type) +{ + const char *str = ""; + if (type >= COMPACTION_TYPE_MAX || type < MEDIUM_COMPACTION) { + str = "invalid_type"; + } else { + str = ObCompactionTypeStr[type]; + } + return str; +} + +ObMediumCompactionInfo::ObMediumCompactionInfo() + : ObIMultiSourceDataUnit(), + medium_compat_version_(MEIDUM_COMPAT_VERSION), + compaction_type_(COMPACTION_TYPE_MAX), + contain_parallel_range_(false), + medium_merge_reason_(ObAdaptiveMergePolicy::NONE), + reserved_(0), + cluster_id_(0), + data_version_(0), + medium_snapshot_(0), + storage_schema_(), + parallel_merge_info_() +{ + STATIC_ASSERT(static_cast(COMPACTION_TYPE_MAX) == ARRAYSIZEOF(ObCompactionTypeStr), "compaction type str len is mismatch"); +} + +ObMediumCompactionInfo::~ObMediumCompactionInfo() +{ + reset(); +} + +int ObMediumCompactionInfo::init( + ObIAllocator &allocator, + const ObMediumCompactionInfo &medium_info) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!medium_info.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(medium_info)); + } else if (OB_FAIL(storage_schema_.init(allocator, medium_info.storage_schema_))) { + LOG_WARN("failed to init storage schema", K(ret), K(medium_info)); + } else if (OB_FAIL(parallel_merge_info_.init(allocator, medium_info.parallel_merge_info_))) { + LOG_WARN("failed to init parallel merge info", K(ret), K(medium_info)); + } else { + info_ = medium_info.info_; + cluster_id_ = medium_info.cluster_id_; + medium_snapshot_ = medium_info.medium_snapshot_; + data_version_ = medium_info.data_version_; + } + return ret; +} + +bool ObMediumCompactionInfo::is_valid() const +{ + return COMPACTION_TYPE_MAX != compaction_type_ + && medium_snapshot_ > 0 + && data_version_ > 0 + && storage_schema_.is_valid() + && parallel_merge_info_.is_valid(); +} + +void ObMediumCompactionInfo::reset() +{ + info_ = 0; + medium_compat_version_ = 0; + compaction_type_ = COMPACTION_TYPE_MAX; + cluster_id_ = 0; + medium_snapshot_ = 0; + data_version_ = 0; + storage_schema_.reset(); + parallel_merge_info_.destroy(); +} + +int ObMediumCompactionInfo::deep_copy(const ObIMultiSourceDataUnit *src, ObIAllocator *allocator) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(nullptr == src || nullptr == allocator)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), KP(src), KP(allocator)); + } else if (OB_UNLIKELY(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO != src->type())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), "type", src->type(), KP(allocator)); + } else { + ret = init(*allocator, *static_cast(src)); + } + return ret; +} + +int ObMediumCompactionInfo::save_storage_schema( + ObIAllocator &allocator, + const storage::ObStorageSchema &storage_schema) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!storage_schema.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(storage_schema)); + } else if (OB_FAIL(storage_schema_.init(allocator, storage_schema))) { + LOG_WARN("failed to init storage schema", K(ret), K(storage_schema)); + } + return ret; +} + +int ObMediumCompactionInfo::gene_parallel_info( + ObIAllocator &allocator, + ObArrayArray ¶l_range) +{ + int ret = OB_SUCCESS; + contain_parallel_range_ = false; + if (OB_FAIL(parallel_merge_info_.generate_from_range_array(allocator, paral_range))) { + if (OB_UNLIKELY(OB_SIZE_OVERFLOW != ret)) { + LOG_WARN("failed to generate parallel merge info", K(ret), K(paral_range)); + } else { + ret = OB_SUCCESS; + } + } else if (parallel_merge_info_.list_size_ > 0) { + contain_parallel_range_ = true; + LOG_INFO("success to gene parallel info", K(ret), K(contain_parallel_range_), K(parallel_merge_info_)); + } + return ret; +} + +int ObMediumCompactionInfo::serialize(char *buf, const int64_t buf_len, int64_t &pos) const +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(nullptr == buf || buf_len <= 0 || pos < 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", K(ret), K(buf), K(buf_len), K(pos)); + } else { + LST_DO_CODE( + OB_UNIS_ENCODE, + info_, + cluster_id_, + medium_snapshot_, + data_version_, + storage_schema_); + if (contain_parallel_range_) { + LST_DO_CODE( + OB_UNIS_ENCODE, + parallel_merge_info_); + } + LOG_DEBUG("ObMediumCompactionInfo::serialize", K(ret), K(buf), K(buf_len), K(pos)); + } + return ret; +} + +int ObMediumCompactionInfo::deserialize( + common::ObIAllocator &allocator, + const char *buf, + const int64_t data_len, + int64_t &pos) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(nullptr == buf || data_len <= 0 || pos < 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", K(ret), K(buf), K(data_len), K(pos)); + } else { + LST_DO_CODE(OB_UNIS_DECODE, + info_, + cluster_id_, + medium_snapshot_, + data_version_); + if (OB_FAIL(ret)) { + } else if (OB_FAIL(storage_schema_.deserialize(allocator, buf, data_len, pos))) { + LOG_WARN("failed to deserialize storage schema", K(ret)); + } else if (contain_parallel_range_) { + if (OB_FAIL(parallel_merge_info_.deserialize(allocator, buf, data_len, pos))) { + LOG_WARN("failed to deserialize parallel merge info", K(ret), K(buf), K(data_len), K(pos)); + } + } else { + clear_parallel_range(); + LOG_DEBUG("ObMediumCompactionInfo::deserialize", K(ret), K(buf), K(data_len), K(pos)); + } + } + return ret; +} + +int64_t ObMediumCompactionInfo::get_serialize_size() const +{ + int64_t len = 0; + LST_DO_CODE( + OB_UNIS_ADD_LEN, + info_, + cluster_id_, + medium_snapshot_, + data_version_, + storage_schema_); + if (contain_parallel_range_) { + LST_DO_CODE(OB_UNIS_ADD_LEN, parallel_merge_info_); + } + return len; +} + +void ObMediumCompactionInfo::gene_info( + char* buf, const int64_t buf_len, int64_t &pos) const +{ + J_KV("compaction_type", ObMediumCompactionInfo::get_compaction_type_str((ObCompactionType)compaction_type_), + K(medium_snapshot_), K_(parallel_merge_info)); +} + +} //namespace compaction +} // namespace oceanbase diff --git a/src/storage/compaction/ob_medium_compaction_info.h b/src/storage/compaction/ob_medium_compaction_info.h new file mode 100644 index 000000000..9a016c2d0 --- /dev/null +++ b/src/storage/compaction/ob_medium_compaction_info.h @@ -0,0 +1,164 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OB_STORAGE_COMPACTION_MEDIUM_COMPACTION_INFO_H_ +#define OB_STORAGE_COMPACTION_MEDIUM_COMPACTION_INFO_H_ + +#include "storage/ob_storage_schema.h" +#include "lib/container/ob_array_array.h" +#include "storage/compaction/ob_partition_merge_policy.h" +#include "observer/ob_server_struct.h" + +namespace oceanbase +{ +namespace storage +{ +class ObTablet; +} +namespace compaction +{ + +struct ObParallelMergeInfo +{ +public: + ObParallelMergeInfo() + : parallel_info_(0), + parallel_end_key_list_(nullptr), + allocator_(nullptr) + {} + ~ObParallelMergeInfo() { destroy(); } // attention!!! use destroy to free memory + int init(common::ObIAllocator &allocator, const ObParallelMergeInfo &other); + void destroy(); + bool is_valid() const + { + return list_size_ == 0 || nullptr != parallel_end_key_list_; + } + + // serialize & deserialize + int serialize(char *buf, const int64_t buf_len, int64_t &pos) const; + int deserialize( + common::ObIAllocator &allocator, + const char *buf, + const int64_t data_len, + int64_t &pos); + int64_t get_serialize_size() const; + + int generate_from_range_array( + ObIAllocator &allocator, + common::ObArrayArray ¶l_range); + + int64_t to_string(char* buf, const int64_t buf_len) const; + static const int64_t MAX_PARALLEL_RANGE_SERIALIZE_LEN = 1 * 1024 * 1024; + static const int64_t VALID_CONCURRENT_CNT = 1; + + union { + uint32_t parallel_info_; + struct { + uint32_t compat_ : 4; + uint32_t list_size_ : 8; + uint32_t reserved_ : 20; + }; + }; + ObStoreRowkey *parallel_end_key_list_; // concurrent_cnt - 1 + + ObIAllocator *allocator_; +}; + +struct ObMediumCompactionInfo : public memtable::ObIMultiSourceDataUnit +{ +public: + enum ObCompactionType + { + MEDIUM_COMPACTION = 0, + MAJOR_COMPACTION = 1, + COMPACTION_TYPE_MAX, + }; + const static char *ObCompactionTypeStr[]; + const static char *get_compaction_type_str(enum ObCompactionType type); +public: + ObMediumCompactionInfo(); + ~ObMediumCompactionInfo(); + + int init(ObIAllocator &allocator, const ObMediumCompactionInfo &medium_info); + int save_storage_schema(ObIAllocator &allocator, const storage::ObStorageSchema &storage_schema); + int gene_parallel_info( + ObIAllocator &allocator, + common::ObArrayArray ¶l_range); + static inline bool is_medium_compaction(const ObCompactionType type) { return MEDIUM_COMPACTION == type; } + static inline bool is_major_compaction(const ObCompactionType type) { return MAJOR_COMPACTION == type; } + inline bool is_major_compaction() const { return is_major_compaction((ObCompactionType)compaction_type_); } + inline bool is_medium_compaction() const { return is_medium_compaction((ObCompactionType)compaction_type_); } + inline void clear_parallel_range() + { + parallel_merge_info_.list_size_ = 0; + parallel_merge_info_.parallel_end_key_list_ = nullptr; + contain_parallel_range_ = false; + } + + // ObIMultiSourceDataUnit section + virtual int deep_copy(const ObIMultiSourceDataUnit *src, ObIAllocator *allocator) override; + virtual void reset() override; + virtual bool is_valid() const override; + virtual inline int64_t get_data_size() const override { return sizeof(ObMediumCompactionInfo); } + virtual inline memtable::MultiSourceDataUnitType type() const override + { + return memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO; + } + virtual int64_t get_version() const override { return medium_snapshot_; } + virtual bool is_save_last() const override { return false; } + bool from_cur_cluster() const { return cluster_id_ == GCONF.cluster_id; } + + // serialize & deserialize + int serialize(char *buf, const int64_t buf_len, int64_t &pos) const; + int deserialize( + common::ObIAllocator &allocator, + const char *buf, + const int64_t data_len, + int64_t &pos); + int64_t get_serialize_size() const; + + void gene_info(char* buf, const int64_t buf_len, int64_t &pos) const; + TO_STRING_KV(K_(cluster_id), K_(medium_compat_version), K_(data_version), + "compaction_type", ObMediumCompactionInfo::get_compaction_type_str((ObCompactionType)compaction_type_), + "medium_merge_reason", ObAdaptiveMergePolicy::merge_reason_to_str(medium_merge_reason_), K_(cluster_id), + K_(medium_snapshot), K_(storage_schema), + K_(contain_parallel_range), K_(parallel_merge_info)); +public: + static const int64_t MEIDUM_COMPAT_VERSION = 1; + +private: + static const int32_t SCS_ONE_BIT = 1; + static const int32_t SCS_RESERVED_BITS = 49; + +public: + union { + uint64_t info_; + struct { + uint64_t medium_compat_version_ : 4; + uint64_t compaction_type_ : 2; + uint64_t contain_parallel_range_ : SCS_ONE_BIT; + uint64_t medium_merge_reason_ : 8; + uint64_t reserved_ : SCS_RESERVED_BITS; + }; + }; + + uint64_t cluster_id_; // for backup database to throw MEDIUM_COMPACTION clog + uint64_t data_version_; + int64_t medium_snapshot_; + storage::ObStorageSchema storage_schema_; + ObParallelMergeInfo parallel_merge_info_; +}; + +} // namespace compaction +} // namespace oceanbase + +#endif // OB_STORAGE_COMPACTION_MEDIUM_COMPACTION_INFO_H_ diff --git a/src/storage/compaction/ob_medium_compaction_mgr.cpp b/src/storage/compaction/ob_medium_compaction_mgr.cpp index b475e0863..b119bd3d5 100644 --- a/src/storage/compaction/ob_medium_compaction_mgr.cpp +++ b/src/storage/compaction/ob_medium_compaction_mgr.cpp @@ -22,399 +22,6 @@ using namespace storage; namespace compaction { - -/* - * ObParallelMergeInfo - * */ - -void ObParallelMergeInfo::destroy() -{ - if (list_size_ > 0 && nullptr != parallel_end_key_list_ && nullptr != allocator_) { - for (int i = 0; i < list_size_; ++i) { - parallel_end_key_list_[i].destroy(*allocator_); - } - list_size_ = 0; - allocator_->free(parallel_end_key_list_); - parallel_end_key_list_ = nullptr; - allocator_ = nullptr; - } - parallel_info_ = 0; -} - -int ObParallelMergeInfo::serialize(char *buf, const int64_t buf_len, int64_t &pos) const -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(nullptr == buf || buf_len <= 0 || pos < 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", K(ret), K(buf), K(buf_len), K(pos)); - } else if (0 == list_size_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("no need to serialize parallel_merge_info", K(ret), K(list_size_)); - } else { - LST_DO_CODE(OB_UNIS_ENCODE, - parallel_info_); - for (int i = 0; OB_SUCC(ret) && i < list_size_; ++i) { - if (OB_FAIL(parallel_end_key_list_[i].serialize(buf, buf_len, pos))) { - LOG_WARN("failed to encode concurrent cnt", K(ret), K(i), K(list_size_), K(parallel_end_key_list_[i])); - } - } - } - return ret; -} - -int ObParallelMergeInfo::deserialize( - common::ObIAllocator &allocator, - const char *buf, - const int64_t data_len, - int64_t &pos) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(nullptr == buf || data_len <= 0 || pos < 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", K(ret), K(buf), K(data_len), K(pos)); - } else { - LST_DO_CODE(OB_UNIS_DECODE, parallel_info_); - if (OB_FAIL(ret)) { - } else if (0 == list_size_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("list size is invalid", K(ret), K(list_size_)); - } else { - allocator_ = &allocator; - void *alloc_buf = nullptr; - if (OB_ISNULL(alloc_buf = allocator.alloc(sizeof(ObStoreRowkey) * list_size_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to alloc store rowkey array", K(ret), K(list_size_)); - } else { - parallel_end_key_list_ = new(alloc_buf) ObStoreRowkey[list_size_]; - } - for (int i = 0; OB_SUCC(ret) && i < list_size_; ++i) { - if (OB_FAIL(parallel_end_key_list_[i].deserialize(allocator, buf, data_len, pos))) { - LOG_WARN("failed to encode concurrent cnt", K(ret), K(i), K(list_size_), K(data_len), K(pos)); - } - } - if (OB_FAIL(ret)) { - destroy(); // free parallel_end_key_list_ in destroy - } - } - } - return ret; -} - -int64_t ObParallelMergeInfo::get_serialize_size() const -{ - int64_t len = 0; - if (list_size_ > 0) { - len += serialization::encoded_length_vi32(parallel_info_); - for (int i = 0; i < list_size_; ++i) { - len += parallel_end_key_list_[i].get_serialize_size(); - } - } - return len; -} - -int ObParallelMergeInfo::generate_from_range_array( - ObIAllocator &allocator, - ObArrayArray ¶l_range) -{ - int ret = OB_SUCCESS; - void *buf = nullptr; - if (OB_UNLIKELY(0 != list_size_ || nullptr != parallel_end_key_list_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("parallel merge info is not empty", K(ret), KPC(this)); - } else { - int64_t sum_range_cnt = 0; - for (int64_t i = 0; i < paral_range.count(); ++i) { - sum_range_cnt += paral_range.at(i).count(); - } - if (sum_range_cnt <= VALID_CONCURRENT_CNT || sum_range_cnt > UINT8_MAX) { - // do nothing - } else if (FALSE_IT(list_size_ = sum_range_cnt - 1)) { - } else if (OB_ISNULL(buf = allocator.alloc(sizeof(ObStoreRowkey) * list_size_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate", K(ret), K(paral_range)); - } else { - allocator_ = &allocator; - parallel_end_key_list_ = new(buf) ObStoreRowkey[list_size_]; - int64_t cnt = 0; - for (int64_t i = 0; OB_SUCC(ret) && i < paral_range.count() && cnt < list_size_; ++i) { - const ObIArray &range_array = paral_range.at(i); - for (int64_t j = 0; OB_SUCC(ret) && j < range_array.count() && cnt < list_size_; ++j) { - if (OB_FAIL(range_array.at(j).get_end_key().deep_copy(parallel_end_key_list_[cnt++], allocator))) { - LOG_WARN("failed to deep copy end key", K(ret), K(i), K(range_array), K(j), K(cnt)); - } - } - } // end of loop array - } - } - LOG_DEBUG("parallel range info", K(ret), KPC(this), K(paral_range), K(paral_range.count()), K(paral_range.at(0))); - - if (OB_FAIL(ret)) { - destroy(); - } else if (get_serialize_size() > MAX_PARALLEL_RANGE_SERIALIZE_LEN) { - ret = OB_SIZE_OVERFLOW; - LOG_DEBUG("parallel range info is too large to sync", K(ret), KPC(this)); - destroy(); - } - return ret; -} - -int ObParallelMergeInfo::init( - common::ObIAllocator &allocator, - const ObParallelMergeInfo &other) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!other.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("other parallel info is invalid", K(ret), K(other)); - } else { - list_size_ = other.list_size_; - allocator_ = &allocator; - if (list_size_ > 0) { - void *buf = nullptr; - if (OB_ISNULL(buf = allocator.alloc(sizeof(ObStoreRowkey) * list_size_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate", K(ret), K(other)); - } else { - parallel_end_key_list_ = new (buf) ObStoreRowkey[list_size_]; - for (int i = 0; OB_SUCC(ret) && i < list_size_; ++i) { - if (OB_FAIL(other.parallel_end_key_list_[i].deep_copy(parallel_end_key_list_[i], allocator))) { - LOG_WARN("failed to deep copy end key", K(ret), K(i), K(other.parallel_end_key_list_[i])); - } - } - if (OB_FAIL(ret)) { - destroy(); - } - } // else - } - } - return ret; -} - -int64_t ObParallelMergeInfo::to_string(char* buf, const int64_t buf_len) const -{ - int64_t pos = 0; - if (OB_ISNULL(buf) || buf_len <= 0) { - } else { - J_OBJ_START(); - J_KV(K_(list_size)); - J_COMMA(); - for (int i = 0; i < list_size_; ++i) { - J_KV(K(i), "key", parallel_end_key_list_[i]); - J_COMMA(); - } - J_OBJ_END(); - } - return pos; -} - -/* - * ObMediumCompactionInfo - * */ -const char *ObMediumCompactionInfo::ObCompactionTypeStr[] = { - "MEDIUM_COMPACTION", - "MAJOR_COMPACTION", -}; - -const char *ObMediumCompactionInfo::get_compaction_type_str(enum ObCompactionType type) -{ - const char *str = ""; - if (type >= COMPACTION_TYPE_MAX || type < MEDIUM_COMPACTION) { - str = "invalid_type"; - } else { - str = ObCompactionTypeStr[type]; - } - return str; -} - -ObMediumCompactionInfo::ObMediumCompactionInfo() - : ObIMultiSourceDataUnit(), - medium_compat_version_(MEIDUM_COMPAT_VERSION), - compaction_type_(COMPACTION_TYPE_MAX), - contain_parallel_range_(false), - medium_merge_reason_(ObAdaptiveMergePolicy::NONE), - reserved_(0), - cluster_id_(0), - data_version_(0), - medium_snapshot_(0), - storage_schema_(), - parallel_merge_info_() -{ - STATIC_ASSERT(static_cast(COMPACTION_TYPE_MAX) == ARRAYSIZEOF(ObCompactionTypeStr), "compaction type str len is mismatch"); -} - -ObMediumCompactionInfo::~ObMediumCompactionInfo() -{ - reset(); -} - -int ObMediumCompactionInfo::init( - ObIAllocator &allocator, - const ObMediumCompactionInfo &medium_info) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!medium_info.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(medium_info)); - } else if (OB_FAIL(storage_schema_.init(allocator, medium_info.storage_schema_))) { - LOG_WARN("failed to init storage schema", K(ret), K(medium_info)); - } else if (OB_FAIL(parallel_merge_info_.init(allocator, medium_info.parallel_merge_info_))) { - LOG_WARN("failed to init parallel merge info", K(ret), K(medium_info)); - } else { - info_ = medium_info.info_; - cluster_id_ = medium_info.cluster_id_; - medium_snapshot_ = medium_info.medium_snapshot_; - data_version_ = medium_info.data_version_; - } - return ret; -} - -bool ObMediumCompactionInfo::is_valid() const -{ - return COMPACTION_TYPE_MAX != compaction_type_ - && medium_snapshot_ > 0 - && data_version_ > 0 - && storage_schema_.is_valid() - && parallel_merge_info_.is_valid(); -} - -void ObMediumCompactionInfo::reset() -{ - info_ = 0; - medium_compat_version_ = 0; - compaction_type_ = COMPACTION_TYPE_MAX; - cluster_id_ = 0; - medium_snapshot_ = 0; - data_version_ = 0; - storage_schema_.reset(); - parallel_merge_info_.destroy(); -} - -int ObMediumCompactionInfo::deep_copy(const ObIMultiSourceDataUnit *src, ObIAllocator *allocator) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(nullptr == src || nullptr == allocator)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), KP(src), KP(allocator)); - } else if (OB_UNLIKELY(memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO != src->type())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), "type", src->type(), KP(allocator)); - } else { - ret = init(*allocator, *static_cast(src)); - } - return ret; -} - -int ObMediumCompactionInfo::save_storage_schema( - ObIAllocator &allocator, - const storage::ObStorageSchema &storage_schema) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!storage_schema.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(storage_schema)); - } else if (OB_FAIL(storage_schema_.init(allocator, storage_schema))) { - LOG_WARN("failed to init storage schema", K(ret), K(storage_schema)); - } - return ret; -} - -int ObMediumCompactionInfo::gene_parallel_info( - ObIAllocator &allocator, - ObArrayArray ¶l_range) -{ - int ret = OB_SUCCESS; - contain_parallel_range_ = false; - if (OB_FAIL(parallel_merge_info_.generate_from_range_array(allocator, paral_range))) { - if (OB_UNLIKELY(OB_SIZE_OVERFLOW != ret)) { - LOG_WARN("failed to generate parallel merge info", K(ret), K(paral_range)); - } else { - ret = OB_SUCCESS; - } - } else if (parallel_merge_info_.list_size_ > 0) { - contain_parallel_range_ = true; - LOG_INFO("success to gene parallel info", K(ret), K(contain_parallel_range_), K(parallel_merge_info_)); - } - return ret; -} - -int ObMediumCompactionInfo::serialize(char *buf, const int64_t buf_len, int64_t &pos) const -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(nullptr == buf || buf_len <= 0 || pos < 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", K(ret), K(buf), K(buf_len), K(pos)); - } else { - LST_DO_CODE( - OB_UNIS_ENCODE, - info_, - cluster_id_, - medium_snapshot_, - data_version_, - storage_schema_); - if (contain_parallel_range_) { - LST_DO_CODE( - OB_UNIS_ENCODE, - parallel_merge_info_); - } - LOG_DEBUG("ObMediumCompactionInfo::serialize", K(ret), K(buf), K(buf_len), K(pos)); - } - return ret; -} - -int ObMediumCompactionInfo::deserialize( - common::ObIAllocator &allocator, - const char *buf, - const int64_t data_len, - int64_t &pos) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(nullptr == buf || data_len <= 0 || pos < 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", K(ret), K(buf), K(data_len), K(pos)); - } else { - LST_DO_CODE(OB_UNIS_DECODE, - info_, - cluster_id_, - medium_snapshot_, - data_version_); - if (OB_FAIL(ret)) { - } else if (OB_FAIL(storage_schema_.deserialize(allocator, buf, data_len, pos))) { - LOG_WARN("failed to deserialize storage schema", K(ret)); - } else if (contain_parallel_range_) { - if (OB_FAIL(parallel_merge_info_.deserialize(allocator, buf, data_len, pos))) { - LOG_WARN("failed to deserialize parallel merge info", K(ret), K(buf), K(data_len), K(pos)); - } - } else { - clear_parallel_range(); - LOG_DEBUG("ObMediumCompactionInfo::deserialize", K(ret), K(buf), K(data_len), K(pos)); - } - } - return ret; -} - -int64_t ObMediumCompactionInfo::get_serialize_size() const -{ - int64_t len = 0; - LST_DO_CODE( - OB_UNIS_ADD_LEN, - info_, - cluster_id_, - medium_snapshot_, - data_version_, - storage_schema_); - if (contain_parallel_range_) { - LST_DO_CODE(OB_UNIS_ADD_LEN, parallel_merge_info_); - } - return len; -} - -void ObMediumCompactionInfo::gene_info( - char* buf, const int64_t buf_len, int64_t &pos) const -{ - J_KV("compaction_type", ObMediumCompactionInfo::get_compaction_type_str((ObCompactionType)compaction_type_), - K(medium_snapshot_), K_(parallel_merge_info)); -} - /* * ObTabletMediumCompactionInfoRecorder * */ diff --git a/src/storage/compaction/ob_medium_compaction_mgr.h b/src/storage/compaction/ob_medium_compaction_mgr.h index 87dc4647c..483c6d262 100644 --- a/src/storage/compaction/ob_medium_compaction_mgr.h +++ b/src/storage/compaction/ob_medium_compaction_mgr.h @@ -18,6 +18,7 @@ #include "lib/container/ob_array_array.h" #include "storage/meta_mem/ob_tablet_handle.h" #include "storage/compaction/ob_partition_merge_policy.h" +#include "storage/compaction/ob_medium_compaction_info.h" namespace oceanbase { @@ -27,138 +28,6 @@ class ObTablet; } namespace compaction { - -struct ObParallelMergeInfo -{ -public: - ObParallelMergeInfo() - : parallel_info_(0), - parallel_end_key_list_(nullptr), - allocator_(nullptr) - {} - ~ObParallelMergeInfo() { destroy(); } // attention!!! use destroy to free memory - int init(common::ObIAllocator &allocator, const ObParallelMergeInfo &other); - void destroy(); - bool is_valid() const - { - return list_size_ == 0 || nullptr != parallel_end_key_list_; - } - - // serialize & deserialize - int serialize(char *buf, const int64_t buf_len, int64_t &pos) const; - int deserialize( - common::ObIAllocator &allocator, - const char *buf, - const int64_t data_len, - int64_t &pos); - int64_t get_serialize_size() const; - - int generate_from_range_array( - ObIAllocator &allocator, - common::ObArrayArray ¶l_range); - - int64_t to_string(char* buf, const int64_t buf_len) const; - static const int64_t MAX_PARALLEL_RANGE_SERIALIZE_LEN = 1 * 1024 * 1024; - static const int64_t VALID_CONCURRENT_CNT = 1; - - union { - uint32_t parallel_info_; - struct { - uint32_t compat_ : 4; - uint32_t list_size_ : 8; - uint32_t reserved_ : 20; - }; - }; - ObStoreRowkey *parallel_end_key_list_; // concurrent_cnt - 1 - - ObIAllocator *allocator_; -}; - -struct ObMediumCompactionInfo : public memtable::ObIMultiSourceDataUnit -{ -public: - enum ObCompactionType - { - MEDIUM_COMPACTION = 0, - MAJOR_COMPACTION = 1, - COMPACTION_TYPE_MAX, - }; - const static char *ObCompactionTypeStr[]; - const static char *get_compaction_type_str(enum ObCompactionType type); -public: - ObMediumCompactionInfo(); - ~ObMediumCompactionInfo(); - - int init(ObIAllocator &allocator, const ObMediumCompactionInfo &medium_info); - int save_storage_schema(ObIAllocator &allocator, const storage::ObStorageSchema &storage_schema); - int gene_parallel_info( - ObIAllocator &allocator, - common::ObArrayArray ¶l_range); - static inline bool is_medium_compaction(const ObCompactionType type) { return MEDIUM_COMPACTION == type; } - static inline bool is_major_compaction(const ObCompactionType type) { return MAJOR_COMPACTION == type; } - inline bool is_major_compaction() const { return is_major_compaction((ObCompactionType)compaction_type_); } - inline bool is_medium_compaction() const { return is_medium_compaction((ObCompactionType)compaction_type_); } - inline void clear_parallel_range() - { - parallel_merge_info_.list_size_ = 0; - parallel_merge_info_.parallel_end_key_list_ = nullptr; - contain_parallel_range_ = false; - } - - // ObIMultiSourceDataUnit section - virtual int deep_copy(const ObIMultiSourceDataUnit *src, ObIAllocator *allocator) override; - virtual void reset() override; - virtual bool is_valid() const override; - virtual inline int64_t get_data_size() const override { return sizeof(ObMediumCompactionInfo); } - virtual inline memtable::MultiSourceDataUnitType type() const override - { - return memtable::MultiSourceDataUnitType::MEDIUM_COMPACTION_INFO; - } - virtual int64_t get_version() const override { return medium_snapshot_; } - virtual bool is_save_last() const override { return false; } - bool from_cur_cluster() const { return cluster_id_ == GCONF.cluster_id; } - - // serialize & deserialize - int serialize(char *buf, const int64_t buf_len, int64_t &pos) const; - int deserialize( - common::ObIAllocator &allocator, - const char *buf, - const int64_t data_len, - int64_t &pos); - int64_t get_serialize_size() const; - - void gene_info(char* buf, const int64_t buf_len, int64_t &pos) const; - TO_STRING_KV(K_(cluster_id), K_(medium_compat_version), K_(data_version), - "compaction_type", ObMediumCompactionInfo::get_compaction_type_str((ObCompactionType)compaction_type_), - "medium_merge_reason", ObAdaptiveMergePolicy::merge_reason_to_str(medium_merge_reason_), K_(cluster_id), - K_(medium_snapshot), K_(storage_schema), - K_(contain_parallel_range), K_(parallel_merge_info)); -public: - static const int64_t MEIDUM_COMPAT_VERSION = 1; - -private: - static const int32_t SCS_ONE_BIT = 1; - static const int32_t SCS_RESERVED_BITS = 49; - -public: - union { - uint64_t info_; - struct { - uint64_t medium_compat_version_ : 4; - uint64_t compaction_type_ : 2; - uint64_t contain_parallel_range_ : SCS_ONE_BIT; - uint64_t medium_merge_reason_ : 8; - uint64_t reserved_ : SCS_RESERVED_BITS; - }; - }; - - uint64_t cluster_id_; // for backup database to throw MEDIUM_COMPACTION clog - uint64_t data_version_; - int64_t medium_snapshot_; - storage::ObStorageSchema storage_schema_; - ObParallelMergeInfo parallel_merge_info_; -}; - class ObTabletMediumCompactionInfoRecorder : public storage::ObIStorageClogRecorder { public: diff --git a/src/storage/tablet/ob_tablet_meta.cpp b/src/storage/tablet/ob_tablet_meta.cpp index 01de52bec..bf25b0d46 100644 --- a/src/storage/tablet/ob_tablet_meta.cpp +++ b/src/storage/tablet/ob_tablet_meta.cpp @@ -809,9 +809,9 @@ ObMigrationTabletParam::ObMigrationTabletParam() ddl_start_scn_(SCN::min_scn()), ddl_snapshot_version_(OB_INVALID_TIMESTAMP), max_sync_storage_schema_version_(0), - max_serialized_medium_scn_(0), ddl_execution_id_(-1), - ddl_cluster_version_(0) + ddl_cluster_version_(0), + max_serialized_medium_scn_(0) { } @@ -892,12 +892,12 @@ int ObMigrationTabletParam::serialize(char *buf, const int64_t len, int64_t &pos LOG_WARN("failed to serialize ddl snapshot version", K(ret), K(len), K(new_pos), K_(ddl_snapshot_version)); } else if (OB_FAIL(serialization::encode_i64(buf, len, new_pos, max_sync_storage_schema_version_))) { LOG_WARN("failed to serialize max_sync_storage_schema_version", K(ret), K(len), K(new_pos), K_(max_sync_storage_schema_version)); - } else if (OB_FAIL(serialization::encode_i64(buf, len, new_pos, max_serialized_medium_scn_))) { - LOG_WARN("failed to serialize max_serialized_medium_scn", K(ret), K(len), K(new_pos), K_(max_serialized_medium_scn)); } else if (OB_FAIL(serialization::encode_i64(buf, len, new_pos, ddl_execution_id_))) { LOG_WARN("failed to serialize ddl execution id", K(ret), K(len), K(new_pos), K_(ddl_execution_id)); } else if (OB_FAIL(serialization::encode_i64(buf, len, new_pos, ddl_cluster_version_))) { LOG_WARN("failed to serialize ddl cluster version", K(ret), K(len), K(new_pos), K_(ddl_cluster_version)); + } else if (OB_FAIL(serialization::encode_i64(buf, len, new_pos, max_serialized_medium_scn_))) { + LOG_WARN("failed to serialize max_serialized_medium_scn", K(ret), K(len), K(new_pos), K_(max_serialized_medium_scn)); } else { pos = new_pos; } @@ -961,12 +961,12 @@ int ObMigrationTabletParam::deserialize(const char *buf, const int64_t len, int6 LOG_WARN("failed to deserialize ddl snapshot version", K(ret), K(len), K(new_pos)); } else if (OB_FAIL(serialization::decode_i64(buf, len, new_pos, &max_sync_storage_schema_version_))) { LOG_WARN("failed to deserialize max sync storage schema version", K(ret), K(len), K(new_pos)); - } else if (OB_FAIL(serialization::decode_i64(buf, len, new_pos, &max_serialized_medium_scn_))) { - LOG_WARN("failed to deserialize max sync medium snapshot", K(ret), K(len), K(new_pos)); } else if (OB_FAIL(serialization::decode_i64(buf, len, new_pos, &ddl_execution_id_))) { LOG_WARN("failed to deserialize ddl execution id", K(ret), K(len), K(new_pos)); } else if (OB_FAIL(serialization::decode_i64(buf, len, new_pos, &ddl_cluster_version_))) { LOG_WARN("failed to deserialize ddl cluster version", K(ret), K(len), K(new_pos)); + } else if (OB_FAIL(serialization::decode_i64(buf, len, new_pos, &max_serialized_medium_scn_))) { + LOG_WARN("failed to deserialize max sync medium snapshot", K(ret), K(len), K(new_pos)); } else { compat_mode_ = static_cast(compat_mode); pos = new_pos; @@ -1000,9 +1000,9 @@ int64_t ObMigrationTabletParam::get_serialize_size() const size += ddl_start_scn_.get_fixed_serialize_size(); size += serialization::encoded_length_i64(ddl_snapshot_version_); size += serialization::encoded_length_i64(max_sync_storage_schema_version_); - size += serialization::encoded_length_i64(max_serialized_medium_scn_); size += serialization::encoded_length_i64(ddl_execution_id_); size += serialization::encoded_length_i64(ddl_cluster_version_); + size += serialization::encoded_length_i64(max_serialized_medium_scn_); return size; } @@ -1029,9 +1029,9 @@ void ObMigrationTabletParam::reset() ddl_start_scn_.set_min(); ddl_snapshot_version_ = OB_INVALID_TIMESTAMP; max_sync_storage_schema_version_ = 0; - max_serialized_medium_scn_ = 0; ddl_execution_id_ = -1; ddl_cluster_version_ = 0; + max_serialized_medium_scn_ = 0; } int ObMigrationTabletParam::assign(const ObMigrationTabletParam ¶m) diff --git a/src/storage/tablet/ob_tablet_meta.h b/src/storage/tablet/ob_tablet_meta.h index 64b2e1e6f..9930b113d 100644 --- a/src/storage/tablet/ob_tablet_meta.h +++ b/src/storage/tablet/ob_tablet_meta.h @@ -263,9 +263,9 @@ public: int64_t ddl_snapshot_version_; // max_sync_version may less than storage_schema.schema_version_ when major update schema int64_t max_sync_storage_schema_version_; - int64_t max_serialized_medium_scn_; int64_t ddl_execution_id_; int64_t ddl_cluster_version_; + int64_t max_serialized_medium_scn_; }; } // namespace storage