[BUG] ignore 4377 when participants die

This commit is contained in:
Handora 2023-08-03 02:42:36 +00:00 committed by ob-robot
parent 0561a3c7aa
commit f9b9ce2307
14 changed files with 293 additions and 2 deletions

View File

@ -698,6 +698,9 @@ PCODE_DEF(OB_XA_END_STMT_REQ, 0x772)
PCODE_DEF(OB_XA_ROLLBACK, 0x773)
PCODE_DEF(OB_XA_TERMINATE, 0x774)
// RPC code for asking a participant whether its tx is still alive when a 4377 error is being verified
PCODE_DEF(OB_ASK_TX_STATE_FOR_4377, 0x780)
// clog
PCODE_DEF(OB_CLOG, 0x801)

View File

@ -23,6 +23,7 @@
#include "observer/table/ob_htable_utils.h"
#include "observer/table/ob_htable_filter_operator.h"
#include "observer/table/ob_table_service.h"
#include "storage/ls/ob_ls_tablet_service.h"
#include <thread>
#undef private
#undef protected
@ -43,6 +44,17 @@ const char* table_name = "batch_execute_test";
const char* sys_root_pass = "";
typedef char DefaultBuf[128];
namespace oceanbase
{
namespace storage
{
// Stub definition supplied by this test file: it ignores the tx descriptor and
// always reports OB_SUCCESS.
int ObLSTabletService::check_parts_tx_state_in_transfer_for_4377_(transaction::ObTxDesc *)
{
return OB_SUCCESS;
}
} // namespace storage
} // namespace oceanbase
// create table if not exists batch_execute_test (C1 bigint primary key, C2 bigint, C3 varchar(100)) PARTITION BY KEY(C1) PARTITIONS 16
class TestBatchExecute: public ::testing::Test
{

View File

@ -162,6 +162,8 @@ void oceanbase::observer::init_srv_xlator_for_transaction(ObSrvRpcXlator *xlator
RPC_PROCESSOR(ObTxFreeRouteCheckAliveP);
RPC_PROCESSOR(ObTxFreeRouteCheckAliveRespP);
RPC_PROCESSOR(ObTxFreeRoutePushStateP);
// for tx state check of 4377
RPC_PROCESSOR(ObAskTxStateFor4377P);
}
void oceanbase::observer::init_srv_xlator_for_clog(ObSrvRpcXlator *xlator) {

View File

@ -3676,6 +3676,9 @@ int ObLSTabletService::check_old_row_legitimacy(
data_tablet_handle))) {
ret = tmp_ret;
LOG_WARN("check need rollback in transfer for 4377 found exception", K(ret), K(old_row), K(data_table));
} else if (OB_TMP_FAIL(check_parts_tx_state_in_transfer_for_4377_(run_ctx.store_ctx_.mvcc_acc_ctx_.tx_desc_))) {
ret = tmp_ret;
LOG_WARN("check need rollback in transfer for 4377 found exception", K(ret), K(old_row), K(data_table));
} else if (is_udf) {
ret = OB_ERR_INDEX_KEY_NOT_FOUND;
LOG_WARN("index key not found on udf column", K(ret), K(old_row));
@ -6231,6 +6234,7 @@ int ObLSTabletService::ha_get_tablet(
}
return ret;
}
int ObLSTabletService::check_real_leader_for_4377_(const ObLSID ls_id)
{
int ret = OB_SUCCESS;
@ -6314,5 +6318,48 @@ int ObLSTabletService::check_need_rollback_in_transfer_for_4377_(const transacti
return ret;
}
// Asks every participant of the txn whether its txn context is still alive,
// so that a 4377 caused by a participant that has already died can be turned
// into a rollback instead of being reported as a real defensive-check failure.
//
// @param tx_desc  descriptor of the txn being checked; must not be null.
// @return OB_SUCCESS             every participant reported alive;
//         OB_TRANS_NEED_ROLLBACK some participant is no longer alive;
//         OB_ERR_UNEXPECTED      tx_desc or the tenant's trans service is null;
//         otherwise the error from copying the participant list or from
//         asking a participant for its state.
int ObLSTabletService::check_parts_tx_state_in_transfer_for_4377_(transaction::ObTxDesc *tx_desc)
{
  int ret = OB_SUCCESS;
  transaction::ObTransService *txs = MTL(transaction::ObTransService *);
  transaction::ObTxPartList copy_parts;
  bool is_alive = false;

  if (OB_ISNULL(tx_desc)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("tx_desc is null", K(ret));
  } else if (OB_ISNULL(txs)) {
    // Guard the tenant-local service pointer before it is dereferenced below.
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("trans service is null", K(ret));
  } else if (OB_FAIL(tx_desc->get_parts_copy(copy_parts))) {
    TRANS_LOG(WARN, "get participants copy error", K(ret));
  } else {
    // Checking the local tablet can reveal that txn data is incomplete because
    // of a transfer. However, when the index table is validated through the
    // main table, the local index table cannot tell us whether the main
    // table's txn data is incomplete due to the transfer. Moreover, the local
    // index table and the main table may not even be located on the same
    // machine.
    //
    // Therefore we traverse the whole participant list through RPC to find out
    // whether a txn context that could have taken part in the transfer has
    // been terminated because of it. In that case the 4377 is considered a
    // misreport and is converted into OB_TRANS_NEED_ROLLBACK, which tells the
    // caller to roll the txn back.
    for (int64_t i = 0; OB_SUCC(ret) && i < copy_parts.count(); i++) {
      ObLSID ls_id = copy_parts[i].id_;
      is_alive = false;
      if (OB_FAIL(txs->ask_tx_state_for_4377(ls_id,
                                             tx_desc->get_tx_id(),
                                             is_alive))) {
        TRANS_LOG(WARN, "fail to ask tx state for 4377", K(ret), K(ls_id), K(tx_desc));
      } else if (!is_alive) {
        // The first dead participant ends the loop through OB_SUCC(ret).
        ret = OB_TRANS_NEED_ROLLBACK;
        LOG_WARN("maybe meet transfer during kill tx, which can cause 4377 error, so we will rollback it",
                 K(tx_desc));
      }
    }
  }

  return ret;
}
} // namespace storage
} // namespace oceanbase

