fix 4377 caused by px retry
@@ -227,7 +227,7 @@ public:
   int64_t get_px_sequence_id() { return px_sequence_id_; }
   void set_ignore_vtable_error(bool flag) { ignore_vtable_error_ = flag; }
   bool is_ignore_vtable_error() { return ignore_vtable_error_; }
-  void set_server_not_alive() { server_not_alive_ = true; }
+  void set_server_not_alive(bool not_alive) { server_not_alive_ = not_alive; }
   bool is_server_not_alive() { return server_not_alive_; }
   ObPxTransmitDataChannelMsg &get_transmit_channel_msg() { return transmit_channel_; }
   ObPxReceiveDataChannelMsg &get_receive_channel_msg() { return receive_channel_; }

@@ -492,7 +492,7 @@ int ObSerialDfoScheduler::dispatch_sqcs(ObExecContext &exec_ctx,
       LOG_WARN("no memory", K(ret));
     }
   }
-  bool ignore_vtable_error = true;
+  bool ignore_vtable_error = dfo.is_ignore_vtable_error();
   if (OB_SUCC(ret)) {
     ObDfo *child_dfo = nullptr;
     for (int i = 0; i < dfo.get_child_count() && OB_SUCC(ret); ++i) {

@@ -561,7 +561,7 @@ int ObSerialDfoScheduler::dispatch_sqcs(ObExecContext &exec_ctx,
                              .fast_init_sqc(args, &sqc_cb))) {
         LOG_WARN("fail to init sqc", K(ret), K(sqc));
         sqc.set_need_report(false);
-        sqc.set_server_not_alive();
+        sqc.set_server_not_alive(true);
       }
     }
   }

@@ -1139,11 +1139,20 @@ int ObParallelDfoScheduler::dispatch_sqc(ObExecContext &exec_ctx,
     const ObArray<ObSqcAsyncCB *> &callbacks = proxy.get_callbacks();
     for (int i = 0; i < callbacks.count(); ++i) {
       cb = callbacks.at(i);
+      ObPxSqcMeta &sqc = *sqcs.at(i);
       if (OB_NOT_NULL(cb) && cb->is_processed() &&
           OB_SUCCESS == cb->get_ret_code().rcode_ &&
           OB_SUCCESS == cb->get_result().rc_) {
-        ObPxSqcMeta &sqc = *sqcs.at(i);
         sqc.set_need_report(true);
+      } else if (!cb->is_processed()) {
+        // if init_sqc_msg is not processed and the msg may be sent successfully, set server not alive.
+        // then when qc waiting_all_dfo_exit, it will push sqc.access_table_locations into trans_result,
+        // and the query can be retried.
+        bool msg_not_send_out = (cb->get_error() == EASY_TIMEOUT_NOT_SENT_OUT
+                                 || cb->get_error() == EASY_DISCONNECT_NOT_SENT_OUT);
+        if (!msg_not_send_out) {
+          sqc.set_server_not_alive(true);
+        }
+      }
       }
     }
   } else {

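A minimal, self-contained sketch of the decision the new `!cb->is_processed()` branch above encodes: a query is only safe to retry blindly when the init-SQC message is known not to have left the QC; otherwise the target server must be remembered as possibly dead so its access locations can later be folded into the transaction result. The enum and struct below are illustrative stand-ins (not OceanBase types, and the libeasy error codes are mocked); only the branching mirrors the diff.

#include <iostream>

// Stand-ins for the libeasy-style error codes referenced in the diff
// (EASY_TIMEOUT_NOT_SENT_OUT / EASY_DISCONNECT_NOT_SENT_OUT); values are invented.
enum class RpcError {
  OK,
  TIMEOUT_NOT_SENT_OUT,      // timed out before the packet left the sender
  DISCONNECT_NOT_SENT_OUT,   // connection dropped before the packet left the sender
  TIMEOUT_SENT,              // timed out after the packet may have been delivered
};

struct SqcState {
  bool processed = false;        // callback saw a response
  RpcError error = RpcError::OK;
  bool server_not_alive = false; // mirrors ObPxSqcMeta::set_server_not_alive(true)
};

// Mirrors the intent of the added branch: only when the message may have gone out
// do we mark the server as not alive, so the QC later records the SQC's access
// locations as touched and the query can be retried safely.
void on_init_sqc_callback(SqcState &sqc) {
  if (sqc.processed) {
    return;  // normal path: the SQC answered, nothing to do here
  }
  const bool msg_not_send_out =
      sqc.error == RpcError::TIMEOUT_NOT_SENT_OUT ||
      sqc.error == RpcError::DISCONNECT_NOT_SENT_OUT;
  if (!msg_not_send_out) {
    sqc.server_not_alive = true;  // the peer may already be working on our plan
  }
}

int main() {
  SqcState never_sent{false, RpcError::TIMEOUT_NOT_SENT_OUT, false};
  SqcState maybe_sent{false, RpcError::TIMEOUT_SENT, false};
  on_init_sqc_callback(never_sent);
  on_init_sqc_callback(maybe_sent);
  std::cout << "never_sent not alive: " << never_sent.server_not_alive << "\n"   // 0
            << "maybe_sent not alive: " << maybe_sent.server_not_alive << "\n";  // 1
  return 0;
}
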
@@ -687,16 +687,15 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
   while (OB_SUCC(ret) && wait_msg) {
     ObDtlChannelLoop &loop = msg_loop_;
     timeout_us = phy_plan_ctx->get_timeout_timestamp() - get_timestamp();
-    bool server_all_alive = true;
     /**
      * Start to receive the next message.
      */
     if (OB_FAIL(check_all_sqc(active_dfos, times_offset, all_dfo_terminate,
-                              last_timestamp, server_all_alive))) {
+                              last_timestamp))) {
       LOG_WARN("fail to check sqc");
     } else if (all_dfo_terminate) {
       wait_msg = false;
-      collect_trans_result_ok = server_all_alive;
+      collect_trans_result_ok = true;
       LOG_TRACE("all dfo has been terminate", K(ret));
       break;
     } else if (OB_FAIL(ctx_.fast_check_status_ignore_interrupt())) {

@@ -750,12 +749,13 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
       && OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER != coord_info_.first_error_code_) {
     ret = coord_info_.first_error_code_;
   }
-  if (!collect_trans_result_ok && OB_FAIL(ret)) {
+  if (!collect_trans_result_ok) {
     ObSQLSessionInfo *session = ctx_.get_my_session();
     session->get_trans_result().set_incomplete();
     LOG_WARN("collect trans_result fail", K(ret),
              "session_id", session->get_sessid(),
              "trans_result", session->get_trans_result());
+
   }
   return ret;
 }

@@ -763,12 +763,10 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
 int ObPxCoordOp::check_all_sqc(ObIArray<ObDfo *> &active_dfos,
                                int64_t &times_offset,
                                bool &all_dfo_terminate,
-                               int64_t &last_timestamp,
-                               bool &server_all_alive)
+                               int64_t &last_timestamp)
 {
   int ret = OB_SUCCESS;
   all_dfo_terminate = true;
-  server_all_alive = true;
   for (int64_t i = 0; i < active_dfos.count() && all_dfo_terminate && OB_SUCC(ret); ++i) {
     ObArray<ObPxSqcMeta *> sqcs;
     if (OB_FAIL(active_dfos.at(i)->get_sqcs(sqcs))) {

@@ -794,7 +792,14 @@ int ObPxCoordOp::check_all_sqc(ObIArray<ObDfo *> &active_dfos,
         all_dfo_terminate = false;
         break;
       } else if (sqc->is_server_not_alive()) {
-        server_all_alive = false;
+        sqc->set_server_not_alive(false);
+        const DASTabletLocIArray &access_locations = sqc->get_access_table_locations();
+        for (int64_t i = 0; i < access_locations.count() && OB_SUCC(ret); i++) {
+          if (OB_FAIL(ctx_.get_my_session()->get_trans_result().add_touched_ls(access_locations.at(i)->ls_id_))) {
+            LOG_WARN("add touched ls failed", K(ret));
+          }
+        }
+        LOG_WARN("server not alive", K(access_locations), K(sqc->get_access_table_location_keys()));
       }
     }
   }

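For context, a small sketch of what "push sqc.access_table_locations into trans_result" buys: the transaction result keeps a set of touched log streams, and adding the locations of an unreachable SQC means a later rollback or retry still covers those participants instead of forcing the whole result to be marked incomplete. `TransResult` and `LsId` here are simplified stand-ins for illustration only, not the real OceanBase types.

#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

using LsId = int64_t;  // stand-in for a log-stream id

// Simplified model of a session's transaction result: either it knows every
// touched log stream, or it is marked incomplete and forces the conservative path.
struct TransResult {
  std::set<LsId> touched_ls;
  bool incomplete = false;

  void add_touched_ls(LsId ls) { touched_ls.insert(ls); }
  void set_incomplete() { incomplete = true; }
};

// Mirrors the intent of the block added to ObPxCoordOp::check_all_sqc: when an SQC's
// server is considered not alive, record the log streams its tablet locations map to,
// so a retried query still treats them as transaction participants.
void absorb_dead_sqc_locations(TransResult &result, const std::vector<LsId> &access_ls) {
  for (LsId ls : access_ls) {
    result.add_touched_ls(ls);
  }
}

int main() {
  TransResult result;
  absorb_dead_sqc_locations(result, {1001, 1002, 1002});
  std::cout << "touched log streams: " << result.touched_ls.size()   // 2 (deduplicated)
            << ", incomplete: " << result.incomplete << "\n";        // 0
  return 0;
}
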
@@ -118,8 +118,7 @@ protected:
   int check_all_sqc(common::ObIArray<ObDfo *> &active_dfos,
                     int64_t &time_offset,
                     bool &all_dfo_terminate,
-                    int64_t &cur_timestamp,
-                    bool &server_all_alive);
+                    int64_t &cur_timestamp);

   int calc_admited_worker_count(int64_t px_expected,
                                 int64_t query_expected,

@@ -262,8 +262,8 @@ void ObFastInitSqcReportQCMessageCall::operator()(hash::HashMapPair<ObInterrupti
     mock_sqc_finish_msg();
   } else {
     sqc_->set_need_report(false);
-    if (!sqc_alive_) {
-      sqc_->set_server_not_alive();
+    if (need_set_not_alive_) {
+      sqc_->set_server_not_alive(true);
     }
   }
 }

@@ -476,10 +476,7 @@ int ObFastInitSqcCB::deal_with_rpc_timeout_err_safely()

 {
   int ret = OB_SUCCESS;
-  // only if it's sure init_sqc msg is not sent to sqc successfully, we can retry the query.
-  bool init_sqc_not_send_out = (get_error() == EASY_TIMEOUT_NOT_SENT_OUT
-                                || get_error() == EASY_DISCONNECT_NOT_SENT_OUT);
-  ObDealWithRpcTimeoutCall call(addr_, retry_info_, timeout_ts_, trace_id_, init_sqc_not_send_out);
+  ObDealWithRpcTimeoutCall call(addr_, retry_info_, timeout_ts_, trace_id_);
   call.ret_ = OB_TIMEOUT;
   ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance();
   if (OB_NOT_NULL(manager)) {

@@ -495,7 +492,11 @@ void ObFastInitSqcCB::interrupt_qc(int err, bool is_timeout)
   int ret = OB_SUCCESS;
   ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance();
   if (OB_NOT_NULL(manager)) {
-    ObFastInitSqcReportQCMessageCall call(sqc_, err, timeout_ts_, !is_timeout);
+    // if we are sure init_sqc msg is not sent to sqc successfully, we don't have to set sqc not alive.
+    bool init_sqc_not_send_out = (get_error() == EASY_TIMEOUT_NOT_SENT_OUT
+                                  || get_error() == EASY_DISCONNECT_NOT_SENT_OUT);
+    const bool need_set_not_alive = is_timeout && !init_sqc_not_send_out;
+    ObFastInitSqcReportQCMessageCall call(sqc_, err, timeout_ts_, need_set_not_alive);
     if (OB_FAIL(manager->get_map().atomic_refactored(interrupt_id_, call))) {
       LOG_WARN("fail to set need report", K(interrupt_id_));
     } else if (!call.need_interrupt_) {

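The new `need_set_not_alive` computation in `interrupt_qc` combines two facts: the RPC failed by timeout, and the packet may have gone out. A hedged sketch of just that predicate, with a made-up enum standing in for the libeasy error codes:

#include <cassert>

// Stand-in for the two "not sent out" libeasy error codes named in the diff.
enum class SendState { NOT_SENT_OUT, MAYBE_SENT };

// Mirrors: need_set_not_alive = is_timeout && !init_sqc_not_send_out.
// Only a timeout where the init-SQC packet may have been delivered forces the
// conservative "server not alive" bookkeeping on the SQC meta.
bool need_set_not_alive(bool is_timeout, SendState state) {
  const bool init_sqc_not_send_out = (state == SendState::NOT_SENT_OUT);
  return is_timeout && !init_sqc_not_send_out;
}

int main() {
  assert(!need_set_not_alive(true, SendState::NOT_SENT_OUT));   // safe: packet never left
  assert(need_set_not_alive(true, SendState::MAYBE_SENT));      // conservative path
  assert(!need_set_not_alive(false, SendState::MAYBE_SENT));    // non-timeout handled elsewhere
  return 0;
}
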
@@ -532,12 +533,7 @@ void ObDealWithRpcTimeoutCall::deal_with_rpc_timeout_err()
       LOG_WARN("fail to add invalid server distinctly", K_(trace_id), K(a_ret), K_(addr));
     }
   }
-    if (can_retry_) {
-      // return OB_RPC_CONNECT_ERROR to retry.
-      ret_ = OB_RPC_CONNECT_ERROR;
-    } else {
-      ret_ = OB_PACKET_STATUS_UNKNOWN;
-    }
+    ret_ = OB_RPC_CONNECT_ERROR;
   } else {
     LOG_DEBUG("rpc return OB_TIMEOUT, and it is actually timeout, "
               "do not change error code", K(ret_),

@@ -96,9 +96,9 @@ public:
   ObFastInitSqcReportQCMessageCall(ObPxSqcMeta *sqc,
                                    int err,
                                    int64_t timeout_ts,
-                                   bool sqc_alive) : sqc_(sqc), err_(err),
+                                   bool need_set_not_alive) : sqc_(sqc), err_(err),
                                    need_interrupt_(false), timeout_ts_(timeout_ts),
-                                   sqc_alive_(sqc_alive)
+                                   need_set_not_alive_(need_set_not_alive)
   {
     need_interrupt_ = true;
   }

@@ -111,7 +111,7 @@ public:
   int err_;
   bool need_interrupt_;
   int64_t timeout_ts_;
-  bool sqc_alive_;
+  bool need_set_not_alive_;
 };

 class ObDealWithRpcTimeoutCall

@@ -120,9 +120,8 @@ public:
   ObDealWithRpcTimeoutCall(common::ObAddr addr,
                            ObQueryRetryInfo *retry_info,
                            int64_t timeout_ts,
-                           common::ObCurTraceId::TraceId &trace_id,
-                           bool retry) : addr_(addr), retry_info_(retry_info),
-                           timeout_ts_(timeout_ts), trace_id_(trace_id), ret_(common::OB_TIMEOUT), can_retry_(retry) {}
+                           common::ObCurTraceId::TraceId &trace_id) : addr_(addr), retry_info_(retry_info),
+                           timeout_ts_(timeout_ts), trace_id_(trace_id), ret_(common::OB_TIMEOUT) {}
   ~ObDealWithRpcTimeoutCall() = default;
   void operator() (hash::HashMapPair<ObInterruptibleTaskID,
                    ObInterruptCheckerNode *> &entry);

@@ -133,7 +132,6 @@ public:
   int64_t timeout_ts_;
   common::ObCurTraceId::TraceId trace_id_;
   int ret_;
-  bool can_retry_;
 };

 class ObFastInitSqcCB

@@ -185,14 +185,7 @@ int ObPxSqcAsyncProxy::wait_all() {
       if (phy_plan_ctx_->get_timeout_timestamp() -
               ObTimeUtility::current_time() > 0) {
         error_index_ = idx;
-        bool init_sqc_not_send_out = (callback.get_error() == EASY_TIMEOUT_NOT_SENT_OUT
-                                      || callback.get_error() == EASY_DISCONNECT_NOT_SENT_OUT);
-        if (init_sqc_not_send_out) {
-          // only if it's sure init_sqc msg is not sent to sqc successfully, return OB_RPC_CONNECT_ERROR to retry.
-          ret = OB_RPC_CONNECT_ERROR;
-        } else {
-          ret = OB_PACKET_STATUS_UNKNOWN;
-        }
+        ret = OB_RPC_CONNECT_ERROR;
       } else {
         ret = OB_TIMEOUT;
       }

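After this change, `ObPxSqcAsyncProxy::wait_all` no longer distinguishes "packet not sent" from "packet possibly sent" when picking the error code; within the statement deadline it always surfaces a connect error so the query-retry layer can kick in, and the possibly-sent case is covered by the server-not-alive bookkeeping shown earlier. A compact sketch of the resulting choice, using hypothetical error constants rather than the real OceanBase error codes:

#include <cstdint>
#include <iostream>

// Hypothetical stand-ins for the error codes used in the hunk above.
constexpr int MY_RPC_CONNECT_ERROR = -8004;  // retryable by the query-retry layer
constexpr int MY_TIMEOUT = -4012;            // statement deadline already passed

// Mirrors the simplified branch: if there is still time left before the statement
// deadline, report a retryable connect error; otherwise report a plain timeout.
int pick_wait_all_error(int64_t timeout_ts_us, int64_t now_us) {
  return (timeout_ts_us - now_us > 0) ? MY_RPC_CONNECT_ERROR : MY_TIMEOUT;
}

int main() {
  std::cout << pick_wait_all_error(2'000'000, 1'000'000) << "\n";  // retryable, time remains
  std::cout << pick_wait_all_error(1'000'000, 2'000'000) << "\n";  // deadline exceeded
  return 0;
}
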
@@ -3506,7 +3506,7 @@ int ObExtraServerAliveCheck::do_check() const
                                           query_start_time_))) {
         sqcs.at(j).set_need_report(false);
         sqcs.at(j).set_thread_finish(true);
-        sqcs.at(j).set_server_not_alive();
+        sqcs.at(j).set_server_not_alive(true);
         if (!sqcs.at(j).is_ignore_vtable_error()) {
           ret = OB_RPC_CONNECT_ERROR;
           LOG_WARN("server not in communication, maybe crashed.", K(ret),

@@ -140,6 +140,7 @@ OB_INLINE int ObResultSet::open_plan()
 int ObResultSet::open()
 {
   int ret = OB_SUCCESS;
+  my_session_.set_process_query_time(ObTimeUtility::current_time());
   LinkExecCtxGuard link_guard(my_session_, get_exec_context());
   if (lib::is_oracle_mode() &&
       get_exec_context().get_nested_level() >= OB_MAX_RECURSIVE_SQL_LEVELS) {

@@ -3722,7 +3722,6 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
   pc_ctx.neg_param_index_.reset();
   bool plan_added = false;
   bool need_get_baseline = false;
-  session.set_process_query_time(ObTimeUtility::current_time());
   LOG_DEBUG("gen plan info", K(context.spm_ctx_.bl_key_), K(get_plan_err));
   // for batched multi stmt, we only parse and optimize the first statement
   // only in multi_query, need do this