diff --git a/src/storage/blocksstable/ob_sstable_meta.cpp b/src/storage/blocksstable/ob_sstable_meta.cpp index 8a5b9da0b4..9582273c40 100644 --- a/src/storage/blocksstable/ob_sstable_meta.cpp +++ b/src/storage/blocksstable/ob_sstable_meta.cpp @@ -382,7 +382,7 @@ ObSSTableMeta::ObSSTableMeta() cg_sstables_(), column_checksums_(nullptr), column_checksum_count_(0), - tx_ids_(), + tx_ctx_(), is_inited_(false) { } @@ -415,7 +415,7 @@ void ObSSTableMeta::reset() basic_meta_.reset(); column_checksums_ = nullptr; column_checksum_count_ = 0; - tx_ids_.reset(); + tx_ctx_.reset(); is_inited_ = false; } @@ -548,7 +548,7 @@ int ObSSTableMeta::init( } if (OB_SUCC(ret) && transaction::ObTransID(param.uncommitted_tx_id_).is_valid()) { - if (OB_FAIL(tx_ids_.push_back(param.uncommitted_tx_id_))) { + if (OB_FAIL(tx_ctx_.tx_descs_.push_back({param.uncommitted_tx_id_, 0}))) { LOG_WARN("failed to alloc memory for tx_ids_", K(ret), K(param)); } } @@ -617,8 +617,8 @@ int ObSSTableMeta::serialize_(char *buf, const int64_t buf_len, int64_t &pos) co LOG_WARN("fail to serialize macro info", K(ret), K(buf_len), K(pos), K(macro_info_)); } else if (OB_FAIL(cg_sstables_.serialize(buf, buf_len, pos))) { LOG_WARN("fail to serialize cg sstables", K(ret), K(buf_len), K(pos), K(cg_sstables_)); - } else if (OB_FAIL(tx_ids_.serialize(buf, buf_len, pos))) { - LOG_WARN("fail to serialize tx ids", K(ret), K(buf_len), K(pos), K(tx_ids_)); + } else if (OB_FAIL(tx_ctx_.serialize(buf, buf_len, pos))) { + LOG_WARN("fail to serialize tx ids", K(ret), K(buf_len), K(pos), K(tx_ctx_)); } } return ret; @@ -694,7 +694,7 @@ int ObSSTableMeta::deserialize_( LOG_WARN("fail to deserialize macro info", K(ret), K(data_len), K(pos), K(des_meta)); } else if (pos < data_len && OB_FAIL(cg_sstables_.deserialize(allocator, buf, data_len, pos))) { LOG_WARN("fail to deserialize cg sstables", K(ret), K(data_len), K(pos)); - } else if (pos < data_len && OB_FAIL(tx_ids_.deserialize(buf, data_len, pos))) { + } else if (pos < data_len && OB_FAIL(tx_ctx_.deserialize(buf, data_len, pos))) { LOG_WARN("fail to deserialize tx ids", K(ret), K(data_len), K(pos)); } } @@ -720,7 +720,7 @@ int64_t ObSSTableMeta::get_serialize_size_() const len += data_root_info_.get_serialize_size(); len += macro_info_.get_serialize_size(); len += cg_sstables_.get_serialize_size(); - len += tx_ids_.get_serialize_size(); + len += tx_ctx_.get_serialize_size(); return len; } @@ -759,8 +759,8 @@ int ObSSTableMeta::deep_copy( LOG_WARN("fail to deep copy macro info", K(ret), KP(buf), K(buf_len), K(pos), K(macro_info_)); } else if (OB_FAIL(cg_sstables_.deep_copy(buf, buf_len, pos, dest->cg_sstables_))) { LOG_WARN("fail to deep copy cg sstables", K(ret), KP(buf), K(buf_len), K(pos), K(cg_sstables_)); - } else if (OB_FAIL(dest->tx_ids_.assign(tx_ids_))) { - LOG_WARN("fail to deep copy cg sstables", K(ret), K(tx_ids_)); + } else if (OB_FAIL(dest->tx_ctx_.assign(tx_ctx_))) { + LOG_WARN("fail to deep copy cg sstables", K(ret), K(tx_ctx_)); } else { dest->is_inited_ = is_inited_; } diff --git a/src/storage/blocksstable/ob_sstable_meta.h b/src/storage/blocksstable/ob_sstable_meta.h index 43cfe3ef10..ac833c8539 100644 --- a/src/storage/blocksstable/ob_sstable_meta.h +++ b/src/storage/blocksstable/ob_sstable_meta.h @@ -29,6 +29,82 @@ struct ObTabletCreateSSTableParam; } namespace blocksstable { +struct ObTxContext +{ + struct ObTxDesc{ + int64_t tx_id_; + int64_t row_count_; + int serialize(char *buf, const int64_t buf_len, int64_t &pos) const + { + int ret = OB_SUCCESS; + if (OB_FAIL(serialization::encode_i64(buf, buf_len,pos, tx_id_))) { + STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos)); + } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, row_count_))) { + STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos)); + } + return ret; + } + + int deserialize(const char *buf, const int64_t buf_len, int64_t &pos) + { + int ret = OB_SUCCESS; + if (OB_FAIL(serialization::decode_i64(buf, buf_len, pos, &tx_id_))) { + STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos)); + } else if (OB_FAIL(serialization::decode_i64(buf, buf_len, pos, &row_count_))) { + STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos)); + } + return ret; + } + + int64_t get_serialize_size() const { + return serialization::encoded_length_i64(tx_id_) + serialization::encoded_length_i64(row_count_); + } + TO_STRING_KV(K(tx_id_), K(row_count_)); + }; + + + int serialize(char *buf, const int64_t buf_len, int64_t &pos) const + { + int ret = OB_SUCCESS; + const_cast(this)->len_ = get_serialize_size(); + if (OB_FAIL(serialization::encode_i32(buf, buf_len,pos, len_))) { + STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos)); + } else if (OB_FAIL(tx_descs_.serialize(buf, buf_len, pos))) { + STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos)); + } + return ret; + } + int64_t get_serialize_size() const { + return serialization::encoded_length_i32(len_) + tx_descs_.get_serialize_size(); + } + int deserialize(const char *buf, const int64_t buf_len, int64_t &pos) + { + int ret = OB_SUCCESS; + const int64_t tmp_pos = pos; + if (OB_FAIL(serialization::decode_i32(buf, buf_len, pos, &len_))) { + STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos)); + } else if (OB_FAIL(tx_descs_.deserialize(buf, buf_len, pos))) { + STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos)); + } else if (OB_UNLIKELY(pos - tmp_pos != len_)) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "unexpected len_", K(ret), K(len_), K(tmp_pos), K(pos)); + } + return ret; + } + int assign(const ObTxContext &tx_ctx) { + len_ = tx_ctx.len_; + return tx_descs_.assign(tx_ctx.tx_descs_); + } + void reset() { + len_ = 0; + tx_descs_.reset(); + } + static const int64_t MAX_TX_IDS_COUNT = 16; + int32_t len_; + ObSEArray tx_descs_; + TO_STRING_KV(K(tx_descs_)); +}; + //For compatibility, the variables in this struct MUST NOT be deleted or moved. //You should ONLY add variables at the end. //Note that if you use complex structure as variables, the complex structure should also keep compatibility. @@ -154,8 +230,8 @@ public: OB_INLINE const ObSSTableBasicMeta &get_basic_meta() const { return basic_meta_; } OB_INLINE int64_t get_col_checksum_cnt() const { return column_checksum_count_; } OB_INLINE int64_t *get_col_checksum() const { return column_checksums_; } - OB_INLINE int64_t get_tx_id_count() const { return tx_ids_.count(); } - OB_INLINE int64_t get_tx_ids(int64_t idx) const { return tx_ids_.at(idx); } + OB_INLINE int64_t get_tx_id_count() const { return tx_ctx_.tx_descs_.count(); } + OB_INLINE int64_t get_tx_ids(int64_t idx) const { return tx_ctx_.tx_descs_.at(idx).tx_id_; } OB_INLINE int64_t get_data_checksum() const { return basic_meta_.data_checksum_; } OB_INLINE int64_t get_rowkey_column_count() const { return basic_meta_.rowkey_column_count_; } OB_INLINE int64_t get_column_count() const { return basic_meta_.column_cnt_; } @@ -245,7 +321,7 @@ public: const int64_t buf_len, int64_t &pos, ObSSTableMeta *&dest) const; - TO_STRING_KV(K_(basic_meta), KP_(column_checksums), K_(column_checksum_count), K_(data_root_info), K_(macro_info), K_(cg_sstables), K_(tx_ids), K_(is_inited)); + TO_STRING_KV(K_(basic_meta), KP_(column_checksums), K_(column_checksum_count), K_(data_root_info), K_(macro_info), K_(cg_sstables), K_(tx_ctx), K_(is_inited)); private: bool check_meta() const; int init_base_meta(const ObTabletCreateSSTableParam ¶m, common::ObArenaAllocator &allocator); @@ -265,7 +341,6 @@ private: private: friend class ObSSTable; static const int64_t SSTABLE_META_VERSION = 1; - static const int64_t MAX_TX_IDS_COUNT = 16; private: ObSSTableBasicMeta basic_meta_; ObRootBlockInfo data_root_info_; @@ -273,7 +348,7 @@ private: ObSSTableArray cg_sstables_; int64_t *column_checksums_; int64_t column_checksum_count_; - ObSEArray tx_ids_; + ObTxContext tx_ctx_; // The following fields don't to persist bool is_inited_; DISALLOW_COPY_AND_ASSIGN(ObSSTableMeta); diff --git a/src/storage/compaction/ob_partition_merger.cpp b/src/storage/compaction/ob_partition_merger.cpp index 2ba8e0bee1..c63610e809 100644 --- a/src/storage/compaction/ob_partition_merger.cpp +++ b/src/storage/compaction/ob_partition_merger.cpp @@ -686,6 +686,7 @@ int ObPartitionMajorMerger::merge_partition( } else if (0 == minimum_iters_.count()) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "unexpected minimum_iters_ is null", K(ret)); + } else if (FALSE_IT(set_base_iter(minimum_iters_))) { } else if (merge_helper_->is_need_skip()) { //move purge iters if (OB_FAIL(merge_helper_->move_iters_next(minimum_iters_))) { @@ -1036,6 +1037,7 @@ int ObPartitionMinorMerger::merge_partition( } else if (rowkey_minimum_iters.empty()) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "unexpected rowkey_minimum_iters is null", K(ret)); + } else if (FALSE_IT(set_base_iter(rowkey_minimum_iters))) { } else if (1 == rowkey_minimum_iters.count() && nullptr == rowkey_minimum_iters.at(0)->get_curr_row()) { // only one iter, output its' macro block diff --git a/src/storage/compaction/ob_partition_merger.h b/src/storage/compaction/ob_partition_merger.h index 0185dfdee5..e766f6e6c8 100644 --- a/src/storage/compaction/ob_partition_merger.h +++ b/src/storage/compaction/ob_partition_merger.h @@ -105,6 +105,15 @@ public: protected: int prepare_merge(ObBasicTabletMergeCtx &ctx, const int64_t idx); int get_base_iter_curr_macro_block(const blocksstable::ObMacroBlockDesc *¯o_desc); + void set_base_iter(const MERGE_ITER_ARRAY &minimum_iters) { + const int64_t count = minimum_iters.count(); + if (!minimum_iters.empty() && minimum_iters[count - 1]->is_base_sstable_iter() + && minimum_iters[count - 1]->is_macro_merge_iter()) { + base_iter_ = minimum_iters[count - 1]; + } else { + base_iter_ = nullptr; + } + } static const int64_t CACHED_TRANS_STATE_MAX_CNT = 10 * 1024l; private: virtual int inner_prepare_merge(ObBasicTabletMergeCtx &ctx, const int64_t idx) = 0;