View File

@ -514,6 +514,7 @@ private:
static int check_real_leader_for_4377_(const ObLSID ls_id);
static int check_need_rollback_in_transfer_for_4377_(const transaction::ObTxDesc *tx_desc,
ObTabletHandle &tablet_handle);
// Asks each participant of tx_desc whether its tx is alive; yields OB_TRANS_NEED_ROLLBACK when one is not.
static int check_parts_tx_state_in_transfer_for_4377_(transaction::ObTxDesc *tx_desc);
static int build_create_sstable_param_for_migration(
const blocksstable::ObMigrationSSTableParam &migrate_sstable_param,
ObTabletCreateSSTableParam &create_sstable_param);

View File

@ -8052,5 +8052,29 @@ int ObPartTransCtx::handle_trans_collect_state_resp(const ObCollectStateRespMsg
return ret;
}
// Answers the 4377 liveness question for this participant context.
//
// @param is_alive  [out] false when the context is not inited or its state is
//                  ABORT, true otherwise.
// @return always OB_SUCCESS; the verdict is carried by is_alive only.
int ObPartTransCtx::handle_ask_tx_state_for_4377(bool &is_alive)
{
  int ret = OB_SUCCESS;
  CtxLockGuard guard(lock_);

  if (IS_NOT_INIT) {
    is_alive = false;
    TRANS_LOG(WARN, "ObPartTransCtx not inited");
  } else {
    // Data lost to a read during transfer comes from reading an aborted txn.
    // Because transfer has strong log synchronization semantics, we can rely
    // on the transfer source txn already being dead with its abort log
    // synchronized, so ABORT is the only state reported as not alive.
    is_alive = !(exec_info_.state_ == ObTxState::ABORT);
  }

  TRANS_LOG(INFO, "handle ask tx state for 4377", K(ret), KPC(this), K(is_alive));
  return ret;
}
} // namespace transaction
} // namespace oceanbase

View File

@ -217,6 +217,9 @@ public:
int handle_trans_ask_state_resp(const ObAskStateRespMsg &msg);
int handle_trans_collect_state(ObStateInfo &state_info, const SCN &snapshot);
int handle_trans_collect_state_resp(const ObCollectStateRespMsg &msg);
// tx state check for 4377: sets is_alive to false when this ctx is not inited
// or its state is ABORT, and to true otherwise
int handle_ask_tx_state_for_4377(bool &is_alive);
public:
// thread safe
int64_t to_string(char* buf, const int64_t buf_len) const;

