fix datum rowkey deep_copy bug

This commit is contained in:
yangqise7en
2023-07-25 03:42:43 +00:00
committed by ob-robot
parent c47fbf7825
commit f92e509c74
5 changed files with 29 additions and 18 deletions

View File

@ -483,9 +483,9 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
medium_info.data_version_ = compat_version; medium_info.data_version_ = compat_version;
int64_t schema_version = 0; int64_t schema_version = 0;
if (medium_info.data_version_ < DATA_VERSION_4_2_0_0) { if (medium_info.data_version_ < DATA_VERSION_4_2_0_0) {
medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEIDUM_COMPAT_VERSION; medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION;
} else { } else {
medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEIDUM_COMPAT_VERSION_V2; medium_info.medium_compat_version_ = ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2;
} }
if (OB_FAIL(choose_medium_scn[is_major](ls_, *tablet, merge_reason, allocator_, medium_info, result, schema_version))) { if (OB_FAIL(choose_medium_scn[is_major](ls_, *tablet, merge_reason, allocator_, medium_info, result, schema_version))) {
@ -888,7 +888,7 @@ int ObMediumCompactionScheduleFunc::get_table_schema_to_merge(
// for old version medium info, need generate old version schema // for old version medium info, need generate old version schema
if (FAILEDx(medium_info.storage_schema_.init( if (FAILEDx(medium_info.storage_schema_.init(
allocator, *table_schema, tablet.get_tablet_meta().compat_mode_, false/*skip_column_info*/, allocator, *table_schema, tablet.get_tablet_meta().compat_mode_, false/*skip_column_info*/,
ObMediumCompactionInfo::MEIDUM_COMPAT_VERSION_V2 == medium_info.medium_compat_version_ ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2 == medium_info.medium_compat_version_
? ObStorageSchema::STORAGE_SCHEMA_VERSION_V2 ? ObStorageSchema::STORAGE_SCHEMA_VERSION_V2
: ObStorageSchema::STORAGE_SCHEMA_VERSION))) { : ObStorageSchema::STORAGE_SCHEMA_VERSION))) {
LOG_WARN("failed to init storage schema", K(ret), K(schema_version)); LOG_WARN("failed to init storage schema", K(ret), K(schema_version));

View File

@ -271,6 +271,9 @@ int ObParallelMergeInfo::init(
ret = deep_copy_list(allocator, other.parallel_store_rowkey_list_, parallel_store_rowkey_list_); ret = deep_copy_list(allocator, other.parallel_store_rowkey_list_, parallel_store_rowkey_list_);
} else if (PARALLEL_INFO_VERSION_V1 == compat_) { } else if (PARALLEL_INFO_VERSION_V1 == compat_) {
ret = deep_copy_list(allocator, other.parallel_datum_rowkey_list_, parallel_datum_rowkey_list_); ret = deep_copy_list(allocator, other.parallel_datum_rowkey_list_, parallel_datum_rowkey_list_);
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid compat version", KR(ret), K_(compat));
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
destroy(); destroy();
@ -290,12 +293,20 @@ int ObParallelMergeInfo::deep_copy_datum_rowkey(
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid idx", KR(ret), K(idx), K_(list_size)); LOG_WARN("invalid idx", KR(ret), K(idx), K_(list_size));
} else if (PARALLEL_INFO_VERSION_V0 == compat_) { } else if (PARALLEL_INFO_VERSION_V0 == compat_) {
if (OB_FAIL(rowkey.from_rowkey(parallel_store_rowkey_list_[idx].get_rowkey()/*src*/, input_allocator))) { ObDatumRowkeyHelper rowkey_helper;
STORAGE_LOG(WARN, "failed to deep copy end key", K(ret), K(idx), K(parallel_store_rowkey_list_[idx])); ObDatumRowkey tmp_datum_rowkey;
if (OB_FAIL(rowkey_helper.convert_datum_rowkey(parallel_store_rowkey_list_[idx].get_rowkey()/*src*/, tmp_datum_rowkey/*dst*/))) {
STORAGE_LOG(WARN, "failed to convert to datum rowkey", K(ret), K(idx), K(parallel_store_rowkey_list_[idx]));
} else if (OB_FAIL(tmp_datum_rowkey.deep_copy(rowkey/*dst*/, input_allocator))) {
STORAGE_LOG(WARN, "failed to deep copy datum rowkey", KR(ret), K(tmp_datum_rowkey));
} }
} else if (PARALLEL_INFO_VERSION_V1 == compat_ } else if (PARALLEL_INFO_VERSION_V1 == compat_) {
&& OB_FAIL(parallel_datum_rowkey_list_[idx].deep_copy(rowkey/*dst*/, input_allocator))) { if (OB_FAIL(parallel_datum_rowkey_list_[idx].deep_copy(rowkey/*dst*/, input_allocator))) {
STORAGE_LOG(WARN, "failed to deep copy end key", K(ret), K(idx), K(parallel_datum_rowkey_list_[idx])); STORAGE_LOG(WARN, "failed to deep copy end key", K(ret), K(idx), K(parallel_datum_rowkey_list_[idx]));
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid compat version", KR(ret), K_(compat));
} }
return ret; return ret;
} }
@ -348,7 +359,7 @@ const char *ObMediumCompactionInfo::get_compaction_type_str(enum ObCompactionTyp
} }
ObMediumCompactionInfo::ObMediumCompactionInfo() ObMediumCompactionInfo::ObMediumCompactionInfo()
: medium_compat_version_(MEIDUM_COMPAT_VERSION_V2), : medium_compat_version_(MEDIUM_COMPAT_VERSION_V2),
compaction_type_(COMPACTION_TYPE_MAX), compaction_type_(COMPACTION_TYPE_MAX),
contain_parallel_range_(false), contain_parallel_range_(false),
medium_merge_reason_(ObAdaptiveMergePolicy::NONE), medium_merge_reason_(ObAdaptiveMergePolicy::NONE),
@ -405,8 +416,8 @@ bool ObMediumCompactionInfo::is_valid() const
&& data_version_ > 0 && data_version_ > 0
&& storage_schema_.is_valid() && storage_schema_.is_valid()
&& parallel_merge_info_.is_valid() && parallel_merge_info_.is_valid()
&& (MEIDUM_COMPAT_VERSION == medium_compat_version_ && (MEDIUM_COMPAT_VERSION == medium_compat_version_
|| (MEIDUM_COMPAT_VERSION_V2 == medium_compat_version_ && last_medium_snapshot_ != 0)); || (MEDIUM_COMPAT_VERSION_V2 == medium_compat_version_ && last_medium_snapshot_ != 0));
} }
void ObMediumCompactionInfo::reset() void ObMediumCompactionInfo::reset()
@ -464,7 +475,7 @@ int ObMediumCompactionInfo::serialize(char *buf, const int64_t buf_len, int64_t
OB_UNIS_ENCODE, OB_UNIS_ENCODE,
parallel_merge_info_); parallel_merge_info_);
} }
if (OB_SUCC(ret) && MEIDUM_COMPAT_VERSION_V2 == medium_compat_version_) { if (OB_SUCC(ret) && MEDIUM_COMPAT_VERSION_V2 == medium_compat_version_) {
LST_DO_CODE( LST_DO_CODE(
OB_UNIS_ENCODE, OB_UNIS_ENCODE,
last_medium_snapshot_); last_medium_snapshot_);
@ -501,7 +512,7 @@ int ObMediumCompactionInfo::deserialize(
clear_parallel_range(); clear_parallel_range();
LOG_DEBUG("ObMediumCompactionInfo::deserialize", K(ret), K(buf), K(data_len), K(pos)); LOG_DEBUG("ObMediumCompactionInfo::deserialize", K(ret), K(buf), K(data_len), K(pos));
} }
if (OB_SUCC(ret) && MEIDUM_COMPAT_VERSION_V2 == medium_compat_version_) { if (OB_SUCC(ret) && MEDIUM_COMPAT_VERSION_V2 == medium_compat_version_) {
LST_DO_CODE( LST_DO_CODE(
OB_UNIS_DECODE, OB_UNIS_DECODE,
last_medium_snapshot_); last_medium_snapshot_);
@ -523,7 +534,7 @@ int64_t ObMediumCompactionInfo::get_serialize_size() const
if (contain_parallel_range_) { if (contain_parallel_range_) {
LST_DO_CODE(OB_UNIS_ADD_LEN, parallel_merge_info_); LST_DO_CODE(OB_UNIS_ADD_LEN, parallel_merge_info_);
} }
if (MEIDUM_COMPAT_VERSION_V2 == medium_compat_version_) { if (MEDIUM_COMPAT_VERSION_V2 == medium_compat_version_) {
LST_DO_CODE( LST_DO_CODE(
OB_UNIS_ADD_LEN, OB_UNIS_ADD_LEN,
last_medium_snapshot_); last_medium_snapshot_);

View File

@ -209,8 +209,8 @@ public:
void gene_info(char* buf, const int64_t buf_len, int64_t &pos) const; void gene_info(char* buf, const int64_t buf_len, int64_t &pos) const;
int64_t to_string(char* buf, const int64_t buf_len) const; int64_t to_string(char* buf, const int64_t buf_len) const;
public: public:
static const int64_t MEIDUM_COMPAT_VERSION = 1; static const int64_t MEDIUM_COMPAT_VERSION = 1;
static const int64_t MEIDUM_COMPAT_VERSION_V2 = 2; static const int64_t MEDIUM_COMPAT_VERSION_V2 = 2;
private: private:
static const int32_t SCS_ONE_BIT = 1; static const int32_t SCS_ONE_BIT = 1;

View File

@ -679,7 +679,7 @@ int ObMediumCompactionInfoList::check_medium_info_and_last_major(
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (nullptr != last_major_sstable if (nullptr != last_major_sstable
&& ObMediumCompactionInfo::MEIDUM_COMPAT_VERSION_V2 == medium_info.medium_compat_version_ && ObMediumCompactionInfo::MEDIUM_COMPAT_VERSION_V2 == medium_info.medium_compat_version_
&& medium_info.medium_snapshot_ > last_major_sstable->get_snapshot_version()) { && medium_info.medium_snapshot_ > last_major_sstable->get_snapshot_version()) {
if (medium_info.from_cur_cluster()) { // same cluster_id & same tenant_id if (medium_info.from_cur_cluster()) { // same cluster_id & same tenant_id
if (OB_UNLIKELY(medium_info.last_medium_snapshot_ != last_major_sstable->get_snapshot_version())) { if (OB_UNLIKELY(medium_info.last_medium_snapshot_ != last_major_sstable->get_snapshot_version())) {

View File

@ -160,7 +160,7 @@ int ObParallelMergeCtx::init(const compaction::ObMediumCompactionInfo &medium_in
concurrent_cnt_ = paral_info.get_size() + 1; concurrent_cnt_ = paral_info.get_size() + 1;
parallel_type_ = PARALLEL_MAJOR; parallel_type_ = PARALLEL_MAJOR;
is_inited_ = true; is_inited_ = true;
STORAGE_LOG(INFO, "success to init parallel merge ctx", KPC(this)); STORAGE_LOG(INFO, "success to init parallel merge ctx from medium_info", K(ret), KPC(this), K(paral_info));
} }
} }
return ret; return ret;