[CP] [4.0] fix the multi-data writen by multi-trans register is filled by last multi-trans abort

This commit is contained in:
obdev
2022-11-18 08:05:37 +00:00
committed by wangzelin.wzl
parent 64f224fd99
commit 35313f0152
3 changed files with 107 additions and 13 deletions

View File

@ -230,7 +230,10 @@ public:
template<class T>
int prepare_data(T &multi_source_data_unit, const transaction::ObMulSourceDataNotifyArg &trans_flags);
template<class T>
int back_fill_log_ts_for_commit(T &multi_source_data_unit);
int clear_unsynced_cnt_for_tx_end_if_need(
T &multi_source_data_unit,
const int64_t log_ts,
const bool for_replay);
template<class T>
int set_multi_data_for_commit(T &multi_source_data_unit, const int64_t log_ts, const bool for_replay, const memtable::MemtableRefOp ref_op);
@ -453,6 +456,17 @@ private:
int set_tx_data_in_tablet_pointer(const ObTabletTxMultiSourceDataUnit &tx_data);
int get_max_sync_storage_schema_version(int64_t &max_schema_version) const;
int check_max_sync_schema_version() const;
template<class T>
int dec_unsynced_cnt_for_if_need(
T &multi_source_data_unit,
const bool for_replay,
const memtable::MemtableRefOp ref_op);
template<class T>
int inner_set_multi_data_for_commit(
T &multi_source_data_unit,
const int64_t log_ts,
const bool for_replay,
const memtable::MemtableRefOp ref_op);
private:
static const int32_t TABLET_VERSION = 1;
@ -623,23 +637,105 @@ int ObTablet::set_multi_data_for_commit(
} else if (OB_UNLIKELY(!multi_source_data_unit.is_valid())) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(multi_source_data_unit));
} else {
} else if (!multi_source_data_unit.is_tx_end()) {
if (OB_FAIL(save_multi_source_data_unit(&multi_source_data_unit, log_ts, for_replay, ref_op, is_callback))) {
TRANS_LOG(WARN, "failed to save tx data", K(ret), K(multi_source_data_unit), K(for_replay), K(ref_op));
} else if (OB_FAIL(back_fill_log_ts_for_commit(multi_source_data_unit))) {
TRANS_LOG(WARN, "failed to back_fill_log_ts_for_commit", K(ret), K(multi_source_data_unit));
}
} else if (OB_FAIL(inner_set_multi_data_for_commit(multi_source_data_unit, log_ts, for_replay, ref_op))) {
TRANS_LOG(WARN, "failed to back_fill_log_ts_for_ddl_data", K(ret), K(multi_source_data_unit));
}
return ret;
}
template<class T>
int ObTablet::dec_unsynced_cnt_for_if_need(
T &multi_source_data_unit,
const bool for_replay,
const memtable::MemtableRefOp ref_op)
{
int ret = OB_SUCCESS;
const int64_t log_ts = INT64_MAX;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not inited", K(ret), K_(is_inited));
} else if (OB_UNLIKELY(!multi_source_data_unit.is_valid())) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(multi_source_data_unit));
} else if (memtable::MultiSourceDataUnitType::TABLET_TX_DATA == multi_source_data_unit.type()) {
ObTabletTxMultiSourceDataUnit tx_data;
if (OB_FAIL(get_tx_data(tx_data))) {
TRANS_LOG(WARN, "failed to get tx data", K(ret));
} else if (OB_FAIL(save_multi_source_data_unit(&tx_data, log_ts, for_replay, ref_op, true/*is_callback*/))) {
TRANS_LOG(WARN, "failed to save tx data", K(ret), K(tx_data), K(log_ts));
}
} else if (memtable::MultiSourceDataUnitType::TABLET_BINDING_INFO == multi_source_data_unit.type()) {
ObTabletBindingInfo binding_info;
if (OB_FAIL(get_ddl_data(binding_info))) {
TRANS_LOG(WARN, "failed to get binding info", K(ret));
} else if (OB_FAIL(save_multi_source_data_unit(&binding_info, log_ts, for_replay, ref_op, true/*is_callback*/))) {
TRANS_LOG(WARN, "failed to binding info", K(ret), K(binding_info), K(log_ts));
}
} else {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "multi-data type not support", K(ret), K(multi_source_data_unit));
}
return ret;
}
template<class T>
int ObTablet::inner_set_multi_data_for_commit(
T &multi_source_data_unit,
const int64_t log_ts,
const bool for_replay,
const memtable::MemtableRefOp ref_op)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not inited", K(ret), K_(is_inited));
} else if (OB_UNLIKELY(!multi_source_data_unit.is_valid())) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(multi_source_data_unit));
} else if (OB_FAIL(dec_unsynced_cnt_for_if_need(multi_source_data_unit, for_replay, ref_op))) {
TRANS_LOG(WARN, "failed to dec_unsynced_cnt_for_ddl_data_if_need", K(ret), K(multi_source_data_unit), K(for_replay), K(ref_op));
} else if (memtable::MultiSourceDataUnitType::TABLET_TX_DATA == multi_source_data_unit.type()) {
ObTabletTxMultiSourceDataUnit tx_data;
if (OB_FAIL(get_tx_data(tx_data))) {
TRANS_LOG(WARN, "failed to get tx data", K(ret));
} else if (OB_FAIL(tx_data.deep_copy(&multi_source_data_unit))) {
TRANS_LOG(WARN, "fail to deep_copy", K(ret), K(multi_source_data_unit), K(tx_data), K(get_tablet_meta()));
} else if (OB_FAIL(clear_unsynced_cnt_for_tx_end_if_need(tx_data, log_ts, for_replay))) {
TRANS_LOG(WARN, "failed to save tx data", K(ret), K(tx_data), K(log_ts));
}
} else if (memtable::MultiSourceDataUnitType::TABLET_BINDING_INFO == multi_source_data_unit.type()) {
ObTabletBindingInfo binding_info;
if (OB_FAIL(get_ddl_data(binding_info))) {
TRANS_LOG(WARN, "failed to get binding info", K(ret));
} else if (OB_FAIL(binding_info.deep_copy(&multi_source_data_unit))) {
TRANS_LOG(WARN, "fail to deep_copy", K(ret), K(multi_source_data_unit), K(binding_info), K(get_tablet_meta()));
} else if (OB_FAIL(clear_unsynced_cnt_for_tx_end_if_need(binding_info, log_ts, for_replay))) {
TRANS_LOG(WARN, "failed to save ddl data", K(ret), K(multi_source_data_unit), K(log_ts));
}
} else {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "multi-data type not support", K(ret), K(multi_source_data_unit));
}
return ret;
}
template<class T>
int ObTablet::back_fill_log_ts_for_commit(T &multi_source_data_unit)
int ObTablet::clear_unsynced_cnt_for_tx_end_if_need(
T &multi_source_data_unit,
const int64_t log_ts,
const bool for_replay)
{
int ret = OB_SUCCESS;
const int64_t log_ts = INT64_MAX;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not inited", K(ret), K_(is_inited));
@ -647,13 +743,10 @@ int ObTablet::back_fill_log_ts_for_commit(T &multi_source_data_unit)
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(multi_source_data_unit));
} else if (OB_UNLIKELY(!multi_source_data_unit.is_tx_end())) {
TRANS_LOG(INFO, "skip for is_tx_end is false", K(multi_source_data_unit));
} else if (FALSE_IT(multi_source_data_unit.set_tx_end(false))) {
} else if (OB_FAIL(save_multi_source_data_unit(&multi_source_data_unit, log_ts, false/*for_replay*/, memtable::MemtableRefOp::DEC_REF, true/*is_callback*/))) {
} else if (OB_FAIL(save_multi_source_data_unit(&multi_source_data_unit, log_ts, for_replay, memtable::MemtableRefOp::DEC_REF, true/*is_callback*/))) {
TRANS_LOG(WARN, "failed to save tx data", K(ret), K(multi_source_data_unit), K(log_ts));
}
TRANS_LOG(INFO, "back fill log ts for commit", K(ret), K(multi_source_data_unit));
return ret;
}

