[OBCDC] Support transfer_in trans with single prepare_log or commit_log

This commit is contained in:
SanmuWangZJU
2024-02-28 07:44:43 +00:00
committed by ob-robot
parent 938e06f8eb
commit 255b1c2508
2 changed files with 55 additions and 22 deletions

View File

@ -804,9 +804,17 @@ int ObCDCPartTransResolver::handle_prepare_(
LOG_DEBUG("push_back_commit_info_log_lsn_to_miss_log", K_(tls_id), K(tx_id), K(prepare_log), K(missing_info));
}
} else {
// prev_record_lsn is invalid, may transfer after trans execute prepare
// usually, commit_info_log and prepare_log is in the same log_entry. and won't handle
// prepare_log in this case
const transaction::ObTxPrevLogType &prev_log_type = prepare_log.get_prev_log_type();
if (OB_UNLIKELY(prev_log_type.is_transfer_in())) {
LOG_INFO("[TRANS_TRANSFER_IN] trans transfer in with only prepare log", K_(tls_id), K(tx_id), K(prepare_lsn), K(prepare_log));
// trans transfer to this ls with only a prepare log, but no commit info log. Most likely in transfer case.`
part_trans_task->mark_read_commit_info();
} else {
ret = OB_STATE_NOT_MATCH;
LOG_ERROR("unexpected trans state: trans not read commit_info_log and commit_info_log lsn is invalid", KR(ret), K_(tls_id), K(tx_id),
K(prepare_log), K(prev_log_type), K(missing_info), KPC(part_trans_task));
}
}
}
@ -826,7 +834,7 @@ int ObCDCPartTransResolver::handle_commit_(
bool &is_served)
{
int ret = OB_SUCCESS;
const bool is_resolving_miss_log = missing_info.is_resolving_miss_log();
const bool is_reconsuming = missing_info.is_reconsuming();
transaction::ObTxCommitLogTempRef tmp_ref;
transaction::ObTxCommitLog commit_log(tmp_ref);
PartTransTask *part_trans_task = NULL;
@ -854,13 +862,13 @@ int ObCDCPartTransResolver::handle_commit_(
if (OB_FAIL(part_trans_dispatcher_.remove_task(tls_id_.is_sys_log_stream(), tx_id))) {
LOG_ERROR("handle unserverd PartTransTask failed", KR(ret), K_(tls_id), K(tx_id));
}
} else if (OB_FAIL(obtain_task_(tx_id, part_trans_task, is_resolving_miss_log))) {
} else if (OB_FAIL(obtain_task_(tx_id, part_trans_task, is_reconsuming))) {
LOG_ERROR("obtain_part_trans_task fail while reading commit log", KR(ret), K_(tls_id), K(tx_id), K(lsn),
K(commit_log), K(missing_info));
} else if (OB_FAIL(part_trans_task->push_multi_data_source_data(lsn, commit_log.get_multi_source_data(), true/*is_commit_log*/))) {
LOG_ERROR("push_multi_data_source_data failed", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(commit_log), KPC(part_trans_task));
} else if (!part_trans_task->has_read_commit_info()) {
if (is_resolving_miss_log) {
if (is_reconsuming) {
// commit info is miss log and handled done, reconsumeing commit_log
// should have read commit_info log
ret = OB_ERR_UNEXPECTED;
@ -868,23 +876,48 @@ int ObCDCPartTransResolver::handle_commit_(
K_(tls_id), K(tx_id), K(commit_log), K(lsn), KPC(part_trans_task), K(missing_info));
} else {
const palf::LSN &prev_log_lsn = commit_log.get_prev_lsn();
// 1. if dist-trans, prepare_log should be not the same log_entry with commit log, the prev_log_lsn in commit_log should be prepare log,
// should fetch prev prepare log and misslog found in prepare log.
// 2. if single_ls_trans, which doesn't have prepare_log, has_read_commit_info == false means
// commit_info_log is not fetched yet, should try fetch prev_log_lsn(if valid).
if (OB_UNLIKELY(!prev_log_lsn.is_valid())) {
// 1. if the prev_log is in the same log_entry with commit_log, it should already handled before commit_log
// 2. if the prev_log is not same log_entry with commit_log, the prev_log_lsn should be valid
// 3. in transfer case, CommitLog may be alone at a LS, should remove the task
ret = OB_ERR_UNEXPECTED;
// expect prev_log_lsn of commit_log is valid: prepare_log for dist_trans and
// commit_info_log for single_ls_trans.
LOG_ERROR("expect valid prev_log_lsn in commit_log", KR(ret), K_(tls_id), K(tx_id), K(commit_log), KPC(part_trans_task));
// remove part_trans_task if dist_trans and invalid prev_log_lsn
// if (OB_FAIL(part_trans_dispatcher_.remove_task(tls_id_.is_sys_log_stream(), tx_id))) {
// LOG_ERROR("handle unserverd single CommitLog(commit_log with invalid prev_log_lsn in dist_trans) failed",
// KR(ret), K_(tls_id), K(tx_id), K(commit_log), K(lsn));
// }
const transaction::ObTxPrevLogType &prev_log_type = commit_log.get_prev_log_type();
if (OB_UNLIKELY(prev_log_type.is_transfer_in())) {
LOG_INFO("[TRANS_TRANSFER_IN] trans transfer in with only commit log", K_(tls_id), K(tx_id), K(lsn), K(commit_log));
const transaction::ObLSLogInfoArray &ls_prepare_info_arr = commit_log.get_ls_log_info_arr();
ARRAY_FOREACH(ls_prepare_info_arr, idx) {
const transaction::ObLSLogInfo &ls_log_info = ls_prepare_info_arr.at(idx);
// the trans is transfered from another LS and only commit_log is transfered to this LS,
// thus expected ls_log_info_arr(which record prepare_log info) should not contain current LS
if (OB_UNLIKELY(ls_log_info.get_ls_id() == tls_id_.get_ls_id())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("expected current ls not in prepare_log_arr while prev_log_type of commit_log is TRANSFER_IN",
KR(ret), K_(tls_id), K(tx_id), K(lsn), K(commit_log));
}
}
if (FAILEDx(part_trans_dispatcher_.remove_task(tls_id_.is_sys_log_stream(), tx_id))) {
LOG_ERROR("remove transfer_in(only commit_log) part_trans failed", KR(ret), K_(tls_id), K(tx_id));
} else {
LOG_INFO("[TRANSFER_IN] remove part_trans transfered in with only commit_log", K_(tls_id), K(tx_id), K(commit_log));
}
} else {
// 1. if the prev_log is in the same log_entry with commit_log, it should already handled before commit_log
// 2. if the prev_log is not same log_entry with commit_log, the prev_log_lsn should be valid
// 3. in transfer case, CommitLog may be alone at a LS, should remove the task
ret = OB_ERR_UNEXPECTED;
// expect prev_log_lsn of commit_log is valid: prepare_log for dist_trans and
// commit_info_log for single_ls_trans.
LOG_ERROR("expect valid prev_log_lsn in commit_log", KR(ret), K_(tls_id), K(tx_id), K(commit_log), KPC(part_trans_task));
// remove part_trans_task if dist_trans and invalid prev_log_lsn
// if (OB_FAIL(part_trans_dispatcher_.remove_task(tls_id_.is_sys_log_stream(), tx_id))) {
// LOG_ERROR("handle unserverd single CommitLog(commit_log with invalid prev_log_lsn in dist_trans) failed",
// KR(ret), K_(tls_id), K(tx_id), K(commit_log), K(lsn));
// }
}
} else if (OB_FAIL(missing_info.set_miss_record_or_state_log_lsn(prev_log_lsn))) {
LOG_ERROR("push_back_single_miss_log_lsn failed", KR(ret), K_(tls_id), K(tx_id), K(commit_log), K(missing_info));
} else {

View File

@ -207,15 +207,15 @@ public:
ObTxPrevLogType() { reset(); }
ObTxPrevLogType(const TypeEnum prev_log_type) : prev_log_type_(prev_log_type) {}
bool is_valid() { return prev_log_type_ > 0; }
bool is_valid() const { return prev_log_type_ > 0; }
void set_self() { prev_log_type_ = TypeEnum::SELF; }
bool is_self() { return TypeEnum::SELF == prev_log_type_; }
bool is_self() const { return TypeEnum::SELF == prev_log_type_; }
void set_tranfer_in() { prev_log_type_ = TypeEnum::TRANSFER_IN; }
bool is_transfer_in() { return TypeEnum::TRANSFER_IN == prev_log_type_; }
bool is_transfer_in() const { return TypeEnum::TRANSFER_IN == prev_log_type_; }
void set_prepare() { prev_log_type_ = TypeEnum::PREPARE; }
void set_commit_info() { prev_log_type_ = TypeEnum::COMMIT_INFO; }
bool is_normal_log()
bool is_normal_log() const
{
return TypeEnum::COMMIT_INFO == prev_log_type_ || TypeEnum::PREPARE == prev_log_type_;
}