[FIX]fix trigger=0 in minor & error info when submit clog & add cluster-version in medium for compat

This commit is contained in:
obdev 2022-12-20 15:08:00 +00:00 committed by ob-robot
parent 0074651dee
commit 0517f169f0
16 changed files with 73 additions and 61 deletions

View File

@ -141,37 +141,6 @@ int ObDataStoreDesc::cal_row_store_type(const share::schema::ObMergeSchema &merg
return ret;
}
int ObDataStoreDesc::set_major_working_cluster_version()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY((!is_major_merge() && !is_meta_major_merge()) || snapshot_version_ <= 0)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected data store to get major working cluster version",
K(ret), K_(merge_type), K_(snapshot_version));
} else {
ObTenantFreezeInfoMgr::FreezeInfo freeze_info;
if (OB_SUCC(MTL_CALL_FREEZE_INFO_MGR(get_freeze_info_by_snapshot_version, snapshot_version_, freeze_info))) {
// succ to get freeze info
} else if (OB_ENTRY_NOT_EXIST != ret) {
STORAGE_LOG(WARN, "Failed to get freeze info", K(ret), K_(snapshot_version), "tenant_id", MTL_ID());
} else if (OB_FAIL(MTL_CALL_FREEZE_INFO_MGR(get_latest_freeze_info, freeze_info))) {
STORAGE_LOG(WARN, "Failed to get latest freeze info", K(ret));
}
if (OB_SUCC(ret)) {
if (freeze_info.cluster_version < 0) {
STORAGE_LOG(ERROR, "Unexpected cluster version of freeze info", K(ret), K(freeze_info));
major_working_cluster_version_ = 0;
} else {
major_working_cluster_version_ = freeze_info.cluster_version;
}
ObTaskController::get().allow_next_syslog();
STORAGE_LOG(INFO, "Succ to get major working cluster version",
K_(major_working_cluster_version), K(freeze_info), K_(snapshot_version));
}
}
return ret;
}
int ObDataStoreDesc::init(
const ObMergeSchema &merge_schema,
const share::ObLSID &ls_id,
@ -250,14 +219,17 @@ int ObDataStoreDesc::init(
encoder_opt_.set_store_type(row_store_type_);
}
if (OB_SUCC(ret) && storage::is_major_merge(merge_type)) { // exactly MAJOR MERGE
if (OB_SUCC(ret) && is_major) {
uint64_t compat_version = 0;
int tmp_ret = OB_SUCCESS;
if (cluster_version > 0) {
major_working_cluster_version_ = cluster_version;
} else if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
STORAGE_LOG(WARN, "fail to get data version", K(tmp_ret));
} else {
if (OB_FAIL(set_major_working_cluster_version())) {
STORAGE_LOG(WARN, "Failed to set major working cluster version", K(ret), K(*this));
}
major_working_cluster_version_ = compat_version;
}
STORAGE_LOG(INFO, "success to set major working cluster version", K(tmp_ret), K(merge_type), K(cluster_version), K(major_working_cluster_version_));
}
if (OB_FAIL(ret)) {

View File

@ -129,7 +129,6 @@ private:
int cal_row_store_type(
const share::schema::ObMergeSchema &schema,
const storage::ObMergeType merge_type);
int set_major_working_cluster_version();
int get_emergency_row_store_type();
private:
DISALLOW_COPY_AND_ASSIGN(ObDataStoreDesc);

View File

@ -69,7 +69,6 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot(
medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
medium_info.medium_merge_reason_ = merge_reason;
medium_info.medium_snapshot_ = result.version_range_.snapshot_version_;
medium_info.medium_scn_ = result.scn_range_.end_scn_;
LOG_TRACE("choose_medium_snapshot", K(ret), "ls_id", ls.get_ls_id(),
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(result), K(medium_info));
}
@ -94,8 +93,6 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
} else {
ret = OB_NO_NEED_MERGE;
}
} else if (OB_FAIL(medium_info.medium_scn_.convert_for_tx(schedule_medium_snapshot))) {
LOG_WARN("failed to convert into scn", K(ret), K(schedule_medium_snapshot));
} else {
medium_info.compaction_type_ = ObMediumCompactionInfo::MAJOR_COMPACTION;
medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE;
@ -211,10 +208,17 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
int64_t multi_version_start = 0;
ObGetMergeTablesResult result;
ObMediumCompactionInfo medium_info;
uint64_t compat_version = 0;
if (OB_FAIL(choose_medium_scn[is_major](ls_, tablet_, schedule_medium_snapshot, merge_reason, medium_info, result))) {
if (OB_NO_NEED_MERGE != ret) {
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
}
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
LOG_WARN("fail to get data version", K(ret));
} else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_1_0_0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid data version to schedule medium compaction", K(ret), K(compat_version));
} else if (FALSE_IT(medium_info.data_version_ = compat_version)) {
} else if (is_major) {
// do nothing
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {

View File

@ -234,8 +234,8 @@ ObMediumCompactionInfo::ObMediumCompactionInfo()
medium_merge_reason_(ObAdaptiveMergePolicy::NONE),
reserved_(0),
cluster_id_(0),
data_version_(0),
medium_snapshot_(0),
medium_scn_(),
storage_schema_(),
parallel_merge_info_()
{
@ -263,7 +263,7 @@ int ObMediumCompactionInfo::init(
info_ = medium_info.info_;
cluster_id_ = medium_info.cluster_id_;
medium_snapshot_ = medium_info.medium_snapshot_;
medium_scn_ = medium_info.medium_scn_;
data_version_ = medium_info.data_version_;
}
return ret;
}
@ -272,7 +272,7 @@ bool ObMediumCompactionInfo::is_valid() const
{
return COMPACTION_TYPE_MAX != compaction_type_
&& medium_snapshot_ > 0
&& medium_scn_.get_val_for_tx() > 0
&& data_version_ > 0
&& storage_schema_.is_valid()
&& parallel_merge_info_.is_valid();
}
@ -284,7 +284,7 @@ void ObMediumCompactionInfo::reset()
compaction_type_ = COMPACTION_TYPE_MAX;
cluster_id_ = 0;
medium_snapshot_ = 0;
medium_scn_.set_min();
data_version_ = 0;
storage_schema_.reset();
parallel_merge_info_.destroy();
}
@ -349,7 +349,7 @@ int ObMediumCompactionInfo::serialize(char *buf, const int64_t buf_len, int64_t
info_,
cluster_id_,
medium_snapshot_,
medium_scn_,
data_version_,
storage_schema_);
if (contain_parallel_range_) {
LST_DO_CODE(
@ -376,7 +376,7 @@ int ObMediumCompactionInfo::deserialize(
info_,
cluster_id_,
medium_snapshot_,
medium_scn_);
data_version_);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(storage_schema_.deserialize(allocator, buf, data_len, pos))) {
LOG_WARN("failed to deserialize storage schema", K(ret));
@ -400,7 +400,7 @@ int64_t ObMediumCompactionInfo::get_serialize_size() const
info_,
cluster_id_,
medium_snapshot_,
medium_scn_,
data_version_,
storage_schema_);
if (contain_parallel_range_) {
LST_DO_CODE(OB_UNIS_ADD_LEN, parallel_merge_info_);
@ -702,7 +702,7 @@ int ObTabletMediumCompactionInfoRecorder::submit_log(
} else if (OB_FAIL(write_clog(clog_buf, clog_len))) {
LOG_WARN("fail to submit log", K(ret), K_(tablet_id), K(medium_info_));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(dec_ref_on_memtable(false))) {
if (clog_scn_.get_val_for_tx() > 0 && OB_TMP_FAIL(dec_ref_on_memtable(false))) {
LOG_ERROR("failed to dec ref on memtable", K(tmp_ret), K_(ls_id), K_(tablet_id));
}
} else {

View File

@ -128,10 +128,10 @@ public:
int64_t get_serialize_size() const;
void gene_info(char* buf, const int64_t buf_len, int64_t &pos) const;
TO_STRING_KV(K_(cluster_id), K_(medium_compat_version),
TO_STRING_KV(K_(cluster_id), K_(medium_compat_version), K_(data_version),
"compaction_type", ObMediumCompactionInfo::get_compaction_type_str((ObCompactionType)compaction_type_),
"medium_merge_reason", ObAdaptiveMergePolicy::merge_reason_to_str(medium_merge_reason_), K_(cluster_id),
K_(medium_snapshot), K_(medium_scn), K_(storage_schema),
K_(medium_snapshot), K_(storage_schema),
K_(contain_parallel_range), K_(parallel_merge_info));
public:
static const int64_t MEIDUM_COMPAT_VERSION = 1;
@ -153,8 +153,8 @@ public:
};
uint64_t cluster_id_; // for backup database to throw MEDIUM_COMPACTION clog
uint64_t data_version_;
int64_t medium_snapshot_;
share::SCN medium_scn_; // for follower minor merge
storage::ObStorageSchema storage_schema_;
ObParallelMergeInfo parallel_merge_info_;
};

View File

@ -780,7 +780,7 @@ int ObPartitionMergePolicy::refine_minor_merge_result(
{
int ret = OB_SUCCESS;
ObMergeType &merge_type = result.suggest_merge_type_;
if (result.handle_.get_count() <= minor_compact_trigger) {
if (result.handle_.get_count() <= MAX(minor_compact_trigger, 1)) {
ret = OB_NO_NEED_MERGE;
LOG_DEBUG("minor refine, no need to do minor merge", K(result));
result.handle_.reset();

View File

@ -85,7 +85,8 @@ int ObPartitionMerger::init_data_store_desc(ObTabletMergeCtx &ctx)
ctx.param_.ls_id_,
ctx.param_.tablet_id_,
ctx.param_.get_merge_type(),
ctx.sstable_version_range_.snapshot_version_))) {
ctx.sstable_version_range_.snapshot_version_,
ctx.data_version_))) {
STORAGE_LOG(WARN, "Failed to init data store desc", K(ret), K(ctx));
} else {
merge_info_.reset();

View File

@ -524,6 +524,7 @@ ObTabletMergeCtx::ObTabletMergeCtx(
compaction_filter_(nullptr),
time_guard_(),
rebuild_seq_(-1),
data_version_(0),
merge_list_()
{
merge_scn_.set_max();
@ -667,6 +668,7 @@ int ObTabletMergeCtx::inner_init_for_medium()
} else if (OB_FAIL(init_get_medium_compaction_info(param_.merge_version_, medium_info))) { // have checked medium info inside
LOG_WARN("failed to get medium compaction info", K(ret), KPC(this));
} else if (FALSE_IT(get_merge_table_result.schema_version_ = medium_info->storage_schema_.schema_version_)) {
} else if (FALSE_IT(data_version_ = medium_info->data_version_)) {
} else if (FALSE_IT(is_tenant_major_merge_ = medium_info->is_major_compaction())) {
} else if (OB_FAIL(get_basic_info_from_result(get_merge_table_result))) {
LOG_WARN("failed to set basic info to ctx", K(ret), K(get_merge_table_result), KPC(this));
@ -1111,7 +1113,8 @@ int ObTabletMergeCtx::prepare_index_tree()
param_.ls_id_,
param_.tablet_id_,
param_.merge_type_,
sstable_version_range_.snapshot_version_))) {
sstable_version_range_.snapshot_version_,
data_version_))) {
LOG_WARN("failed to init index store desc", K(ret), KPC(this));
} else {
// TODO(zhuixin.gsy) modify index_desc.init to avoid reset col_desc_array_

View File

@ -242,6 +242,7 @@ public:
compaction::ObICompactionFilter *compaction_filter_;
ObCompactionTimeGuard time_guard_;
int64_t rebuild_seq_;
uint64_t data_version_;
ObMediumCompactionInfoList merge_list_;
TO_STRING_KV(K_(param), K_(sstable_version_range), K_(create_snapshot_version),
@ -255,7 +256,7 @@ public:
K_(scn_range), K_(merge_scn), K_(read_base_version),
K_(ls_handle), K_(tablet_handle),
KPC_(merge_progress),
KPC_(compaction_filter), K_(time_guard), K_(rebuild_seq), K_(merge_list));
KPC_(compaction_filter), K_(time_guard), K_(rebuild_seq), K_(data_version), K_(merge_list));
private:
DISALLOW_COPY_AND_ASSIGN(ObTabletMergeCtx);
};

