fix the core caused by msg processing exception

This commit is contained in:
obdev 2023-04-26 13:56:34 +00:00 committed by ob-robot
parent 57a5e728d9
commit e88d9ca03e
7 changed files with 36 additions and 9 deletions

File diff suppressed because one or more lines are too long

View File

@ -1686,6 +1686,7 @@ DEFINE_ORACLE_ERROR(OB_TRANS_FREE_ROUTE_NOT_SUPPORTED, -6279, 6279, "HY000", "Qu
DEFINE_ERROR(OB_TRANS_LIVE_TOO_MUCH_TIME, -6280, -1, "HY000", "Transaction cost too much without commit or rollback");
DEFINE_ERROR(OB_TRANS_COMMIT_TOO_MUCH_TIME, -6281, -1, "HY000", "Transaction commit cost too much");
DEFINE_ORACLE_ERROR(OB_TRANS_TOO_MANY_PARTICIPANTS, -6282, 6002, "40000", "too many transaction participants", 24761, "transaction rolled back: too many transaction participants");
// for clog
DEFINE_ERROR(OB_LOG_ID_NOT_FOUND, -6301, -1, "HY000", "log id not found");

View File

@ -1313,6 +1313,7 @@ constexpr int OB_TRANS_IDLE_TIMEOUT = -6278;
constexpr int OB_TRANS_FREE_ROUTE_NOT_SUPPORTED = -6279;
constexpr int OB_TRANS_LIVE_TOO_MUCH_TIME = -6280;
constexpr int OB_TRANS_COMMIT_TOO_MUCH_TIME = -6281;
constexpr int OB_TRANS_TOO_MANY_PARTICIPANTS = -6282;
constexpr int OB_LOG_ID_NOT_FOUND = -6301;
constexpr int OB_LSR_THREAD_STOPPED = -6302;
constexpr int OB_NO_LOG = -6303;
@ -3256,6 +3257,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_TRANS_FREE_ROUTE_NOT_SUPPORTED__USER_ERROR_MSG "Query is not supported to be executed on txn temporary node"
#define OB_TRANS_LIVE_TOO_MUCH_TIME__USER_ERROR_MSG "Transaction cost too much without commit or rollback"
#define OB_TRANS_COMMIT_TOO_MUCH_TIME__USER_ERROR_MSG "Transaction commit cost too much"
#define OB_TRANS_TOO_MANY_PARTICIPANTS__USER_ERROR_MSG "too many transaction participants"
#define OB_LOG_ID_NOT_FOUND__USER_ERROR_MSG "log id not found"
#define OB_LSR_THREAD_STOPPED__USER_ERROR_MSG "log scan runnable thread stop"
#define OB_NO_LOG__USER_ERROR_MSG "no log ever scanned"
@ -5308,6 +5310,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_TRANS_FREE_ROUTE_NOT_SUPPORTED__ORA_USER_ERROR_MSG "ORA-06279: Query is not supported to be executed on txn temporary node"
#define OB_TRANS_LIVE_TOO_MUCH_TIME__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -6280, Transaction cost too much without commit or rollback"
#define OB_TRANS_COMMIT_TOO_MUCH_TIME__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -6281, Transaction commit cost too much"
#define OB_TRANS_TOO_MANY_PARTICIPANTS__ORA_USER_ERROR_MSG "ORA-24761: transaction rolled back: too many transaction participants"
#define OB_LOG_ID_NOT_FOUND__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -6301, log id not found"
#define OB_LSR_THREAD_STOPPED__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -6302, log scan runnable thread stop"
#define OB_NO_LOG__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -6303, no log ever scanned"
@ -5823,7 +5826,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_ERR_DATA_TOO_LONG_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-12899: value too large for column %.*s (actual: %ld, maximum: %ld)"
#define OB_ERR_INVALID_DATE_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-01861: Incorrect datetime value for column '%.*s' at row %ld"
extern int g_all_ob_errnos[2048];
extern int g_all_ob_errnos[2049];
const char *ob_error_name(const int oberr);
const char* ob_error_cause(const int oberr);

View File

@ -310,7 +310,7 @@ int ObTransCtx::set_app_trace_info_(const ObString &app_trace_info)
const int64_t len = app_trace_info.length();
if (OB_UNLIKELY(len < 0) || OB_UNLIKELY(len > OB_MAX_TRACE_ID_BUFFER_SIZE)) {
TRANS_LOG(WARN, "invalid argument", K(app_trace_info), "context", *this);
TRANS_LOG(WARN, "invalid argument", "context", *this);
ret = OB_INVALID_ARGUMENT;
} else if (0 == trace_info_.get_app_trace_info().length()) {
// set for the first time

View File

@ -6135,9 +6135,13 @@ int ObPartTransCtx::sub_end_tx(const int64_t &request_id,
} else if (OB_FAIL(register_timeout_task_(ObServerConfig::get_instance().trx_2pc_retry_interval
+ trans_id_.hash() % USEC_PER_SEC))) {
TRANS_LOG(WARN, "register timeout handler error", K(ret), KPC(this));
} else if (ObTxState::REDO_COMPLETE != upstream_state_) {
} else if (!is_rollback && ObTxState::REDO_COMPLETE > get_downstream_state()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "not in prepare state", K(ret), KPC(this));
// TODO, check state
} else if (!is_rollback && ObTxState::REDO_COMPLETE < get_upstream_state()) {
// do nothing
} else if (!is_rollback && ObTxState::REDO_COMPLETE == get_upstream_state() && ObTxState::REDO_COMPLETE == get_downstream_state() && !all_downstream_collected_()) {
// do nothing
} else {
tmp_scheduler_= tmp_scheduler;
// (void)set_sub2pc_coord_state(Ob2PCPrepareState::VERSION_PREPARING);

View File

@ -364,10 +364,10 @@ private:
int decide_2pc_log_type_(bool &need_submit, ObTwoPhaseCommitLogType &log_type);
int submit_2pc_log_();
int collect_downstream_(const int64_t participant);
protected:
// Means we collect all downstream responses
bool all_downstream_collected_();
int collect_downstream_(const int64_t participant);
protected:
// colloected_ is the bit set for storing responses from participants
//

View File

@ -498,7 +498,10 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type)
break;
}
case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ: {
if (is_sub2pc()) {
if (!is_sub2pc() && TX_MSG_TYPE::TX_2PC_PREPARE_VERSION_REQ == msg_2pc_cache_->type_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpect tx flag", KR(ret), KPC(this));
} else if (is_sub2pc()) {
// prepare version for xa trans
// these actions has been done in entrance function handle_tx_2pc_prepare_version_req
} else {
@ -514,7 +517,10 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type)
break;
}
case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP: {
if (is_sub2pc()) {
if (!is_sub2pc() && TX_MSG_TYPE::TX_2PC_PREPARE_VERSION_RESP == msg_2pc_cache_->type_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpect tx flag", KR(ret), KPC(this));
} else if (is_sub2pc()) {
// prepare version for xa trans
// these actions has been done in entrance function handle_tx_2pc_prepare_version_resp
} else {