View File

@ -581,6 +581,42 @@ int ObTransRpc::post_sub_response_msg_(const ObAddr &server, ObTxMsg &msg)
return ret;
}
int ObTransRpc::ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg,
ObAskTxStateFor4377RespMsg &resp)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = trans_service_->get_tenant_id();
int64_t cluster_id = GCONF.cluster_id;
ObAddr server;
if (OB_UNLIKELY(!is_inited_)) {
TRANS_LOG(WARN, "ObTransRpc not inited");
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(!is_running_)) {
TRANS_LOG(WARN, "ObTransRpc is not running");
ret = OB_NOT_RUNNING;
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))
|| OB_UNLIKELY(!msg.is_valid())) {
TRANS_LOG(WARN, "invalid argument", K(tenant_id), K(msg));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(trans_service_->get_location_adapter()->nonblock_get_leader(cluster_id,
tenant_id,
msg.ls_id_,
server))) {
TRANS_LOG(WARN, "get leader failed", KR(ret), K(msg), K(cluster_id), K(tenant_id));
} else {
ret = rpc_proxy_.
to(server).
by(tenant_id).
timeout(GCONF._ob_trans_rpc_timeout).
ask_tx_state_for_4377(msg, resp);
TRANS_LOG(WARN, "ask tx state for 4377 finished", KR(ret), K(msg), K(cluster_id));
}
return ret;
}
int ObTransRpc::post_standby_msg_(const ObAddr &server, ObTxMsg &msg)
{
int ret = OB_SUCCESS;
@ -627,6 +663,30 @@ void ObTransRpc::statistics_()
}
}
int ObAskTxStateFor4377P::process()
{
int ret = OB_SUCCESS;
bool is_alive = false;
transaction::ObAskTxStateFor4377Msg &msg = arg_;
transaction::ObAskTxStateFor4377RespMsg &resp = result_;
transaction::ObTransService *txs = MTL(transaction::ObTransService*);
if (OB_ISNULL(txs)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "fail to get trans service", K(ret));
} else if (OB_FAIL(txs->handle_ask_tx_state_for_4377(msg, is_alive))) {
TRANS_LOG(WARN, "handle ask tx state for 4377 failed", K(ret), K(msg));
} else {
TRANS_LOG(INFO, "handle ask tx state for 4377 succeed", K(ret), K(msg), K(resp));
}
resp.is_alive_ = is_alive;
resp.ret_ = ret;
// We rewrite the return code to distinguish the rpc error and txn error
return OB_SUCCESS;
}
} // transaction
} // oceanbase

View File