View File

@ -912,7 +912,12 @@ void ObTenantFreezeInfoMgr::ReloadTask::runTimerTask()
void ObTenantFreezeInfoMgr::UpdateLSResvSnapshotTask::runTimerTask()
{
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(mgr_.try_update_reserved_snapshot())) {
uint64_t compat_version = 0;
if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
LOG_WARN("fail to get data version", K(tmp_ret));
} else if (compat_version < DATA_VERSION_4_1_0_0) {
// do nothing, should not update reserved snapshot
} else if (OB_TMP_FAIL(mgr_.try_update_reserved_snapshot())) {
LOG_WARN("fail to try reserved snapshot", KR(tmp_ret));
}
}

View File

@ -708,7 +708,9 @@ int ObTenantTabletScheduler::schedule_tablet_minor_merge(
} else {
ObTabletMergeDagParam dag_param(MERGE_TYPES[i], ls_id, tablet_id);
for (int k = 0; OB_SUCC(ret) && k < parallel_results.count(); ++k) {
if (OB_FAIL(schedule_merge_execute_dag<T>(dag_param, ls_handle, tablet_handle, parallel_results.at(k)))) {
if (OB_UNLIKELY(parallel_results.at(k).handle_.get_count() <= 1)) {
LOG_WARN("invalid parallel result", K(ret), K(k), K(parallel_results));
} else if (OB_FAIL(schedule_merge_execute_dag<T>(dag_param, ls_handle, tablet_handle, parallel_results.at(k)))) {
LOG_WARN("failed to schedule minor execute dag", K(ret), K(k), K(parallel_results.at(k)));
} else {
LOG_INFO("success to schedule tablet minor merge", K(ret), K(ls_id), K(tablet_id),

View File

@ -179,7 +179,7 @@ int ObLSReservedSnapshotMgr::update_min_reserved_snapshot_for_leader(const int64
} // end of lock
if (OB_SUCC(ret) && send_log_flag) {
if (OB_FAIL(try_update_for_leader(new_snapshot_version, nullptr/*allocator*/))) {
if (OB_FAIL(sync_clog(new_snapshot_version))) {
LOG_WARN("failed to send update reserved snapshot log", K(ret), K(new_snapshot_version));
} else if (need_print_log()) {
LOG_INFO("submit reserved snapshot log success", "ls_id", ls_->get_ls_id(),
@ -204,7 +204,7 @@ int ObLSReservedSnapshotMgr::try_sync_reserved_snapshot(
if (OB_FAIL(update_min_reserved_snapshot_for_leader(new_reserved_snapshot))) {
LOG_WARN("failed to update min_reserved_snapshot", K(ret), K(new_reserved_snapshot));
}
} else if (OB_FAIL(try_update_for_leader(new_reserved_snapshot, nullptr/*allocator*/))) {
} else if (OB_FAIL(sync_clog(new_reserved_snapshot))) {
LOG_WARN("failed to send update reserved snapshot log", K(ret), K(new_reserved_snapshot));
} else if (need_print_log()) {
LOG_INFO("submit reserved snapshot log success", "ls_id", ls_->get_ls_id(),
@ -213,6 +213,20 @@ int ObLSReservedSnapshotMgr::try_sync_reserved_snapshot(
return ret;
}
int ObLSReservedSnapshotMgr::sync_clog(const int64_t new_reserved_snapshot)
{
int ret = OB_SUCCESS;
uint64_t compat_version = 0;
if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
LOG_WARN("fail to get data version", K(ret));
} else if (compat_version < DATA_VERSION_4_1_0_0) {
// do nothing, should sync clog
} else if (OB_FAIL(try_update_for_leader(new_reserved_snapshot, nullptr/*allocator*/))) {
LOG_WARN("failed to send update reserved snapshot log", K(ret), K(new_reserved_snapshot));
}
return ret;
}
int ObLSReservedSnapshotMgr::replay_reserved_snapshot_log(
const share::SCN &scn, const char *buf, const int64_t size, int64_t &pos)
{

View File

@ -90,6 +90,7 @@ private:
}
return bret;
}
int sync_clog(const int64_t new_reserved_snapshot);
bool is_inited_;
common::ObArenaAllocator allocator_;

View File

@ -2425,10 +2425,12 @@ int ObTablet::get_kept_multi_version_start(
int64_t &multi_version_start)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
multi_version_start = 0;
int64_t max_merged_snapshot = 0;
int64_t min_reserved_snapshot = 0;
int64_t min_medium_snapshot = INT64_MAX;
int64_t ls_min_reserved_snapshot = INT64_MAX;
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
const ObTabletTableStore &table_store = tablet.get_table_store();
if (0 != table_store.get_major_sstables().count()) {
@ -2444,9 +2446,17 @@ int ObTablet::get_kept_multi_version_start(
&& OB_FAIL(tablet.get_min_medium_snapshot(min_medium_snapshot))) {
LOG_WARN("failed to get min medium snapshot", K(ret), K(tablet));
}
// for compat, if cluster not upgrade to 4.1, should not consider ls.get_min_reserved_snapshot()
uint64_t compat_version = 0;
if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
LOG_WARN("fail to get data version", K(tmp_ret));
} else if (compat_version >= DATA_VERSION_4_1_0_0) {
ls_min_reserved_snapshot = ls.get_min_reserved_snapshot();
}
if (OB_SUCC(ret)) {
min_reserved_snapshot = common::min(
ls.get_min_reserved_snapshot(),
ls_min_reserved_snapshot,
common::min(min_reserved_snapshot, min_medium_snapshot));
multi_version_start = MIN(MAX(min_reserved_snapshot, multi_version_start), tablet.get_snapshot_version());
}

View File

@ -169,7 +169,7 @@ void TestCompactionPolicy::SetUp()
medium_info_.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
medium_info_.medium_snapshot_ = 100;
medium_info_.medium_scn_.convert_for_tx(100);
medium_info_.data_version_ = 100;
medium_info_.storage_schema_.init(allocator_, table_schema, lib::Worker::CompatMode::MYSQL);
}

View File

@ -30,7 +30,7 @@ public:
medium_info_.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION;
medium_info_.medium_snapshot_ = 100;
medium_info_.medium_scn_.convert_for_tx(100);
medium_info_.data_version_ = 100;
medium_info_.cluster_id_ = INIT_CLUSTER_ID;
medium_info_.storage_schema_.init(allocator_, table_schema, lib::Worker::CompatMode::MYSQL);