[CP] [4.1] fix several remote plan execute timeout bugs
deps/oblib/src/lib/worker.h
@@ -42,6 +42,7 @@ public:
   virtual bool can_retry() const { return false; }
   // Set retry flag so that scheduler will reprocess this request then
   virtual void set_need_retry() {}
+  virtual void unset_need_retry() {}
   // It's used to check whether query need retry. Whenever worker has
   // observed this query need retry, it should stop processing this
   // query immediately.
@@ -1100,7 +1100,24 @@ void ObQueryRetryCtrl::test_and_save_retry_state(const ObGlobalContext &gctx,
   if (RETRY_TYPE_NONE != retry_type_) {
     retry_err_code_ = client_ret;
   }
+  if (RETRY_TYPE_NONE != retry_type_) {
+    result.set_close_fail_callback([this](const int err) -> void { this->on_close_resultset_fail_(err); });
+  }
 }
+
+void ObQueryRetryCtrl::on_close_resultset_fail_(const int err)
+{
+  // an unretryable error happened in the close-result-set phase
+  if (OB_SUCCESS != err && RETRY_TYPE_NONE != retry_type_) {
+    // txn-related errors raised while closing the stmt
+    if (OB_TRANS_NEED_ROLLBACK == err ||
+        OB_TRANS_INVALID_STATE == err ||
+        OB_TRANS_HAS_DECIDED == err) {
+      retry_type_ = RETRY_TYPE_NONE;
+      // also clear the packet-level retry flag
+      THIS_WORKER.unset_need_retry();
+    }
+  }
+}
 }/* ns observer*/
 }/* ns oceanbase */
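Together with the worker.h hunk above, this wires a veto path into query retry: test_and_save_retry_state registers on_close_resultset_fail_ as a close-failure listener on the result set, and if closing the result set later hits an unretryable transaction error, both the retry decision and the packet-level retry flag are cleared. Below is a minimal self-contained sketch of that wiring; std::function stands in for ObFunction, the structs are illustrative stand-ins for THIS_WORKER and ObResultSet, and the error code is a placeholder, not the real OB_TRANS_NEED_ROLLBACK value.

#include <cassert>
#include <functional>

constexpr int kSuccess = 0;
constexpr int kTxnErr = -1; // placeholder for OB_TRANS_NEED_ROLLBACK etc.

struct Worker {                       // stand-in for THIS_WORKER
  bool need_retry_ = false;
  void set_need_retry() { need_retry_ = true; }
  void unset_need_retry() { need_retry_ = false; }
};

struct ResultSet {                    // stand-in for ObResultSet
  std::function<void(int)> close_fail_cb_;
  void set_close_fail_callback(std::function<void(int)> cb) { close_fail_cb_ = std::move(cb); }
  void close(int err) {
    // as in ObResultSet::close(): notify the listener only on failure
    if (kSuccess != err && close_fail_cb_) close_fail_cb_(err);
  }
};

int main() {
  Worker worker;
  ResultSet result;
  worker.set_need_retry();            // the retry ctrl decided to retry
  result.set_close_fail_callback([&](int err) {
    if (kTxnErr == err) worker.unset_need_retry(); // close phase vetoes the retry
  });
  result.close(kTxnErr);
  assert(!worker.need_retry_);        // the query will no longer be retried
  return 0;
}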
@@ -310,6 +310,7 @@ private:
   static void inner_location_error_proc(ObRetryParam &v);
   static void inner_location_error_nothing_readable_proc(ObRetryParam &v);
   static void inner_peer_server_status_uncertain_proc(ObRetryParam &v);
+  void on_close_resultset_fail_(const int err);

   /* variables */
   // map_ is used to fast lookup the error code retry processor
@@ -57,6 +57,10 @@ public:
   virtual bool can_retry() const override { return can_retry_; }
   // Note: you CAN NOT call set_need_retry when can_retry_ == false
   virtual void set_need_retry() override { need_retry_ = true; }
+  // This is _only_ used (for ease of implementation) when query_retry_ctrl has
+  // decided to retry but a later step wants to invalidate that decision.
+  // Refer to `ObQueryRetryCtrl::on_close_resultset_fail_`.
+  virtual void unset_need_retry() override { need_retry_ = false; }
   virtual bool need_retry() const override { return need_retry_; }
   virtual void resume() override;
@@ -468,6 +468,22 @@ bool ObDASCtx::has_same_lsid(ObLSID *lsid)
   return bret;
 }

+int ObDASCtx::get_all_lsid(share::ObLSArray &ls_ids)
+{
+  int ret = OB_SUCCESS;
+  FOREACH_X(table_node, table_locs_, OB_SUCC(ret)) {
+    ObDASTableLoc *table_loc = *table_node;
+    for (DASTabletLocListIter tablet_node = table_loc->tablet_locs_begin();
+         OB_SUCC(ret) && tablet_node != table_loc->tablet_locs_end(); ++tablet_node) {
+      ObDASTabletLoc *tablet_loc = *tablet_node;
+      if (!is_contain(ls_ids, tablet_loc->ls_id_)) {
+        ret = ls_ids.push_back(tablet_loc->ls_id_);
+      }
+    }
+  }
+  return ret;
+}
+
 int64_t ObDASCtx::get_related_tablet_cnt() const
 {
   int64_t total_cnt = 0;
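get_all_lsid() flattens every tablet location held in the DAS context into a deduplicated list of log-stream ids; that list later feeds add_touched_ls() in the transfer-error path below. A standalone model of the same loop, with simplified stand-in types (LSID, TabletLoc, TableLoc are illustrative, not the OceanBase definitions):

#include <cstdint>
#include <vector>

using LSID = int64_t;
struct TabletLoc { LSID ls_id_; };
struct TableLoc { std::vector<TabletLoc> tablets_; };

// models is_contain(): linear membership test, fine for small LS counts
static bool is_contain(const std::vector<LSID> &ids, LSID id) {
  for (LSID v : ids) { if (v == id) return true; }
  return false;
}

// models ObDASCtx::get_all_lsid(): every touched LS appears exactly once
std::vector<LSID> get_all_lsid(const std::vector<TableLoc> &table_locs) {
  std::vector<LSID> ls_ids;
  for (const TableLoc &table : table_locs) {
    for (const TabletLoc &tablet : table.tablets_) {
      if (!is_contain(ls_ids, tablet.ls_id_)) {
        ls_ids.push_back(tablet.ls_id_);
      }
    }
  }
  return ls_ids;
}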
@@ -86,6 +86,7 @@ public:
                       ObDASTabletMapper &tablet_mapper,
                       const DASTableIDArrayWrap *related_table_ids = nullptr);
   bool has_same_lsid(share::ObLSID *lsid);
+  int get_all_lsid(share::ObLSArray &ls_ids);
   int64_t get_related_tablet_cnt() const;
   void set_snapshot(const transaction::ObTxReadSnapshot &snapshot) { snapshot_ = snapshot; }
   transaction::ObTxReadSnapshot &get_snapshot() { return snapshot_; }
@@ -253,7 +253,8 @@ int ObRemoteScheduler::execute_with_sql(ObExecContext &ctx, ObPhysicalPlan *phy_
                                         session,
                                         has_sent_task,
                                         has_transfer_err,
-                                        phy_plan);
+                                        phy_plan,
+                                        ctx);
   NG_TRACE_EXT(remote_task_completed, OB_ID(ret), ret,
                OB_ID(runner_svr), task.get_runner_svr(), OB_ID(task), task);
   // Note: after this function returns, control ultimately passes into ObDirectReceive,
@@ -102,7 +102,8 @@ int ObRemoteTaskExecutor::execute(ObExecContext &query_ctx, ObJob *job, ObTaskIn
                                   session,
                                   has_sent_task,
                                   has_transfer_err,
-                                  plan_ctx->get_phy_plan());
+                                  plan_ctx->get_phy_plan(),
+                                  query_ctx);
   ret = COVER_SUCC(tmp_ret);

   if (OB_SUCC(ret)) {
@@ -177,7 +178,8 @@ int ObRemoteTaskExecutor::handle_tx_after_rpc(ObScanner *scanner,
                                               ObSQLSessionInfo *session,
                                               const bool has_sent_task,
                                               const bool has_transfer_err,
-                                              const ObPhysicalPlan *phy_plan)
+                                              const ObPhysicalPlan *phy_plan,
+                                              ObExecContext &exec_ctx)
 {
   int ret = OB_SUCCESS;
   auto tx_desc = session->get_tx_desc();
@@ -204,17 +206,26 @@ int ObRemoteTaskExecutor::handle_tx_after_rpc(ObScanner *scanner,
              K(tx_desc));
   }
   if (has_transfer_err || OB_FAIL(ret)) {
     // the tx participants involved on the remote side are now unknown
-    LOG_WARN("remote execute fail with transfer_error, tx will rollback");
-    session->get_trans_result().set_incomplete();
-    // TODO: yunxing.cyx
-    // get the remote LSID and report to transaction let tx continue
-    /* share::ObLSID ls_id = xxx;
-    if (OB_FAIL(MTL(transaction::ObTransService*)
-                ->add_unknown_tx_exec_part(*tx_desc,
-                                           ls_id))) {
-      LOG_WARN("report tx unknown part fail", K(ret), K(ls_id));
-    } */
+    if (exec_ctx.use_remote_sql()) {
+      LOG_WARN("remote execute use sql fail with transfer_error, tx will rollback", K(ret));
+      session->get_trans_result().set_incomplete();
+    } else {
+      ObDASCtx &das_ctx = DAS_CTX(exec_ctx);
+      share::ObLSArray ls_ids;
+      int tmp_ret = OB_SUCCESS;
+      if (OB_TMP_FAIL(das_ctx.get_all_lsid(ls_ids))) {
+        LOG_WARN("get all ls_ids failed", K(tmp_ret));
+      } else if (OB_TMP_FAIL(session->get_trans_result().add_touched_ls(ls_ids))) {
+        LOG_WARN("add touched ls to txn failed", K(tmp_ret));
+      } else {
+        LOG_INFO("add touched ls succ", K(ls_ids));
+      }
+      if (OB_TMP_FAIL(tmp_ret)) {
+        LOG_WARN("remote execute use plan fail with transfer_error and try add touched ls failed, tx will rollback", K(tmp_ret));
+        session->get_trans_result().set_incomplete();
+        ret = COVER_SUCC(tmp_ret);
+      }
+    }
   }
   }
   return ret;
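This hunk is the heart of the fix. On a transfer error the remote participant set is uncertain; in remote-sql mode no DAS location information survives on the local side, so the only safe option is to mark the trans result incomplete and roll back, while in remote-plan mode the touched log streams can be recovered from ObDASCtx and reported, letting the transaction proceed. A standalone model of that decision, with simplified stand-in types (TransResult and LSID are illustrative):

#include <cstdint>
#include <vector>

using LSID = int64_t;

struct TransResult {
  bool incomplete_ = false;
  std::vector<LSID> touched_;
  void set_incomplete() { incomplete_ = true; }
  bool add_touched_ls(const std::vector<LSID> &ids) {
    touched_.insert(touched_.end(), ids.begin(), ids.end());
    return true; // simplified: reporting always succeeds here
  }
};

// use_remote_sql == true : no location info survives, force rollback
// use_remote_sql == false: the plan kept DAS locations, the tx may continue
void handle_transfer_err(bool use_remote_sql, const std::vector<LSID> &all_lsid,
                         TransResult &trans_result) {
  if (use_remote_sql) {
    trans_result.set_incomplete();       // participants unknown: roll back
  } else if (!trans_result.add_touched_ls(all_lsid)) {
    trans_result.set_incomplete();       // reporting failed: fall back to rollback
  }
}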
@@ -34,7 +34,8 @@ public:
                                  ObSQLSessionInfo *session,
                                  const bool has_sent_task,
                                  const bool has_transfer_err,
-                                 const ObPhysicalPlan *phy_plan);
+                                 const ObPhysicalPlan *phy_plan,
+                                 ObExecContext &exec_ctx);
 private:
   int build_task(ObExecContext &query_ctx,
                  ObJob &job,
@@ -893,6 +893,11 @@ int ObResultSet::close()
       ret = OB_SUCCESS == ret ? tmp_ret : ret;
     }
   }
+  // notify the listener that closing failed
+  int err = COVER_SUCC(do_close_plan_ret);
+  if (OB_SUCCESS != err && close_fail_cb_.is_valid()) {
+    close_fail_cb_(err);
+  }
   //NG_TRACE_EXT(result_set_close, OB_ID(ret), ret, OB_ID(arg1), prev_ret,
   //OB_ID(arg2), ins_ret, OB_ID(arg3), errcode_, OB_ID(async), async);
   return ret; // everything after this point is done through the callback
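ObResultSet::close() now funnels any close-phase failure into the registered listener. A minimal model of that tail of close(), assuming COVER_SUCC(e) keeps ret when ret already carries an error and yields e otherwise (the macro's definition is not part of this diff, so that expansion is an assumption):

#include <functional>

constexpr int kOk = 0;

int close_tail(int ret, int do_close_plan_ret,
               const std::function<void(int)> &close_fail_cb) {
  // model of COVER_SUCC(do_close_plan_ret): first error wins
  const int err = (kOk != ret) ? ret : do_close_plan_ret;
  // is_valid() is modeled by std::function's operator bool
  if (kOk != err && close_fail_cb) {
    close_fail_cb(err);
  }
  return ret;
}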
@@ -321,6 +321,7 @@ public:
   static void replace_lob_type(const ObSQLSessionInfo &session,
                                const ObField &field,
                                obmysql::ObMySQLField &mfield);
+  void set_close_fail_callback(ObFunction<void(const int)> func) { close_fail_cb_ = func; }
 private:
   // types and constants
   static const int64_t TRANSACTION_SET_VIOLATION_MAX_RETRY = 3;
@@ -426,6 +427,7 @@ private:
   common::ObString ps_sql_; // for sql in pl
   bool is_init_;
   common::ParamStore ps_params_; // textual ps params, recorded to be filled into sql_audit
+  common::ObFunction<void(const int)> close_fail_cb_;
 };
@@ -399,6 +399,7 @@ protected:
     bool PARTS_INCOMPLETE_: 1; // participants set incomplete (must abort)
     bool PART_EPOCH_MISMATCH_: 1; // participant's born epoch mismatched
     bool WITH_TEMP_TABLE_: 1; // with txn level temporary table
+    bool DEFER_ABORT_: 1; // the abort needs to be performed on the txn start node
   };
   void switch_to_idle_();
   FLAG update_with(const FLAG &flag);
@@ -559,6 +559,16 @@ int ObTransService::handle_tx_commit_result_(ObTxDesc &tx,
   return ret;
 }

+void ObTransService::abort_tx__(ObTxDesc &tx, const bool cleanup)
+{
+  abort_participants_(tx);
+  if (!cleanup) {
+    invalid_registered_snapshot_(tx);
+  } else {
+    tx_post_terminate_(tx);
+  }
+}
+
 int ObTransService::abort_tx_(ObTxDesc &tx, const int cause, const bool cleanup)
 {
   int ret = OB_SUCCESS;
@@ -573,18 +583,29 @@ int ObTransService::abort_tx_(ObTxDesc &tx, const int cause, const bool cleanup)
   }
   tx.state_ = ObTxDesc::State::IN_TERMINATE;
   tx.abort_cause_ = cause;
-  abort_participants_(tx);
-  tx.state_ = ObTxDesc::State::ABORTED;
-  if (!cleanup) {
-    invalid_registered_snapshot_(tx);
+  // ensure the abort request is always sent from the scheduler
+  if (tx.addr_ == self_ || tx.xa_start_addr_ == self_) {
+    abort_tx__(tx, cleanup);
   } else {
-    tx_post_terminate_(tx);
+    tx.flags_.DEFER_ABORT_ = true;
   }
+  tx.state_ = ObTxDesc::State::ABORTED;
   }
   TRANS_LOG(INFO, "abort tx", K(ret), K(*this), K(tx), K(cause));
   return ret;
 }

+void ObTransService::handle_defer_abort(ObTxDesc &tx)
+{
+  ObSpinLockGuard guard(tx.lock_);
+  if (tx.addr_ == self_ || tx.xa_start_addr_ == self_) {
+    if (tx.flags_.DEFER_ABORT_) {
+      abort_tx__(tx, true);
+      tx.flags_.DEFER_ABORT_ = false;
+    }
+  }
+}
+
 void ObTransService::invalid_registered_snapshot_(ObTxDesc &tx)
 {
   int ret = OB_SUCCESS;
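The two hunks above split abort handling in two: abort_tx__ does the actual work (notify participants, then either invalidate registered snapshots or run post-terminate cleanup), while abort_tx_ only calls it when running on the scheduler node; on any other node it merely sets DEFER_ABORT_, which handle_defer_abort drains later on the scheduler. A self-contained model of that protocol, with plain ints standing in for server addresses and no OceanBase types:

#include <cassert>

struct Tx {                  // stand-in for ObTxDesc
  int scheduler_addr;        // node where the tx started (tx.addr_)
  bool defer_abort = false;  // tx.flags_.DEFER_ABORT_
  bool aborted = false;
};

struct TxService {           // stand-in for ObTransService
  int self_addr;
  void abort_tx(Tx &tx) {
    if (tx.scheduler_addr == self_addr) {
      tx.aborted = true;     // scheduler: send abort to participants now
    } else {
      tx.defer_abort = true; // elsewhere: only mark, defer the real abort
    }
  }
  void handle_defer_abort(Tx &tx) { // runs when the scheduler sees the tx again
    if (tx.scheduler_addr == self_addr && tx.defer_abort) {
      tx.aborted = true;
      tx.defer_abort = false;
    }
  }
};

int main() {
  Tx tx{/*scheduler_addr=*/1};
  TxService remote{/*self_addr=*/2};
  TxService scheduler{/*self_addr=*/1};
  remote.abort_tx(tx);              // abort decided on a non-scheduler node
  assert(tx.defer_abort && !tx.aborted);
  scheduler.handle_defer_abort(tx); // drained once back on the scheduler
  assert(tx.aborted && !tx.defer_abort);
  return 0;
}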
@@ -172,7 +172,7 @@ int check_for_standby(const share::ObLSID &ls_id,
                       bool &is_determined_state);
 void register_standby_cleanup_task();
 int do_standby_cleanup();

+void handle_defer_abort(ObTxDesc &tx);
 TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(this));

 private:
@@ -180,6 +180,7 @@ int check_ls_status_(const share::ObLSID &ls_id, bool &leader);
 void init_tx_(ObTxDesc &tx, const uint32_t session_id);
 int start_tx_(ObTxDesc &tx);
 int abort_tx_(ObTxDesc &tx, const int cause, bool cleanup = true);
+void abort_tx__(ObTxDesc &tx, bool cleanup);
 int finalize_tx_(ObTxDesc &tx);
 int find_parts_after_sp_(ObTxDesc &tx,
                          ObTxPartRefList &parts,
@@ -50,6 +50,13 @@ void ObTxnFreeRouteCtx::init_before_handle_request(ObTxDesc *tx)
   in_txn_before_handle_request_ = false;
   audit_record_.proxy_flag_ = is_proxy_support_;
   if (OB_NOT_NULL(tx)) {
+    if (tx->flags_.DEFER_ABORT_) {
+      auto txs = MTL_WITH_CHECK_TENANT(ObTransService*, tx->tenant_id_);
+      if (OB_ISNULL(txs)) {
+        int ret = OB_ERR_UNEXPECTED;
+        TRANS_LOG(WARN, "[tx free route] MTL(txs) is null", K(ret), K(tx->tenant_id_));
+      } else { txs->handle_defer_abort(*tx); }
+    }
     ObSpinLockGuard guard(tx->lock_);
     in_txn_before_handle_request_ = tx->in_tx_for_free_route_();
     txn_addr_ = TX_START_OR_RESUME_ADDR(tx);
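Note the ordering in this last hunk: handle_defer_abort acquires tx.lock_ internally, so it is called before init_before_handle_request takes its own ObSpinLockGuard on the same lock; invoking it inside the guarded region would risk self-deadlock on the non-reentrant spin lock. Since this hook runs at the start of every request on the tx free-route path, a transaction flagged DEFER_ABORT_ gets aborted the next time its scheduler node handles a request for it, which closes the loop opened by abort_tx_ above.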