View File

@ -539,6 +539,7 @@ int ObTabletBindingHelper::fix_unsynced_cnt_for_binding_info(const ObTabletID &t
ObTabletHandle tablet_handle;
ObTablet *tablet = nullptr;
ObTabletBindingInfo binding_info;
const int64_t log_ts = INT64_MAX;
if (OB_FAIL(get_tablet(tablet_id, tablet_handle))) {
if (OB_NO_NEED_UPDATE == ret) {
@ -549,7 +550,7 @@ int ObTabletBindingHelper::fix_unsynced_cnt_for_binding_info(const ObTabletID &t
} else if (FALSE_IT(tablet = tablet_handle.get_obj())) {
} else if (OB_FAIL(tablet->get_ddl_data(binding_info))) {
LOG_WARN("failed to get ddl data", KR(ret));
} else if (OB_FAIL(tablet->back_fill_log_ts_for_commit(binding_info))) {
} else if (OB_FAIL(tablet->clear_unsynced_cnt_for_tx_end_if_need(binding_info, log_ts, trans_flags_.for_replay_))) {
LOG_WARN("failed to prepare binding info", KR(ret), K(binding_info));
}

View File

@ -874,8 +874,8 @@ int ObTabletCreateDeleteHelper::roll_back_remove_tablet(
LOG_WARN("failed to get tx data", K(ret), K(ls_id), K(tablet_id), K(cnt));
} else if (OB_FAIL(ObTabletBindingHelper::check_need_dec_cnt_for_abort(tx_data, need_dec))) {
LOG_WARN("failed to save tx data", K(ret), K(tx_data), K(trans_flags));
} else if (need_dec && OB_FAIL(tablet->save_multi_source_data_unit(&tx_data, ObLogTsRange::MAX_TS,
trans_flags.for_replay_, MemtableRefOp::DEC_REF, true/*is_callback*/))) {
} else if (need_dec && OB_FAIL(tablet->set_multi_data_for_commit(tx_data, ObLogTsRange::MAX_TS,
trans_flags.for_replay_, MemtableRefOp::DEC_REF))) {
LOG_WARN("failed to save msd", K(ret), K(ls_id), K(tablet_id), K(cnt));
} else {
LOG_INFO("succeeded to dec ref for memtable in roll back operation", K(ret), K(ls_id), K(tablet_id), K(cnt));