add transfer_seq check when merge

This commit is contained in:
a1iive 2023-11-10 08:09:13 +00:00 committed by ob-robot
parent d49d442c2f
commit 4d7029aa80
8 changed files with 46 additions and 13 deletions

View File

@ -234,7 +234,8 @@ int ObCOMergePrepareTask::create_schedule_dag(ObCOTabletMergeCtx &ctx)
result.version_range_.multi_version_start_ = ctx.get_tablet()->get_multi_version_start();
result.version_range_.base_version_ = 0;
result.version_range_.snapshot_version_ = ctx.get_tablet()->get_snapshot_version();
ObTabletMergeDagParam dag_param(MINOR_MERGE, ctx.get_ls_id(), ctx.get_tablet_id());
ObTabletMergeDagParam dag_param(MINOR_MERGE, ctx.get_ls_id(), ctx.get_tablet_id(),
ctx.get_transfer_seq());
if (OB_FAIL(MTL(share::ObTenantDagScheduler *)->alloc_dag(minor_exe_dag))) {
LOG_WARN("failed to alloc dag", K(ret));
} else if (OB_FAIL(minor_exe_dag->prepare_init(
@ -1271,6 +1272,10 @@ int ObCOMergeDagNet::swap_tablet_after_minor()
ObTabletCommon::DEFAULT_GET_TABLET_NO_WAIT,
storage::ObMDSGetTabletMode::READ_ALL_COMMITED))) {
LOG_WARN("failed to get tablet", K(ret));
} else if (OB_FAIL(ObTablet::check_transfer_seq_equal(*tmp_tablet_handle.get_obj(), co_merge_ctx_->get_transfer_seq()))) {
LOG_WARN("tmp tablet transfer seq not eq with old transfer seq", K(ret),
"tmp_tablet_meta", tmp_tablet_handle.get_obj()->get_tablet_meta(),
"old_transfer_seq", co_merge_ctx_->get_transfer_seq());
} else if (OB_FAIL(ObPartitionMergePolicy::get_result_by_snapshot(
*tmp_tablet_handle.get_obj(),
co_merge_ctx_->get_merge_version(),
@ -1434,6 +1439,10 @@ int ObCOMergeDagNet::get_compat_mode()
0/*timeout_us*/,
storage::ObMDSGetTabletMode::READ_ALL_COMMITED))) {
LOG_WARN("failed to get tablet", K(ret), K(ls_id_), K(tablet_id_));
} else if (OB_FAIL(ObTablet::check_transfer_seq_equal(*tmp_tablet_handle.get_obj(), basic_param_.transfer_seq_))) {
LOG_WARN("tmp tablet transfer seq not eq with old transfer seq", K(ret),
"tmp_tablet_meta", tmp_tablet_handle.get_obj()->get_tablet_meta(),
"old_transfer_seq", basic_param_.transfer_seq_);
} else {
basic_param_.dag_net_id_ = get_dag_id();
basic_param_.skip_get_tablet_ = true;

View File

@ -370,6 +370,10 @@ int ObBasicTabletMergeCtx::build_ctx(bool &finish_flag)
if (OB_TABLET_NOT_EXIST != ret) {
LOG_PRINT_WRAPPER("failed to get ls_handle/tablet_handle/rebuild_seq");
}
} else if (ObTablet::check_transfer_seq_equal(*get_tablet(), get_transfer_seq())) {
LOG_WARN("new tablet transfer seq not eq with old transfer seq", K(ret),
"new_tablet_meta", get_tablet()->get_tablet_meta(),
"old_transfer_seq", get_transfer_seq());
} else if (OB_FAIL(get_merge_tables(get_merge_table_result))) {
if (OB_NO_NEED_MERGE != ret) {
LOG_PRINT_WRAPPER("failed to get merge tables");
@ -1025,6 +1029,10 @@ int ObBasicTabletMergeCtx::swap_tablet(ObGetMergeTablesResult &get_merge_table_r
tables_handle.reset(); // clear tables array
if (OB_FAIL(swap_tablet())) {
LOG_WARN("failed to get alloc tablet handle", KR(ret));
} else if (OB_FAIL(ObTablet::check_transfer_seq_equal(*get_tablet(), get_transfer_seq()))) {
LOG_WARN("new tablet transfer seq not eq with old transfer seq", K(ret),
"new_tablet_meta", get_tablet()->get_tablet_meta(),
"old_transfer_seq", get_transfer_seq());
} else if (OB_FAIL(get_merge_tables(get_merge_table_result))) {
if (OB_NO_NEED_MERGE != ret) {
LOG_WARN("failed to get merge tables", KR(ret), KPC(this));

View File

@ -182,6 +182,7 @@ public:
DAG_PARAM_FUNC(const ObLSID &, ls_id);
DAG_PARAM_FUNC(const ObTabletID &, tablet_id);
DAG_PARAM_FUNC(int64_t, merge_version);
DAG_PARAM_FUNC(int64_t, transfer_seq);
STATIC_PARAM_FUNC(bool, is_full_merge);
STATIC_PARAM_FUNC(bool, need_parallel_minor_merge);
STATIC_PARAM_FUNC(int64_t, read_base_version);

View File

@ -311,6 +311,7 @@ ObTabletMergeDagParam::ObTabletMergeDagParam()
is_reserve_mode_(false),
merge_type_(INVALID_MERGE_TYPE),
merge_version_(0),
transfer_seq_(-1),
ls_id_(),
tablet_id_()
{
@ -319,13 +320,15 @@ ObTabletMergeDagParam::ObTabletMergeDagParam()
ObTabletMergeDagParam::ObTabletMergeDagParam(
const compaction::ObMergeType merge_type,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id)
const ObTabletID &tablet_id,
const int64_t transfer_seq)
: skip_get_tablet_(false),
is_tenant_major_merge_(false),
need_swap_tablet_flag_(false),
is_reserve_mode_(false),
merge_type_(merge_type),
merge_version_(0),
transfer_seq_(transfer_seq),
ls_id_(ls_id),
tablet_id_(tablet_id)
{
@ -381,6 +384,10 @@ int ObTabletMergeDag::get_tablet_and_compat_mode()
if (OB_NO_NEED_MERGE != ret) {
LOG_WARN("failed to check need merge", K(ret));
}
} else if (OB_FAIL(ObTablet::check_transfer_seq_equal(*tmp_tablet_handle.get_obj(), param_.transfer_seq_))) {
LOG_WARN("tmp tablet transfer seq not eq with old transfer seq", K(ret),
"tmp_tablet_meta", tmp_tablet_handle.get_obj()->get_tablet_meta(),
"old_transfer_seq", param_.transfer_seq_);
} else if (FALSE_IT(compat_mode_ = tmp_tablet_handle.get_obj()->get_tablet_meta().compat_mode_)) {
} else if (is_mini_merge(merge_type_)) {
int64_t inc_sstable_cnt = 0;

View File

@ -148,11 +148,11 @@ struct ObTabletMergeDagParam : public share::ObIDagInitParam
ObTabletMergeDagParam(
const compaction::ObMergeType merge_type,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id);
const ObTabletID &tablet_id,
const int64_t transfer_seq);
virtual bool is_valid() const override;
VIRTUAL_TO_STRING_KV(K_(skip_get_tablet), "merge_type", merge_type_to_str(merge_type_), K_(merge_version),
K_(ls_id), K_(tablet_id), K_(is_tenant_major_merge), K_(need_swap_tablet_flag), K_(is_reserve_mode));
K_(ls_id), K_(tablet_id), K_(is_tenant_major_merge), K_(need_swap_tablet_flag), K_(is_reserve_mode), K_(transfer_seq));
bool skip_get_tablet_;
bool is_tenant_major_merge_;
@ -160,6 +160,7 @@ struct ObTabletMergeDagParam : public share::ObIDagInitParam
bool is_reserve_mode_;
compaction::ObMergeType merge_type_;
int64_t merge_version_;
int64_t transfer_seq_; // only affect minor and major now
share::ObLSID ls_id_;
ObTabletID tablet_id_;
ObCompactionParam compaction_param_; // used for adaptive compaction dag scheduling

View File

@ -844,6 +844,7 @@ int ObTenantTabletScheduler::schedule_merge_dag(
param.merge_version_ = merge_snapshot_version;
param.is_tenant_major_merge_ = is_tenant_major_merge;
param.compat_mode_ = tablet.get_tablet_meta().compat_mode_;
param.transfer_seq_ = tablet.get_tablet_meta().transfer_info_.transfer_seq_;
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tablet_co_merge_dag_net(param))) {
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
LOG_WARN("failed to schedule tablet merge dag", K(ret));
@ -857,6 +858,7 @@ int ObTenantTabletScheduler::schedule_merge_dag(
param.merge_type_ = merge_type;
param.merge_version_ = merge_snapshot_version;
param.is_tenant_major_merge_ = is_tenant_major_merge;
param.transfer_seq_ = tablet.get_tablet_meta().transfer_info_.transfer_seq_;
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tablet_merge_dag(param))) {
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
LOG_WARN("failed to schedule tablet merge dag", K(ret));
@ -924,6 +926,7 @@ int ObTenantTabletScheduler::schedule_tablet_meta_merge(
dag_param.merge_version_ = result.merge_version_;
dag_param.is_tenant_major_merge_ = false;
dag_param.compat_mode_ = tablet->get_tablet_meta().compat_mode_;
dag_param.transfer_seq_ = tablet->get_tablet_meta().transfer_info_.transfer_seq_;
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_tablet_co_merge_dag_net(dag_param))) {
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
LOG_WARN("failed to schedule tablet merge dag", K(ret));
@ -931,7 +934,8 @@ int ObTenantTabletScheduler::schedule_tablet_meta_merge(
}
FLOG_INFO("chaser debug schedule co merge dag", K(ret), K(dag_param), K(tablet->is_row_store()));
} else {
ObTabletMergeDagParam dag_param(META_MAJOR_MERGE, ls_id, tablet_id);
ObTabletMergeDagParam dag_param(META_MAJOR_MERGE, ls_id, tablet_id,
tablet->get_tablet_meta().transfer_info_.transfer_seq_);
dag_param.merge_version_ = result.merge_version_;
ObTabletMergeExecuteDag *schedule_dag = nullptr;
if (OB_FAIL(schedule_merge_execute_dag<ObTabletMergeExecuteDag>(dag_param, ls_handle, tablet_handle, result, schedule_dag))) {
@ -1040,7 +1044,8 @@ int ObTenantTabletScheduler::schedule_tablet_minor_merge(
const int64_t parallel_dag_cnt = minor_range_mgr.exe_range_array_.count() + parallel_results.count();
const int64_t total_sstable_cnt = result.handle_.get_count();
const int64_t create_time = common::ObTimeUtility::fast_current_time();
ObTabletMergeDagParam dag_param(MERGE_TYPES[i], ls_id, tablet_id);
ObTabletMergeDagParam dag_param(MERGE_TYPES[i], ls_id, tablet_id,
tablet_handle.get_obj()->get_tablet_meta().transfer_info_.transfer_seq_);
T *schedule_dag = nullptr;
for (int64_t k = 0; OB_SUCC(ret) && k < parallel_results.count(); ++k) {
if (OB_UNLIKELY(parallel_results.at(k).handle_.get_count() <= 1)) {

View File

@ -6496,13 +6496,15 @@ int ObTablet::check_snapshot_readable(int64_t snapshot_version)
return ret;
}
int ObTablet::check_transfer_seq_equal(const ObTablet &old_tablet, const int64_t transfer_seq)
int ObTablet::check_transfer_seq_equal(const ObTablet &tablet, const int64_t transfer_seq)
{
int ret = OB_SUCCESS;
if (old_tablet.get_tablet_meta().transfer_info_.transfer_seq_ != transfer_seq) {
ret = OB_TABLET_TRANSFER_SEQ_NOT_MATCH;
LOG_WARN("old tablet transfer seq not eq with new transfer seq",
"old_tablet_meta", old_tablet.get_tablet_meta(), K(transfer_seq));
if (0 <= transfer_seq) {
if (tablet.get_tablet_meta().transfer_info_.transfer_seq_ != transfer_seq) {
ret = OB_TABLET_TRANSFER_SEQ_NOT_MATCH;
LOG_WARN("tablet transfer seq not eq with transfer seq",
"tablet_meta", tablet.get_tablet_meta(), K(transfer_seq));
}
}
return ret;
}

View File

@ -393,6 +393,7 @@ public:
static int64_t get_lock_wait_timeout(
const int64_t abs_lock_timeout,
const int64_t stmt_timeout);
static int check_transfer_seq_equal(const ObTablet &tablet, const int64_t transfer_seq);
int rowkey_exists(
ObRelativeTable &relative_table,
ObStoreCtx &store_ctx,
@ -595,7 +596,6 @@ private:
int get_tablet_memtable_mgr(ObTabletMemtableMgr *&memtable_mgr) const;
int check_schema_version(const int64_t schema_version);
int check_snapshot_readable(const int64_t snapshot_version);
int check_transfer_seq_equal(const ObTablet &old_tablet, const int64_t transfer_seq);
int get_column_store_sstable_checksum(common::ObIArray<int64_t> &column_checksums, ObCOSSTableV2 &co_sstable);
logservice::ObLogHandler *get_log_handler() const { return log_handler_; } // TODO(bowen.gbw): get log handler from tablet pointer handle