adjust tx ids field
This commit is contained in:
@ -382,7 +382,7 @@ ObSSTableMeta::ObSSTableMeta()
|
|||||||
cg_sstables_(),
|
cg_sstables_(),
|
||||||
column_checksums_(nullptr),
|
column_checksums_(nullptr),
|
||||||
column_checksum_count_(0),
|
column_checksum_count_(0),
|
||||||
tx_ids_(),
|
tx_ctx_(),
|
||||||
is_inited_(false)
|
is_inited_(false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -415,7 +415,7 @@ void ObSSTableMeta::reset()
|
|||||||
basic_meta_.reset();
|
basic_meta_.reset();
|
||||||
column_checksums_ = nullptr;
|
column_checksums_ = nullptr;
|
||||||
column_checksum_count_ = 0;
|
column_checksum_count_ = 0;
|
||||||
tx_ids_.reset();
|
tx_ctx_.reset();
|
||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -548,7 +548,7 @@ int ObSSTableMeta::init(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret) && transaction::ObTransID(param.uncommitted_tx_id_).is_valid()) {
|
if (OB_SUCC(ret) && transaction::ObTransID(param.uncommitted_tx_id_).is_valid()) {
|
||||||
if (OB_FAIL(tx_ids_.push_back(param.uncommitted_tx_id_))) {
|
if (OB_FAIL(tx_ctx_.tx_descs_.push_back({param.uncommitted_tx_id_, 0}))) {
|
||||||
LOG_WARN("failed to alloc memory for tx_ids_", K(ret), K(param));
|
LOG_WARN("failed to alloc memory for tx_ids_", K(ret), K(param));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -617,8 +617,8 @@ int ObSSTableMeta::serialize_(char *buf, const int64_t buf_len, int64_t &pos) co
|
|||||||
LOG_WARN("fail to serialize macro info", K(ret), K(buf_len), K(pos), K(macro_info_));
|
LOG_WARN("fail to serialize macro info", K(ret), K(buf_len), K(pos), K(macro_info_));
|
||||||
} else if (OB_FAIL(cg_sstables_.serialize(buf, buf_len, pos))) {
|
} else if (OB_FAIL(cg_sstables_.serialize(buf, buf_len, pos))) {
|
||||||
LOG_WARN("fail to serialize cg sstables", K(ret), K(buf_len), K(pos), K(cg_sstables_));
|
LOG_WARN("fail to serialize cg sstables", K(ret), K(buf_len), K(pos), K(cg_sstables_));
|
||||||
} else if (OB_FAIL(tx_ids_.serialize(buf, buf_len, pos))) {
|
} else if (OB_FAIL(tx_ctx_.serialize(buf, buf_len, pos))) {
|
||||||
LOG_WARN("fail to serialize tx ids", K(ret), K(buf_len), K(pos), K(tx_ids_));
|
LOG_WARN("fail to serialize tx ids", K(ret), K(buf_len), K(pos), K(tx_ctx_));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -694,7 +694,7 @@ int ObSSTableMeta::deserialize_(
|
|||||||
LOG_WARN("fail to deserialize macro info", K(ret), K(data_len), K(pos), K(des_meta));
|
LOG_WARN("fail to deserialize macro info", K(ret), K(data_len), K(pos), K(des_meta));
|
||||||
} else if (pos < data_len && OB_FAIL(cg_sstables_.deserialize(allocator, buf, data_len, pos))) {
|
} else if (pos < data_len && OB_FAIL(cg_sstables_.deserialize(allocator, buf, data_len, pos))) {
|
||||||
LOG_WARN("fail to deserialize cg sstables", K(ret), K(data_len), K(pos));
|
LOG_WARN("fail to deserialize cg sstables", K(ret), K(data_len), K(pos));
|
||||||
} else if (pos < data_len && OB_FAIL(tx_ids_.deserialize(buf, data_len, pos))) {
|
} else if (pos < data_len && OB_FAIL(tx_ctx_.deserialize(buf, data_len, pos))) {
|
||||||
LOG_WARN("fail to deserialize tx ids", K(ret), K(data_len), K(pos));
|
LOG_WARN("fail to deserialize tx ids", K(ret), K(data_len), K(pos));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -720,7 +720,7 @@ int64_t ObSSTableMeta::get_serialize_size_() const
|
|||||||
len += data_root_info_.get_serialize_size();
|
len += data_root_info_.get_serialize_size();
|
||||||
len += macro_info_.get_serialize_size();
|
len += macro_info_.get_serialize_size();
|
||||||
len += cg_sstables_.get_serialize_size();
|
len += cg_sstables_.get_serialize_size();
|
||||||
len += tx_ids_.get_serialize_size();
|
len += tx_ctx_.get_serialize_size();
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -759,8 +759,8 @@ int ObSSTableMeta::deep_copy(
|
|||||||
LOG_WARN("fail to deep copy macro info", K(ret), KP(buf), K(buf_len), K(pos), K(macro_info_));
|
LOG_WARN("fail to deep copy macro info", K(ret), KP(buf), K(buf_len), K(pos), K(macro_info_));
|
||||||
} else if (OB_FAIL(cg_sstables_.deep_copy(buf, buf_len, pos, dest->cg_sstables_))) {
|
} else if (OB_FAIL(cg_sstables_.deep_copy(buf, buf_len, pos, dest->cg_sstables_))) {
|
||||||
LOG_WARN("fail to deep copy cg sstables", K(ret), KP(buf), K(buf_len), K(pos), K(cg_sstables_));
|
LOG_WARN("fail to deep copy cg sstables", K(ret), KP(buf), K(buf_len), K(pos), K(cg_sstables_));
|
||||||
} else if (OB_FAIL(dest->tx_ids_.assign(tx_ids_))) {
|
} else if (OB_FAIL(dest->tx_ctx_.assign(tx_ctx_))) {
|
||||||
LOG_WARN("fail to deep copy cg sstables", K(ret), K(tx_ids_));
|
LOG_WARN("fail to deep copy cg sstables", K(ret), K(tx_ctx_));
|
||||||
} else {
|
} else {
|
||||||
dest->is_inited_ = is_inited_;
|
dest->is_inited_ = is_inited_;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -29,6 +29,82 @@ struct ObTabletCreateSSTableParam;
|
|||||||
}
|
}
|
||||||
namespace blocksstable
|
namespace blocksstable
|
||||||
{
|
{
|
||||||
|
struct ObTxContext
|
||||||
|
{
|
||||||
|
struct ObTxDesc{
|
||||||
|
int64_t tx_id_;
|
||||||
|
int64_t row_count_;
|
||||||
|
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_FAIL(serialization::encode_i64(buf, buf_len,pos, tx_id_))) {
|
||||||
|
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
|
||||||
|
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, row_count_))) {
|
||||||
|
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int deserialize(const char *buf, const int64_t buf_len, int64_t &pos)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_FAIL(serialization::decode_i64(buf, buf_len, pos, &tx_id_))) {
|
||||||
|
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
|
||||||
|
} else if (OB_FAIL(serialization::decode_i64(buf, buf_len, pos, &row_count_))) {
|
||||||
|
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t get_serialize_size() const {
|
||||||
|
return serialization::encoded_length_i64(tx_id_) + serialization::encoded_length_i64(row_count_);
|
||||||
|
}
|
||||||
|
TO_STRING_KV(K(tx_id_), K(row_count_));
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const_cast<ObTxContext *>(this)->len_ = get_serialize_size();
|
||||||
|
if (OB_FAIL(serialization::encode_i32(buf, buf_len,pos, len_))) {
|
||||||
|
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
|
||||||
|
} else if (OB_FAIL(tx_descs_.serialize(buf, buf_len, pos))) {
|
||||||
|
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
int64_t get_serialize_size() const {
|
||||||
|
return serialization::encoded_length_i32(len_) + tx_descs_.get_serialize_size();
|
||||||
|
}
|
||||||
|
int deserialize(const char *buf, const int64_t buf_len, int64_t &pos)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const int64_t tmp_pos = pos;
|
||||||
|
if (OB_FAIL(serialization::decode_i32(buf, buf_len, pos, &len_))) {
|
||||||
|
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
|
||||||
|
} else if (OB_FAIL(tx_descs_.deserialize(buf, buf_len, pos))) {
|
||||||
|
STORAGE_LOG(WARN, "fail to encode length", K(ret), K(buf_len), K(pos));
|
||||||
|
} else if (OB_UNLIKELY(pos - tmp_pos != len_)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
STORAGE_LOG(WARN, "unexpected len_", K(ret), K(len_), K(tmp_pos), K(pos));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
int assign(const ObTxContext &tx_ctx) {
|
||||||
|
len_ = tx_ctx.len_;
|
||||||
|
return tx_descs_.assign(tx_ctx.tx_descs_);
|
||||||
|
}
|
||||||
|
void reset() {
|
||||||
|
len_ = 0;
|
||||||
|
tx_descs_.reset();
|
||||||
|
}
|
||||||
|
static const int64_t MAX_TX_IDS_COUNT = 16;
|
||||||
|
int32_t len_;
|
||||||
|
ObSEArray<ObTxDesc, MAX_TX_IDS_COUNT> tx_descs_;
|
||||||
|
TO_STRING_KV(K(tx_descs_));
|
||||||
|
};
|
||||||
|
|
||||||
//For compatibility, the variables in this struct MUST NOT be deleted or moved.
|
//For compatibility, the variables in this struct MUST NOT be deleted or moved.
|
||||||
//You should ONLY add variables at the end.
|
//You should ONLY add variables at the end.
|
||||||
//Note that if you use complex structure as variables, the complex structure should also keep compatibility.
|
//Note that if you use complex structure as variables, the complex structure should also keep compatibility.
|
||||||
@ -154,8 +230,8 @@ public:
|
|||||||
OB_INLINE const ObSSTableBasicMeta &get_basic_meta() const { return basic_meta_; }
|
OB_INLINE const ObSSTableBasicMeta &get_basic_meta() const { return basic_meta_; }
|
||||||
OB_INLINE int64_t get_col_checksum_cnt() const { return column_checksum_count_; }
|
OB_INLINE int64_t get_col_checksum_cnt() const { return column_checksum_count_; }
|
||||||
OB_INLINE int64_t *get_col_checksum() const { return column_checksums_; }
|
OB_INLINE int64_t *get_col_checksum() const { return column_checksums_; }
|
||||||
OB_INLINE int64_t get_tx_id_count() const { return tx_ids_.count(); }
|
OB_INLINE int64_t get_tx_id_count() const { return tx_ctx_.tx_descs_.count(); }
|
||||||
OB_INLINE int64_t get_tx_ids(int64_t idx) const { return tx_ids_.at(idx); }
|
OB_INLINE int64_t get_tx_ids(int64_t idx) const { return tx_ctx_.tx_descs_.at(idx).tx_id_; }
|
||||||
OB_INLINE int64_t get_data_checksum() const { return basic_meta_.data_checksum_; }
|
OB_INLINE int64_t get_data_checksum() const { return basic_meta_.data_checksum_; }
|
||||||
OB_INLINE int64_t get_rowkey_column_count() const { return basic_meta_.rowkey_column_count_; }
|
OB_INLINE int64_t get_rowkey_column_count() const { return basic_meta_.rowkey_column_count_; }
|
||||||
OB_INLINE int64_t get_column_count() const { return basic_meta_.column_cnt_; }
|
OB_INLINE int64_t get_column_count() const { return basic_meta_.column_cnt_; }
|
||||||
@ -245,7 +321,7 @@ public:
|
|||||||
const int64_t buf_len,
|
const int64_t buf_len,
|
||||||
int64_t &pos,
|
int64_t &pos,
|
||||||
ObSSTableMeta *&dest) const;
|
ObSSTableMeta *&dest) const;
|
||||||
TO_STRING_KV(K_(basic_meta), KP_(column_checksums), K_(column_checksum_count), K_(data_root_info), K_(macro_info), K_(cg_sstables), K_(tx_ids), K_(is_inited));
|
TO_STRING_KV(K_(basic_meta), KP_(column_checksums), K_(column_checksum_count), K_(data_root_info), K_(macro_info), K_(cg_sstables), K_(tx_ctx), K_(is_inited));
|
||||||
private:
|
private:
|
||||||
bool check_meta() const;
|
bool check_meta() const;
|
||||||
int init_base_meta(const ObTabletCreateSSTableParam ¶m, common::ObArenaAllocator &allocator);
|
int init_base_meta(const ObTabletCreateSSTableParam ¶m, common::ObArenaAllocator &allocator);
|
||||||
@ -265,7 +341,6 @@ private:
|
|||||||
private:
|
private:
|
||||||
friend class ObSSTable;
|
friend class ObSSTable;
|
||||||
static const int64_t SSTABLE_META_VERSION = 1;
|
static const int64_t SSTABLE_META_VERSION = 1;
|
||||||
static const int64_t MAX_TX_IDS_COUNT = 16;
|
|
||||||
private:
|
private:
|
||||||
ObSSTableBasicMeta basic_meta_;
|
ObSSTableBasicMeta basic_meta_;
|
||||||
ObRootBlockInfo data_root_info_;
|
ObRootBlockInfo data_root_info_;
|
||||||
@ -273,7 +348,7 @@ private:
|
|||||||
ObSSTableArray cg_sstables_;
|
ObSSTableArray cg_sstables_;
|
||||||
int64_t *column_checksums_;
|
int64_t *column_checksums_;
|
||||||
int64_t column_checksum_count_;
|
int64_t column_checksum_count_;
|
||||||
ObSEArray<int64_t, MAX_TX_IDS_COUNT> tx_ids_;
|
ObTxContext tx_ctx_;
|
||||||
// The following fields don't to persist
|
// The following fields don't to persist
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObSSTableMeta);
|
DISALLOW_COPY_AND_ASSIGN(ObSSTableMeta);
|
||||||
|
|||||||
@ -686,6 +686,7 @@ int ObPartitionMajorMerger::merge_partition(
|
|||||||
} else if (0 == minimum_iters_.count()) {
|
} else if (0 == minimum_iters_.count()) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
STORAGE_LOG(WARN, "unexpected minimum_iters_ is null", K(ret));
|
STORAGE_LOG(WARN, "unexpected minimum_iters_ is null", K(ret));
|
||||||
|
} else if (FALSE_IT(set_base_iter(minimum_iters_))) {
|
||||||
} else if (merge_helper_->is_need_skip()) {
|
} else if (merge_helper_->is_need_skip()) {
|
||||||
//move purge iters
|
//move purge iters
|
||||||
if (OB_FAIL(merge_helper_->move_iters_next(minimum_iters_))) {
|
if (OB_FAIL(merge_helper_->move_iters_next(minimum_iters_))) {
|
||||||
@ -1036,6 +1037,7 @@ int ObPartitionMinorMerger::merge_partition(
|
|||||||
} else if (rowkey_minimum_iters.empty()) {
|
} else if (rowkey_minimum_iters.empty()) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
STORAGE_LOG(WARN, "unexpected rowkey_minimum_iters is null", K(ret));
|
STORAGE_LOG(WARN, "unexpected rowkey_minimum_iters is null", K(ret));
|
||||||
|
} else if (FALSE_IT(set_base_iter(rowkey_minimum_iters))) {
|
||||||
} else if (1 == rowkey_minimum_iters.count()
|
} else if (1 == rowkey_minimum_iters.count()
|
||||||
&& nullptr == rowkey_minimum_iters.at(0)->get_curr_row()) {
|
&& nullptr == rowkey_minimum_iters.at(0)->get_curr_row()) {
|
||||||
// only one iter, output its' macro block
|
// only one iter, output its' macro block
|
||||||
|
|||||||
@ -105,6 +105,15 @@ public:
|
|||||||
protected:
|
protected:
|
||||||
int prepare_merge(ObBasicTabletMergeCtx &ctx, const int64_t idx);
|
int prepare_merge(ObBasicTabletMergeCtx &ctx, const int64_t idx);
|
||||||
int get_base_iter_curr_macro_block(const blocksstable::ObMacroBlockDesc *¯o_desc);
|
int get_base_iter_curr_macro_block(const blocksstable::ObMacroBlockDesc *¯o_desc);
|
||||||
|
void set_base_iter(const MERGE_ITER_ARRAY &minimum_iters) {
|
||||||
|
const int64_t count = minimum_iters.count();
|
||||||
|
if (!minimum_iters.empty() && minimum_iters[count - 1]->is_base_sstable_iter()
|
||||||
|
&& minimum_iters[count - 1]->is_macro_merge_iter()) {
|
||||||
|
base_iter_ = minimum_iters[count - 1];
|
||||||
|
} else {
|
||||||
|
base_iter_ = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
static const int64_t CACHED_TRANS_STATE_MAX_CNT = 10 * 1024l;
|
static const int64_t CACHED_TRANS_STATE_MAX_CNT = 10 * 1024l;
|
||||||
private:
|
private:
|
||||||
virtual int inner_prepare_merge(ObBasicTabletMergeCtx &ctx, const int64_t idx) = 0;
|
virtual int inner_prepare_merge(ObBasicTabletMergeCtx &ctx, const int64_t idx) = 0;
|
||||||
|
|||||||
Reference in New Issue
Block a user