From f93ef2fe39b7a2cf74eef87fc7afca09ccdfbdd3 Mon Sep 17 00:00:00 2001 From: obdev Date: Tue, 13 Dec 2022 04:37:59 +0000 Subject: [PATCH] fix 4377 caused by px retry --- src/sql/engine/px/ob_dfo.h | 2 +- src/sql/engine/px/ob_dfo_scheduler.cpp | 15 +++++++++++--- src/sql/engine/px/ob_px_coord_op.cpp | 21 ++++++++++++-------- src/sql/engine/px/ob_px_coord_op.h | 3 +-- src/sql/engine/px/ob_px_rpc_processor.cpp | 22 +++++++++------------ src/sql/engine/px/ob_px_rpc_processor.h | 12 +++++------ src/sql/engine/px/ob_px_sqc_async_proxy.cpp | 9 +-------- src/sql/engine/px/ob_px_util.cpp | 2 +- src/sql/ob_result_set.cpp | 1 + src/sql/ob_sql.cpp | 1 - 10 files changed, 44 insertions(+), 44 deletions(-) diff --git a/src/sql/engine/px/ob_dfo.h b/src/sql/engine/px/ob_dfo.h index 9916412411..17d1308305 100644 --- a/src/sql/engine/px/ob_dfo.h +++ b/src/sql/engine/px/ob_dfo.h @@ -227,7 +227,7 @@ public: int64_t get_px_sequence_id() { return px_sequence_id_; } void set_ignore_vtable_error(bool flag) { ignore_vtable_error_ = flag; } bool is_ignore_vtable_error() { return ignore_vtable_error_; } - void set_server_not_alive() { server_not_alive_ = true; } + void set_server_not_alive(bool not_alive) { server_not_alive_ = not_alive; } bool is_server_not_alive() { return server_not_alive_; } ObPxTransmitDataChannelMsg &get_transmit_channel_msg() { return transmit_channel_; } ObPxReceiveDataChannelMsg &get_receive_channel_msg() { return receive_channel_; } diff --git a/src/sql/engine/px/ob_dfo_scheduler.cpp b/src/sql/engine/px/ob_dfo_scheduler.cpp index 26684d3e8f..dcecc7e977 100644 --- a/src/sql/engine/px/ob_dfo_scheduler.cpp +++ b/src/sql/engine/px/ob_dfo_scheduler.cpp @@ -492,7 +492,7 @@ int ObSerialDfoScheduler::dispatch_sqcs(ObExecContext &exec_ctx, LOG_WARN("no memory", K(ret)); } } - bool ignore_vtable_error = true; + bool ignore_vtable_error = dfo.is_ignore_vtable_error(); if (OB_SUCC(ret)) { ObDfo *child_dfo = nullptr; for (int i = 0; i < dfo.get_child_count() && OB_SUCC(ret); ++i) { @@ -561,7 +561,7 @@ int ObSerialDfoScheduler::dispatch_sqcs(ObExecContext &exec_ctx, .fast_init_sqc(args, &sqc_cb))) { LOG_WARN("fail to init sqc", K(ret), K(sqc)); sqc.set_need_report(false); - sqc.set_server_not_alive(); + sqc.set_server_not_alive(true); } } } @@ -1139,11 +1139,20 @@ int ObParallelDfoScheduler::dispatch_sqc(ObExecContext &exec_ctx, const ObArray &callbacks = proxy.get_callbacks(); for (int i = 0; i < callbacks.count(); ++i) { cb = callbacks.at(i); + ObPxSqcMeta &sqc = *sqcs.at(i); if (OB_NOT_NULL(cb) && cb->is_processed() && OB_SUCCESS == cb->get_ret_code().rcode_ && OB_SUCCESS == cb->get_result().rc_) { - ObPxSqcMeta &sqc = *sqcs.at(i); sqc.set_need_report(true); + } else if (!cb->is_processed()) { + // if init_sqc_msg is not processed and the msg may be sent successfully, set server not alive. + // then when qc waiting_all_dfo_exit, it will push sqc.access_table_locations into trans_result, + // and the query can be retried. + bool msg_not_send_out = (cb->get_error() == EASY_TIMEOUT_NOT_SENT_OUT + || cb->get_error() == EASY_DISCONNECT_NOT_SENT_OUT); + if (!msg_not_send_out) { + sqc.set_server_not_alive(true); + } } } } else { diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index 5b1fe70255..d372fcdf2a 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -687,16 +687,15 @@ int ObPxCoordOp::wait_all_running_dfos_exit() while (OB_SUCC(ret) && wait_msg) { ObDtlChannelLoop &loop = msg_loop_; timeout_us = phy_plan_ctx->get_timeout_timestamp() - get_timestamp(); - bool server_all_alive = true; /** * 开始收下一个消息。 */ if (OB_FAIL(check_all_sqc(active_dfos, times_offset, all_dfo_terminate, - last_timestamp, server_all_alive))) { + last_timestamp))) { LOG_WARN("fail to check sqc"); } else if (all_dfo_terminate) { wait_msg = false; - collect_trans_result_ok = server_all_alive; + collect_trans_result_ok = true; LOG_TRACE("all dfo has been terminate", K(ret)); break; } else if (OB_FAIL(ctx_.fast_check_status_ignore_interrupt())) { @@ -750,12 +749,13 @@ int ObPxCoordOp::wait_all_running_dfos_exit() && OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER != coord_info_.first_error_code_) { ret = coord_info_.first_error_code_; } - if (!collect_trans_result_ok && OB_FAIL(ret)) { + if (!collect_trans_result_ok) { ObSQLSessionInfo *session = ctx_.get_my_session(); session->get_trans_result().set_incomplete(); LOG_WARN("collect trans_result fail", K(ret), "session_id", session->get_sessid(), "trans_result", session->get_trans_result()); + } return ret; } @@ -763,12 +763,10 @@ int ObPxCoordOp::wait_all_running_dfos_exit() int ObPxCoordOp::check_all_sqc(ObIArray &active_dfos, int64_t ×_offset, bool &all_dfo_terminate, - int64_t &last_timestamp, - bool &server_all_alive) + int64_t &last_timestamp) { int ret = OB_SUCCESS; all_dfo_terminate = true; - server_all_alive = true; for (int64_t i = 0; i < active_dfos.count() && all_dfo_terminate && OB_SUCC(ret); ++i) { ObArray sqcs; if (OB_FAIL(active_dfos.at(i)->get_sqcs(sqcs))) { @@ -794,7 +792,14 @@ int ObPxCoordOp::check_all_sqc(ObIArray &active_dfos, all_dfo_terminate = false; break; } else if (sqc->is_server_not_alive()) { - server_all_alive = false; + sqc->set_server_not_alive(false); + const DASTabletLocIArray &access_locations = sqc->get_access_table_locations(); + for (int64_t i = 0; i < access_locations.count() && OB_SUCC(ret); i++) { + if (OB_FAIL(ctx_.get_my_session()->get_trans_result().add_touched_ls(access_locations.at(i)->ls_id_))) { + LOG_WARN("add touched ls failed", K(ret)); + } + } + LOG_WARN("server not alive", K(access_locations), K(sqc->get_access_table_location_keys())); } } } diff --git a/src/sql/engine/px/ob_px_coord_op.h b/src/sql/engine/px/ob_px_coord_op.h index d79b9375fb..70ba62f8b1 100644 --- a/src/sql/engine/px/ob_px_coord_op.h +++ b/src/sql/engine/px/ob_px_coord_op.h @@ -118,8 +118,7 @@ protected: int check_all_sqc(common::ObIArray &active_dfos, int64_t &time_offset, bool &all_dfo_terminate, - int64_t &cur_timestamp, - bool &server_all_alive); + int64_t &cur_timestamp); int calc_admited_worker_count(int64_t px_expected, int64_t query_expected, diff --git a/src/sql/engine/px/ob_px_rpc_processor.cpp b/src/sql/engine/px/ob_px_rpc_processor.cpp index 0c751bb2cf..f9c8731774 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.cpp +++ b/src/sql/engine/px/ob_px_rpc_processor.cpp @@ -262,8 +262,8 @@ void ObFastInitSqcReportQCMessageCall::operator()(hash::HashMapPairset_need_report(false); - if (!sqc_alive_) { - sqc_->set_server_not_alive(); + if (need_set_not_alive_) { + sqc_->set_server_not_alive(true); } } } @@ -476,10 +476,7 @@ int ObFastInitSqcCB::deal_with_rpc_timeout_err_safely() { int ret = OB_SUCCESS; - // only if it's sure init_sqc msg is not sent to sqc successfully, we can retry the query. - bool init_sqc_not_send_out = (get_error() == EASY_TIMEOUT_NOT_SENT_OUT - || get_error() == EASY_DISCONNECT_NOT_SENT_OUT); - ObDealWithRpcTimeoutCall call(addr_, retry_info_, timeout_ts_, trace_id_, init_sqc_not_send_out); + ObDealWithRpcTimeoutCall call(addr_, retry_info_, timeout_ts_, trace_id_); call.ret_ = OB_TIMEOUT; ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance(); if (OB_NOT_NULL(manager)) { @@ -495,7 +492,11 @@ void ObFastInitSqcCB::interrupt_qc(int err, bool is_timeout) int ret = OB_SUCCESS; ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance(); if (OB_NOT_NULL(manager)) { - ObFastInitSqcReportQCMessageCall call(sqc_, err, timeout_ts_, !is_timeout); + // if we are sure init_sqc msg is not sent to sqc successfully, we don't have to set sqc not alive. + bool init_sqc_not_send_out = (get_error() == EASY_TIMEOUT_NOT_SENT_OUT + || get_error() == EASY_DISCONNECT_NOT_SENT_OUT); + const bool need_set_not_alive = is_timeout && !init_sqc_not_send_out; + ObFastInitSqcReportQCMessageCall call(sqc_, err, timeout_ts_, need_set_not_alive); if (OB_FAIL(manager->get_map().atomic_refactored(interrupt_id_, call))) { LOG_WARN("fail to set need report", K(interrupt_id_)); } else if (!call.need_interrupt_) { @@ -532,12 +533,7 @@ void ObDealWithRpcTimeoutCall::deal_with_rpc_timeout_err() LOG_WARN("fail to add invalid server distinctly", K_(trace_id), K(a_ret), K_(addr)); } } - if (can_retry_) { - // return OB_RPC_CONNECT_ERROR to retry. - ret_ = OB_RPC_CONNECT_ERROR; - } else { - ret_ = OB_PACKET_STATUS_UNKNOWN; - } + ret_ = OB_RPC_CONNECT_ERROR; } else { LOG_DEBUG("rpc return OB_TIMEOUT, and it is actually timeout, " "do not change error code", K(ret_), diff --git a/src/sql/engine/px/ob_px_rpc_processor.h b/src/sql/engine/px/ob_px_rpc_processor.h index 25d1df8713..cc57aaf417 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.h +++ b/src/sql/engine/px/ob_px_rpc_processor.h @@ -96,9 +96,9 @@ public: ObFastInitSqcReportQCMessageCall(ObPxSqcMeta *sqc, int err, int64_t timeout_ts, - bool sqc_alive) : sqc_(sqc), err_(err), + bool need_set_not_alive) : sqc_(sqc), err_(err), need_interrupt_(false), timeout_ts_(timeout_ts), - sqc_alive_(sqc_alive) + need_set_not_alive_(need_set_not_alive) { need_interrupt_ = true; } @@ -111,7 +111,7 @@ public: int err_; bool need_interrupt_; int64_t timeout_ts_; - bool sqc_alive_; + bool need_set_not_alive_; }; class ObDealWithRpcTimeoutCall @@ -120,9 +120,8 @@ public: ObDealWithRpcTimeoutCall(common::ObAddr addr, ObQueryRetryInfo *retry_info, int64_t timeout_ts, - common::ObCurTraceId::TraceId &trace_id, - bool retry) : addr_(addr), retry_info_(retry_info), - timeout_ts_(timeout_ts), trace_id_(trace_id), ret_(common::OB_TIMEOUT), can_retry_(retry) {} + common::ObCurTraceId::TraceId &trace_id) : addr_(addr), retry_info_(retry_info), + timeout_ts_(timeout_ts), trace_id_(trace_id), ret_(common::OB_TIMEOUT) {} ~ObDealWithRpcTimeoutCall() = default; void operator() (hash::HashMapPair &entry); @@ -133,7 +132,6 @@ public: int64_t timeout_ts_; common::ObCurTraceId::TraceId trace_id_; int ret_; - bool can_retry_; }; class ObFastInitSqcCB diff --git a/src/sql/engine/px/ob_px_sqc_async_proxy.cpp b/src/sql/engine/px/ob_px_sqc_async_proxy.cpp index 64b5a2dcb3..f9fd566871 100644 --- a/src/sql/engine/px/ob_px_sqc_async_proxy.cpp +++ b/src/sql/engine/px/ob_px_sqc_async_proxy.cpp @@ -185,14 +185,7 @@ int ObPxSqcAsyncProxy::wait_all() { if (phy_plan_ctx_->get_timeout_timestamp() - ObTimeUtility::current_time() > 0) { error_index_ = idx; - bool init_sqc_not_send_out = (callback.get_error() == EASY_TIMEOUT_NOT_SENT_OUT - || callback.get_error() == EASY_DISCONNECT_NOT_SENT_OUT); - if (init_sqc_not_send_out) { - // only if it's sure init_sqc msg is not sent to sqc successfully, return OB_RPC_CONNECT_ERROR to retry. - ret = OB_RPC_CONNECT_ERROR; - } else { - ret = OB_PACKET_STATUS_UNKNOWN; - } + ret = OB_RPC_CONNECT_ERROR; } else { ret = OB_TIMEOUT; } diff --git a/src/sql/engine/px/ob_px_util.cpp b/src/sql/engine/px/ob_px_util.cpp index ef42ad3274..3539bf34cb 100644 --- a/src/sql/engine/px/ob_px_util.cpp +++ b/src/sql/engine/px/ob_px_util.cpp @@ -3506,7 +3506,7 @@ int ObExtraServerAliveCheck::do_check() const query_start_time_))) { sqcs.at(j).set_need_report(false); sqcs.at(j).set_thread_finish(true); - sqcs.at(j).set_server_not_alive(); + sqcs.at(j).set_server_not_alive(true); if (!sqcs.at(j).is_ignore_vtable_error()) { ret = OB_RPC_CONNECT_ERROR; LOG_WARN("server not in communication, maybe crashed.", K(ret), diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index b6ba234be9..e06b51b991 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -140,6 +140,7 @@ OB_INLINE int ObResultSet::open_plan() int ObResultSet::open() { int ret = OB_SUCCESS; + my_session_.set_process_query_time(ObTimeUtility::current_time()); LinkExecCtxGuard link_guard(my_session_, get_exec_context()); if (lib::is_oracle_mode() && get_exec_context().get_nested_level() >= OB_MAX_RECURSIVE_SQL_LEVELS) { diff --git a/src/sql/ob_sql.cpp b/src/sql/ob_sql.cpp index 82737f03dc..1399b390a3 100644 --- a/src/sql/ob_sql.cpp +++ b/src/sql/ob_sql.cpp @@ -3722,7 +3722,6 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt, pc_ctx.neg_param_index_.reset(); bool plan_added = false; bool need_get_baseline = false; - session.set_process_query_time(ObTimeUtility::current_time()); LOG_DEBUG("gen plan info", K(context.spm_ctx_.bl_key_), K(get_plan_err)); // for batched multi stmt, we only parse and optimize the first statement // only in multi_query, need do this