Fix prepare version generating bug
This commit is contained in:
@ -1010,6 +1010,7 @@ int ObPartTransCtx::get_gts_callback(const MonotonicTs srr,
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
const SCN local_prepare_version = SCN::max(gts, max_read_ts);
|
const SCN local_prepare_version = SCN::max(gts, max_read_ts);
|
||||||
|
// should not overwrite the prepare verison of other participants
|
||||||
exec_info_.prepare_version_ = SCN::max(local_prepare_version, exec_info_.prepare_version_);
|
exec_info_.prepare_version_ = SCN::max(local_prepare_version, exec_info_.prepare_version_);
|
||||||
|
|
||||||
if (get_upstream_state() <= ObTxState::PREPARE
|
if (get_upstream_state() <= ObTxState::PREPARE
|
||||||
@ -2207,7 +2208,7 @@ int ObPartTransCtx::generate_prepare_version_()
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
if (ObTxState::PREPARE > upstream_state_) {
|
if (!mt_ctx_.is_prepared()) {
|
||||||
SCN gts = SCN::min_scn();
|
SCN gts = SCN::min_scn();
|
||||||
SCN local_max_read_version = SCN::min_scn();
|
SCN local_max_read_version = SCN::min_scn();
|
||||||
bool is_gts_ok = false;
|
bool is_gts_ok = false;
|
||||||
@ -2240,7 +2241,9 @@ int ObPartTransCtx::generate_prepare_version_()
|
|||||||
if (OB_FAIL(get_local_max_read_version_(local_max_read_version))) {
|
if (OB_FAIL(get_local_max_read_version_(local_max_read_version))) {
|
||||||
TRANS_LOG(WARN, "get local max read version failed", KR(ret), K(*this));
|
TRANS_LOG(WARN, "get local max read version failed", KR(ret), K(*this));
|
||||||
} else {
|
} else {
|
||||||
exec_info_.prepare_version_ = std::max(gts, local_max_read_version);
|
// should not overwrite the prepare version of other participants
|
||||||
|
exec_info_.prepare_version_ = SCN::max(SCN::max(gts, local_max_read_version),
|
||||||
|
exec_info_.prepare_version_);
|
||||||
if (exec_info_.prepare_version_ > gts) {
|
if (exec_info_.prepare_version_ > gts) {
|
||||||
mt_ctx_.before_prepare(exec_info_.prepare_version_);
|
mt_ctx_.before_prepare(exec_info_.prepare_version_);
|
||||||
}
|
}
|
||||||
@ -3158,6 +3161,7 @@ int ObPartTransCtx::submit_record_log_()
|
|||||||
int ObPartTransCtx::submit_log_impl_(const ObTxLogType log_type)
|
int ObPartTransCtx::submit_log_impl_(const ObTxLogType log_type)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
int tmp_ret = OB_SUCCESS;
|
||||||
MTL_SWITCH(tenant_id_) {
|
MTL_SWITCH(tenant_id_) {
|
||||||
switch (log_type) {
|
switch (log_type) {
|
||||||
// for instant logging during execution,
|
// for instant logging during execution,
|
||||||
@ -3178,11 +3182,28 @@ int ObPartTransCtx::submit_log_impl_(const ObTxLogType log_type)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case ObTxLogType::TX_PREPARE_LOG: {
|
case ObTxLogType::TX_PREPARE_LOG: {
|
||||||
|
// try generate prepare verison
|
||||||
|
ret = generate_prepare_version_();
|
||||||
|
|
||||||
|
if (OB_SUCC(ret) && mt_ctx_.is_prepared()) {
|
||||||
ret = submit_prepare_log_();
|
ret = submit_prepare_log_();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case ObTxLogType::TX_COMMIT_LOG: {
|
case ObTxLogType::TX_COMMIT_LOG: {
|
||||||
|
if (!mt_ctx_.is_prepared()) {
|
||||||
|
if (!is_local_tx_()) {
|
||||||
|
tmp_ret = OB_ERR_UNEXPECTED;
|
||||||
|
TRANS_LOG(ERROR, "not set prepare verison into mt_ctx, unexpected error", "ret", tmp_ret, K(*this));
|
||||||
|
print_trace_log_();
|
||||||
|
} else {
|
||||||
|
// try generate commit version
|
||||||
|
ret = generate_commit_version_();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (OB_SUCC(ret) && mt_ctx_.is_prepared()) {
|
||||||
ret = submit_commit_log_();
|
ret = submit_commit_log_();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case ObTxLogType::TX_ABORT_LOG: {
|
case ObTxLogType::TX_ABORT_LOG: {
|
||||||
@ -3301,7 +3322,7 @@ int ObPartTransCtx::after_submit_log_(ObTxLogBlock &log_block,
|
|||||||
if (OB_SUCC(ret) && is_contain(cb_arg_array, ObTxLogType::TX_PREPARE_LOG)) {
|
if (OB_SUCC(ret) && is_contain(cb_arg_array, ObTxLogType::TX_PREPARE_LOG)) {
|
||||||
sub_state_.set_state_log_submitting();
|
sub_state_.set_state_log_submitting();
|
||||||
sub_state_.set_state_log_submitted();
|
sub_state_.set_state_log_submitted();
|
||||||
exec_info_.prepare_version_ = MAX(log_cb->get_log_ts(), exec_info_.prepare_version_);
|
exec_info_.prepare_version_ = SCN::max(log_cb->get_log_ts(), exec_info_.prepare_version_);
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret) && is_contain(cb_arg_array, ObTxLogType::TX_COMMIT_LOG)) {
|
if (OB_SUCC(ret) && is_contain(cb_arg_array, ObTxLogType::TX_COMMIT_LOG)) {
|
||||||
sub_state_.set_state_log_submitting();
|
sub_state_.set_state_log_submitting();
|
||||||
|
|||||||
Reference in New Issue
Block a user