@ -106,6 +106,8 @@ public:
RPC_AP(PR3 post_msg, OB_TX_FREE_ROUTE_CHECK_ALIVE, (transaction::ObTxFreeRouteCheckAliveMsg), ObTransRpcResult);
RPC_AP(PR3 post_msg, OB_TX_FREE_ROUTE_CHECK_ALIVE_RESP, (transaction::ObTxFreeRouteCheckAliveRespMsg), ObTransRpcResult);
RPC_S(@PR3 sync_access, OB_TX_FREE_ROUTE_PUSH_STATE, (transaction::ObTxFreeRoutePushState), transaction::ObTxFreeRoutePushStateResp);
// tx state check for 4377: synchronous RPC taking ObAskTxStateFor4377Msg and returning ObAskTxStateFor4377RespMsg
RPC_S(@PR3 ask_tx_state_for_4377, OB_ASK_TX_STATE_FOR_4377, (transaction::ObAskTxStateFor4377Msg), transaction::ObAskTxStateFor4377RespMsg);
};
#define TX_P_(name, pcode) \
@ -373,6 +375,9 @@ public:
virtual int post_msg(const share::ObLSID &p, ObTxMsg &msg) = 0;
virtual int post_msg(const ObAddr &server, const ObTxFreeRouteMsg &m) = 0;
virtual int sync_access(const ObAddr &server, const ObTxFreeRoutePushState &m, ObTxFreeRoutePushStateResp &result) = 0;
// Sends the tx state request for the 4377 check described by msg and stores the reply in resp.
virtual int ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg,
ObAskTxStateFor4377RespMsg &resp) = 0;
};
/*
@ -409,7 +414,9 @@ public:
int post_msg(const share::ObLSID &p, ObTxMsg &msg);
int post_msg(const ObAddr &server, const ObTxFreeRouteMsg &m);
int sync_access(const ObAddr &server, const ObTxFreeRoutePushState &m, ObTxFreeRoutePushStateResp &result);
private:
int ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg,
ObAskTxStateFor4377RespMsg &resp);
private:
int post_(const ObAddr &server, ObTxMsg &msg);
int post_commit_msg_(const ObAddr &server, ObTxMsg &msg);
int post_sub_request_msg_(const ObAddr &server, ObTxMsg &msg);
@ -450,7 +457,11 @@ private:
int64_t last_stat_ts_ CACHE_ALIGNED;
};
// RPC processor bound to the OB_ASK_TX_STATE_FOR_4377 pcode; process() is the
// entry point that handles a tx state request for the 4377 check.
class ObAskTxStateFor4377P : public obrpc::ObRpcProcessor< obrpc::ObTransRpcProxy::ObRpc<obrpc::OB_ASK_TX_STATE_FOR_4377> >
{
protected:
int process();
};
} // transaction

View File

@ -3534,5 +3534,73 @@ bool ObTransService::is_ls_dropped_(const share::ObLSID ls_id) {
}
return bret;
}
// Asks the participant on ls_id whether txn tx_id is still alive, retrying on
// failure until roughly 10 seconds have elapsed.
//
// @param ls_id     log stream of the participant to ask.
// @param tx_id     txn whose state is requested.
// @param is_alive  [out] liveness reported by the participant; false when the
//                  log stream is already deleted and on every failure path.
// @return OB_SUCCESS when an answer was obtained (or the ls is deleted);
//         OB_TIMEOUT when the retry window is exhausted, or when an attempt
//         itself yields OB_TIMEOUT (that ends the loop immediately).
int ObTransService::ask_tx_state_for_4377(const ObLSID ls_id,
                                          const ObTransID tx_id,
                                          bool &is_alive)
{
  int ret = OB_SUCCESS;
  static const int64_t MAX_ALLOWED_ASK_STATE_FOR_4377_TIMES = 10 * 1000 * 1000; //10s
  static const int64_t SLEEP_DURATION_FOR_ASK_STATE_FOR_4377 = 100 * 1000; //100ms
  const int64_t start_ts = ObTimeUtility::current_time();
  ObAskTxStateFor4377Msg msg;
  ObAskTxStateFor4377RespMsg resp;
  msg.tx_id_ = tx_id;
  msg.ls_id_ = ls_id;
  // Give the out-parameter a defined value on every exit path, including
  // failures and timeouts where no answer is ever received.
  is_alive = false;

  do {
    if (OB_FAIL(rpc_->ask_tx_state_for_4377(msg, resp))) {
      TRANS_LOG(WARN, "ask tx state for 4377 failed", K(ret), K(msg));
      if (OB_LS_IS_DELETED == ret) {
        // A deleted log stream cannot host a live txn context.
        is_alive = false;
        ret = OB_SUCCESS;
        TRANS_LOG(WARN, "ls is deleted during ask tx state", K(ret), K(msg));
      }
    } else if (OB_FAIL(resp.ret_)) {
      // The rpc itself succeeded but the remote handler reported an error.
      TRANS_LOG(WARN, "remote side failed to handle ask tx state for 4377", K(ret), K(msg));
    } else {
      is_alive = resp.is_alive_;
    }

    if (OB_FAIL(ret)) {
      usleep(SLEEP_DURATION_FOR_ASK_STATE_FOR_4377);
    }

    if (OB_FAIL(ret) && ObTimeUtility::current_time() - start_ts >= MAX_ALLOWED_ASK_STATE_FOR_4377_TIMES) {
      TRANS_LOG(WARN, "timeout for 4377 check", K(ret), K(ls_id), K(tx_id), K(start_ts));
      ret = OB_TIMEOUT;
    }
  } while (OB_FAIL(ret) && OB_TIMEOUT != ret);

  TRANS_LOG(INFO, "tx state check for 4377 finished", K(ls_id), K(tx_id), K(ret), K(is_alive));
  return ret;
}
int ObTransService::handle_ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg,
bool &is_alive)
{
int ret = OB_SUCCESS;
ObPartTransCtx *ctx = NULL;
is_alive = false;
if (OB_FAIL(get_tx_ctx_(msg.ls_id_, msg.tx_id_, ctx))) {
TRANS_LOG(WARN, "fail to get tx context", K(ret), K(msg));
if (OB_TRANS_CTX_NOT_EXIST == ret || OB_PARTITION_NOT_EXIST == ret) {
ret = OB_SUCCESS;
is_alive = false;
TRANS_LOG(WARN, "tx state is not exist for 4377", K(ret), K(msg));
}
} else if (OB_FAIL(ctx->handle_ask_tx_state_for_4377(is_alive))) {
TRANS_LOG(WARN, "fail to handle trans ask state resp", K(ret), K(msg));
}
if (OB_NOT_NULL(ctx)) {
revert_tx_ctx_(ctx);
}
TRANS_LOG(INFO, "handle ask tx state for 4377", K(ret), K(msg), K(is_alive));
return ret;
}
} // transaction
} // oceanbase

View File

@ -177,6 +177,14 @@ int check_for_standby(const share::ObLSID &ls_id,
void register_standby_cleanup_task();
int do_standby_cleanup();
void handle_defer_abort(ObTxDesc &tx);
// tx state check for 4377
// Requester side: asks the participant on ls_id whether tx_id is alive,
// retrying on failure until a timeout is reached.
int ask_tx_state_for_4377(const ObLSID ls_id,
const ObTransID tx_id,
bool &is_alive);
// Handler side: looks up the tx ctx named by msg and reports its liveness;
// a ctx that does not exist is reported as not alive.
int handle_ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg,
bool &is_alive);
TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(this));
private:

View File

@ -488,5 +488,23 @@ bool ObCollectStateRespMsg::is_valid() const
return ret;
}
// Serialized members of the 4377 tx state request and response messages.
// NOTE(review): presumably the member order here defines the wire layout — confirm before reordering.
OB_SERIALIZE_MEMBER(ObAskTxStateFor4377Msg, tx_id_, ls_id_);
OB_SERIALIZE_MEMBER(ObAskTxStateFor4377RespMsg, is_alive_, ret_);
// A request is valid only when both the tx id and the ls id are valid.
bool ObAskTxStateFor4377Msg::is_valid() const
{
  return tx_id_.is_valid() && ls_id_.is_valid();
}
// Every response is considered valid: no field is checked.
bool ObAskTxStateFor4377RespMsg::is_valid() const
{
return true;
}
} // transaction
} // oceanbase

View File

@ -546,6 +546,35 @@ namespace transaction
|| (TX_2PC_PREPARE_REDO_REQ <= msg_type && TX_2PC_PREPARE_VERSION_RESP >= msg_type);
}
};
// Request of the OB_ASK_TX_STATE_FOR_4377 rpc: names the txn and the log
// stream whose txn state is being asked for.
struct ObAskTxStateFor4377Msg
{
  ObAskTxStateFor4377Msg() : tx_id_(), ls_id_() {}

  ObTransID tx_id_;      // txn whose state is requested
  share::ObLSID ls_id_;  // log stream to ask

  // True when both tx_id_ and ls_id_ are valid.
  bool is_valid() const;

  TO_STRING_KV(K_(tx_id), K_(ls_id));
  OB_UNIS_VERSION(1);
};
// Response of the OB_ASK_TX_STATE_FOR_4377 rpc. It starts out as "not alive"
// with a success code.
struct ObAskTxStateFor4377RespMsg
{
  ObAskTxStateFor4377RespMsg() : is_alive_(false), ret_(OB_SUCCESS) {}

  bool is_alive_;  // liveness verdict of the asked txn
  int ret_;        // error code produced while handling the request

  // Always true.
  bool is_valid() const;

  TO_STRING_KV(K_(is_alive), K_(ret));
  OB_UNIS_VERSION(1);
};
}
}

View File

@ -162,6 +162,11 @@ public:
OZ(result.deserialize(resp.ptr(), resp.length(), pos));
return ret;
}
// Stand-in for the 4377 tx state rpc: it ignores msg, leaves resp untouched
// and always returns OB_SUCCESS.
int ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg,
ObAskTxStateFor4377RespMsg &resp)
{
return OB_SUCCESS;
}
template<class MSG_RESULT_T>
int send_msg_callback(const ObAddr &recv,
const ObTxMsg &msg,