Enable retrive USER_ERROR and USER_WARN message from PX worker
This commit is contained in:
@ -45,7 +45,7 @@ public:
|
||||
|
||||
int32_t rcode_;
|
||||
char msg_[common::OB_MAX_ERROR_MSG_LEN];
|
||||
common::ObSEArray<common::ObWarningBuffer::WarningItem, 2> warnings_;
|
||||
common::ObSEArray<common::ObWarningBuffer::WarningItem, 1> warnings_;
|
||||
};
|
||||
|
||||
} // end of namespace obrpc
|
||||
|
@ -54,13 +54,10 @@ class ObDtlPacketProc
|
||||
{
|
||||
public:
|
||||
ObDtlMsgType get_proc_type() const override { return Packet::type(); }
|
||||
int decode(const ObDtlLinkedBuffer &buffer);
|
||||
int process(const ObDtlLinkedBuffer &buffer, bool &transferred) override;
|
||||
virtual int init(const ObDtlLinkedBuffer &buffer) { UNUSED(buffer); return common::OB_SUCCESS; }
|
||||
|
||||
private:
|
||||
int decode(const ObDtlLinkedBuffer &buffer);
|
||||
virtual int process(const Packet &pkt) = 0;
|
||||
|
||||
private:
|
||||
Packet pkt_;
|
||||
};
|
||||
@ -99,6 +96,48 @@ int ObDtlPacketProc<Packet>::process(const ObDtlLinkedBuffer &buffer, bool &tran
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <class Packet>
|
||||
class ObDtlPacketEmptyProc
|
||||
: public ObDtlPacketProcBase
|
||||
{
|
||||
public:
|
||||
ObDtlPacketEmptyProc() = default;
|
||||
virtual ~ObDtlPacketEmptyProc() = default;
|
||||
ObDtlMsgType get_proc_type() const override { return Packet::type(); }
|
||||
int process(const ObDtlLinkedBuffer &buffer, bool &transferred) override;
|
||||
private:
|
||||
int decode(const ObDtlLinkedBuffer &buffer);
|
||||
};
|
||||
|
||||
template <class Packet>
|
||||
int ObDtlPacketEmptyProc<Packet>::decode(const ObDtlLinkedBuffer &buffer)
|
||||
{
|
||||
using common::OB_SUCCESS;
|
||||
int ret = OB_SUCCESS;
|
||||
const char *buf = buffer.buf();
|
||||
int64_t size = buffer.size();
|
||||
int64_t &pos = buffer.pos();
|
||||
Packet pkt;
|
||||
if (OB_FAIL(common::serialization::decode(buf, size, pos, pkt))) {
|
||||
SQL_DTL_LOG(WARN, "decode DTL packet fail", K(size), K(pos));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <class Packet>
|
||||
int ObDtlPacketEmptyProc<Packet>::process(const ObDtlLinkedBuffer &buffer, bool &transferred)
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
transferred = false;
|
||||
if (buffer.pos() == buffer.size()) {
|
||||
// last row
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_FAIL(decode(buffer))) {
|
||||
// do nothing after decode. as we intend to discard the packet
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // dtl
|
||||
} // sql
|
||||
} // oceanbase
|
||||
|
@ -911,7 +911,8 @@ public:
|
||||
interm_result_ids_(),
|
||||
tx_desc_(NULL),
|
||||
is_use_local_thread_(false),
|
||||
fb_info_()
|
||||
fb_info_(),
|
||||
err_msg_()
|
||||
{
|
||||
|
||||
}
|
||||
@ -1004,6 +1005,8 @@ public:
|
||||
void set_use_local_thread(bool flag) { is_use_local_thread_ = flag; }
|
||||
bool is_use_local_thread() { return is_use_local_thread_; }
|
||||
ObExecFeedbackInfo &get_feedback_info() { return fb_info_; };
|
||||
const ObPxUserErrorMsg &get_err_msg() const { return err_msg_; }
|
||||
ObPxUserErrorMsg &get_err_msg() { return err_msg_; }
|
||||
public:
|
||||
// 小于等于0表示设置了rc 值, task default ret值为1
|
||||
static const int64_t TASK_DEFAULT_RET_VALUE = 1;
|
||||
@ -1032,6 +1035,7 @@ public:
|
||||
transaction::ObTxDesc *tx_desc_; // transcation information
|
||||
bool is_use_local_thread_;
|
||||
ObExecFeedbackInfo fb_info_; //for feedback info
|
||||
ObPxUserErrorMsg err_msg_; // for error msg & warning msg
|
||||
};
|
||||
|
||||
class ObPxRpcInitTaskArgs
|
||||
@ -1108,7 +1112,7 @@ public:
|
||||
ObPhysicalPlan *des_phy_plan_;
|
||||
ObOpSpec *op_spec_root_;
|
||||
ObOperator *static_engine_root_;
|
||||
ObPxTask *sqc_task_ptr_; // 指针指向 SQC SubCoord 中的对应 task 内存
|
||||
ObPxTask *sqc_task_ptr_; // 指针指向 SQC Ctx task 数组中对应的 task
|
||||
ObIAllocator *des_allocator_;
|
||||
ObPxSqcHandler *sqc_handler_; // 指向 SQC Handler 内存
|
||||
};
|
||||
|
@ -124,19 +124,6 @@ private:
|
||||
ObSqcCtx &sqc_ctx_;
|
||||
};
|
||||
|
||||
|
||||
//update the error code if it is OB_HASH_NOT_EXIST or OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER
|
||||
OB_INLINE void update_error_code(int ¤t_error_code, const int new_error_code)
|
||||
{
|
||||
if (new_error_code != ObPxTask::TASK_DEFAULT_RET_VALUE) {
|
||||
if ((OB_SUCCESS == current_error_code) ||
|
||||
((OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER == current_error_code ||
|
||||
OB_GOT_SIGNAL_ABORTING == current_error_code) &&
|
||||
OB_SUCCESS != new_error_code)) {
|
||||
current_error_code = new_error_code;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -699,15 +699,15 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
|
||||
ObPxTerminateMsgProc terminate_msg_proc(coord_info_, listener);
|
||||
ObPxFinishSqcResultP sqc_finish_msg_proc(ctx_, terminate_msg_proc);
|
||||
ObPxInitSqcResultP sqc_init_msg_proc(ctx_, terminate_msg_proc);
|
||||
ObBarrierPieceMsgP barrier_piece_msg_proc(ctx_, terminate_msg_proc);
|
||||
ObWinbufPieceMsgP winbuf_piece_msg_proc(ctx_, terminate_msg_proc);
|
||||
ObDynamicSamplePieceMsgP sample_piece_msg_proc(ctx_, terminate_msg_proc);
|
||||
ObRollupKeyPieceMsgP rollup_key_piece_msg_proc(ctx_, terminate_msg_proc);
|
||||
ObRDWFPieceMsgP rd_wf_piece_msg_proc(ctx_, terminate_msg_proc);
|
||||
ObInitChannelPieceMsgP init_channel_piece_msg_proc(ctx_, terminate_msg_proc);
|
||||
ObReportingWFPieceMsgP reporting_wf_piece_msg_proc(ctx_, terminate_msg_proc);
|
||||
ObPxQcInterruptedP interrupt_proc(ctx_, terminate_msg_proc);
|
||||
ObOptStatsGatherPieceMsgP opt_stats_gather_piece_msg_proc(ctx_, terminate_msg_proc);
|
||||
dtl::ObDtlPacketEmptyProc<ObBarrierPieceMsg> barrier_piece_msg_proc;
|
||||
dtl::ObDtlPacketEmptyProc<ObWinbufPieceMsg> winbuf_piece_msg_proc;
|
||||
dtl::ObDtlPacketEmptyProc<ObDynamicSamplePieceMsg> sample_piece_msg_proc;
|
||||
dtl::ObDtlPacketEmptyProc<ObRollupKeyPieceMsg> rollup_key_piece_msg_proc;
|
||||
dtl::ObDtlPacketEmptyProc<ObRDWFPieceMsg> rd_wf_piece_msg_proc;
|
||||
dtl::ObDtlPacketEmptyProc<ObInitChannelPieceMsg> init_channel_piece_msg_proc;
|
||||
dtl::ObDtlPacketEmptyProc<ObReportingWFPieceMsg> reporting_wf_piece_msg_proc;
|
||||
dtl::ObDtlPacketEmptyProc<ObOptStatsGatherPieceMsg> opt_stats_gather_piece_msg_proc;
|
||||
|
||||
// 这个注册会替换掉旧的proc.
|
||||
(void)msg_loop_.clear_all_proc();
|
||||
|
@ -28,10 +28,10 @@ OB_SERIALIZE_MEMBER((ObPxTaskChSet, dtl::ObDtlChSet), sqc_id_, task_id_);
|
||||
OB_SERIALIZE_MEMBER(ObPxPartChMapItem, first_, second_, third_);
|
||||
OB_SERIALIZE_MEMBER(ObPxReceiveDataChannelMsg, child_dfo_id_, ch_sets_, ch_total_info_, has_filled_channel_);
|
||||
OB_SERIALIZE_MEMBER(ObPxTransmitDataChannelMsg, ch_sets_, part_affinity_map_, ch_total_info_, has_filled_channel_);
|
||||
OB_SERIALIZE_MEMBER(ObPxInitSqcResultMsg, dfo_id_, sqc_id_, rc_, task_count_);
|
||||
OB_SERIALIZE_MEMBER(ObPxInitSqcResultMsg, dfo_id_, sqc_id_, rc_, task_count_, err_msg_);
|
||||
OB_SERIALIZE_MEMBER(ObPxFinishSqcResultMsg, dfo_id_, sqc_id_, rc_, trans_result_,
|
||||
task_monitor_info_array_, sqc_affected_rows_, dml_row_info_,
|
||||
temp_table_id_, interm_result_ids_, fb_info_);
|
||||
temp_table_id_, interm_result_ids_, fb_info_, err_msg_);
|
||||
OB_SERIALIZE_MEMBER(ObPxFinishTaskResultMsg, dfo_id_, sqc_id_, task_id_, rc_);
|
||||
OB_SERIALIZE_MEMBER((ObPxBloomFilterChInfo, dtl::ObDtlChTotalInfo), filter_id_);
|
||||
OB_SERIALIZE_MEMBER((ObPxBloomFilterChSet, dtl::ObDtlChSet), filter_id_, sqc_id_);
|
||||
|
@ -32,6 +32,8 @@ namespace oceanbase
|
||||
namespace sql
|
||||
{
|
||||
|
||||
typedef obrpc::ObRpcResultCode ObPxUserErrorMsg;
|
||||
|
||||
struct ObPxTabletInfo
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
@ -351,7 +353,8 @@ public:
|
||||
: dfo_id_(common::OB_INVALID_ID),
|
||||
sqc_id_(common::OB_INVALID_ID),
|
||||
rc_(common::OB_SUCCESS),
|
||||
task_count_(0) {}
|
||||
task_count_(0),
|
||||
err_msg_() {}
|
||||
virtual ~ObPxInitSqcResultMsg() = default;
|
||||
void reset() {}
|
||||
TO_STRING_KV(K_(dfo_id), K_(sqc_id), K_(rc), K_(task_count));
|
||||
@ -360,6 +363,7 @@ public:
|
||||
int64_t sqc_id_;
|
||||
int rc_; // 错误码
|
||||
int64_t task_count_;
|
||||
ObPxUserErrorMsg err_msg_; // for error msg & warning msg
|
||||
// No need to serialize
|
||||
ObSEArray<ObPxTabletInfo, 8> tablets_info_;
|
||||
};
|
||||
@ -380,7 +384,8 @@ public:
|
||||
dml_row_info_(),
|
||||
temp_table_id_(common::OB_INVALID_ID),
|
||||
interm_result_ids_(),
|
||||
fb_info_() {}
|
||||
fb_info_(),
|
||||
err_msg_() {}
|
||||
virtual ~ObPxFinishSqcResultMsg() = default;
|
||||
const transaction::ObTxExecResult &get_trans_result() const { return trans_result_; }
|
||||
transaction::ObTxExecResult &get_trans_result() { return trans_result_; }
|
||||
@ -393,6 +398,7 @@ public:
|
||||
task_monitor_info_array_.reset();
|
||||
dml_row_info_.reset();
|
||||
fb_info_.reset();
|
||||
err_msg_.reset();
|
||||
}
|
||||
TO_STRING_KV(K_(dfo_id), K_(sqc_id), K_(rc), K_(sqc_affected_rows));
|
||||
public:
|
||||
@ -406,6 +412,7 @@ public:
|
||||
uint64_t temp_table_id_;
|
||||
ObSEArray<uint64_t, 8> interm_result_ids_;
|
||||
ObExecFeedbackInfo fb_info_;
|
||||
ObPxUserErrorMsg err_msg_; // for error msg & warning msg
|
||||
};
|
||||
|
||||
class ObPxFinishTaskResultMsg
|
||||
|
@ -172,7 +172,7 @@ int ObPxMsgProc::on_sqc_init_msg(ObExecContext &ctx, const ObPxInitSqcResultMsg
|
||||
} else {
|
||||
if (OB_SUCCESS != pkt.rc_) {
|
||||
ret = pkt.rc_;
|
||||
update_error_code(coord_info_.first_error_code_, pkt.rc_);
|
||||
ObPxErrorUtil::update_qc_error_code(coord_info_.first_error_code_, pkt.rc_, pkt.err_msg_);
|
||||
LOG_WARN("fail init sqc, please check remote server log for details",
|
||||
"remote_server", sqc->get_exec_addr(), K(pkt), KP(ret));
|
||||
} else if (pkt.task_count_ <= 0) {
|
||||
@ -380,7 +380,7 @@ int ObPxMsgProc::on_sqc_finish_msg(ObExecContext &ctx,
|
||||
* 发送这个finish消息的sqc(包括它的worker)其实已经结束了,需要将它
|
||||
* 但是因为出错了,后续的调度流程不需要继续了,后面流程会进行错误处理。
|
||||
*/
|
||||
update_error_code(coord_info_.first_error_code_, pkt.rc_);
|
||||
ObPxErrorUtil::update_qc_error_code(coord_info_.first_error_code_, pkt.rc_, pkt.err_msg_);
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(pkt.rc_)) {
|
||||
LOG_WARN("sqc fail, abort qc", K(pkt), K(ret), "sqc_addr", sqc->get_exec_addr());
|
||||
@ -569,7 +569,7 @@ int ObPxTerminateMsgProc::on_sqc_init_msg(ObExecContext &ctx, const ObPxInitSqcR
|
||||
if (pkt.rc_ != OB_SUCCESS) {
|
||||
LOG_DEBUG("receive error code from sqc init msg", K(coord_info_.first_error_code_), K(pkt.rc_));
|
||||
}
|
||||
update_error_code(coord_info_.first_error_code_, pkt.rc_);
|
||||
ObPxErrorUtil::update_qc_error_code(coord_info_.first_error_code_, pkt.rc_, pkt.err_msg_);
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -638,7 +638,7 @@ int ObPxTerminateMsgProc::on_sqc_finish_msg(ObExecContext &ctx, const ObPxFinish
|
||||
if (pkt.rc_ != OB_SUCCESS) {
|
||||
LOG_DEBUG("receive error code from sqc finish msg", K(coord_info_.first_error_code_), K(pkt.rc_));
|
||||
}
|
||||
update_error_code(coord_info_.first_error_code_, pkt.rc_);
|
||||
ObPxErrorUtil::update_qc_error_code(coord_info_.first_error_code_, pkt.rc_, pkt.err_msg_);
|
||||
|
||||
NG_TRACE_EXT(sqc_finish,
|
||||
OB_ID(dfo_id), sqc->get_dfo_id(),
|
||||
@ -716,74 +716,6 @@ int ObPxTerminateMsgProc::startup_msg_loop(ObExecContext &ctx)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxTerminateMsgProc::on_piece_msg(
|
||||
ObExecContext &ctx,
|
||||
const ObBarrierPieceMsg &pkt)
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
UNUSED(ctx);
|
||||
UNUSED(pkt);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxTerminateMsgProc::on_piece_msg(
|
||||
ObExecContext &ctx,
|
||||
const ObWinbufPieceMsg &pkt)
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
UNUSED(ctx);
|
||||
UNUSED(pkt);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxTerminateMsgProc::on_piece_msg(
|
||||
ObExecContext &ctx,
|
||||
const ObDynamicSamplePieceMsg &pkt)
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
UNUSED(ctx);
|
||||
UNUSED(pkt);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxTerminateMsgProc::on_piece_msg(
|
||||
ObExecContext &ctx,
|
||||
const ObRollupKeyPieceMsg &pkt)
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
UNUSED(ctx);
|
||||
UNUSED(pkt);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxTerminateMsgProc::on_piece_msg(
|
||||
ObExecContext &,
|
||||
const ObRDWFPieceMsg &)
|
||||
{
|
||||
return common::OB_SUCCESS;
|
||||
}
|
||||
|
||||
int ObPxTerminateMsgProc::on_piece_msg(
|
||||
ObExecContext &,
|
||||
const ObInitChannelPieceMsg &)
|
||||
{
|
||||
return common::OB_SUCCESS;
|
||||
}
|
||||
|
||||
int ObPxTerminateMsgProc::on_piece_msg(
|
||||
ObExecContext &,
|
||||
const ObReportingWFPieceMsg &)
|
||||
{
|
||||
return common::OB_SUCCESS;
|
||||
}
|
||||
|
||||
int ObPxTerminateMsgProc::on_piece_msg(
|
||||
ObExecContext &,
|
||||
const ObOptStatsGatherPieceMsg &)
|
||||
{
|
||||
return common::OB_SUCCESS;
|
||||
}
|
||||
|
||||
int ObPxCoordInfo::init()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -171,17 +171,17 @@ public:
|
||||
int on_sqc_init_fail(ObDfo &dfo, ObPxSqcMeta &sqc);
|
||||
int on_interrupted(ObExecContext &ctx, const common::ObInterruptCode &pkt);
|
||||
int startup_msg_loop(ObExecContext &ctx);
|
||||
// begin DATAHUB msg processing
|
||||
int on_piece_msg(ObExecContext &ctx, const ObBarrierPieceMsg &pkt);
|
||||
int on_piece_msg(ObExecContext &ctx, const ObWinbufPieceMsg &pkt);
|
||||
int on_piece_msg(ObExecContext &ctx, const ObDynamicSamplePieceMsg &pkt);
|
||||
int on_piece_msg(ObExecContext &ctx, const ObRollupKeyPieceMsg &pkt);
|
||||
int on_piece_msg(ObExecContext &ctx, const ObRDWFPieceMsg &pkt);
|
||||
int on_piece_msg(ObExecContext &ctx, const ObInitChannelPieceMsg &pkt);
|
||||
int on_piece_msg(ObExecContext &ctx, const ObReportingWFPieceMsg &pkt);
|
||||
int on_piece_msg(ObExecContext &ctx, const ObOptStatsGatherPieceMsg &pkt);
|
||||
// end DATAHUB msg processing
|
||||
|
||||
// Begin Datahub processing
|
||||
// Don't need to process datahub message in terminate message processor
|
||||
int on_piece_msg(ObExecContext &ctx, const ObBarrierPieceMsg &pkt) { UNUSED(ctx); UNUSED(pkt); return common::OB_NOT_SUPPORTED; }
|
||||
int on_piece_msg(ObExecContext &ctx, const ObWinbufPieceMsg &pkt) { UNUSED(ctx); UNUSED(pkt); return common::OB_NOT_SUPPORTED; }
|
||||
int on_piece_msg(ObExecContext &ctx, const ObDynamicSamplePieceMsg &pkt) { UNUSED(ctx); UNUSED(pkt); return common::OB_NOT_SUPPORTED; }
|
||||
int on_piece_msg(ObExecContext &ctx, const ObRollupKeyPieceMsg &pkt) { UNUSED(ctx); UNUSED(pkt); return common::OB_NOT_SUPPORTED; }
|
||||
int on_piece_msg(ObExecContext &ctx, const ObRDWFPieceMsg &pkt) { UNUSED(ctx); UNUSED(pkt); return common::OB_NOT_SUPPORTED; }
|
||||
int on_piece_msg(ObExecContext &ctx, const ObInitChannelPieceMsg &pkt) { UNUSED(ctx); UNUSED(pkt); return common::OB_NOT_SUPPORTED; }
|
||||
int on_piece_msg(ObExecContext &ctx, const ObReportingWFPieceMsg &pkt) { UNUSED(ctx); UNUSED(pkt); return common::OB_NOT_SUPPORTED; }
|
||||
int on_piece_msg(ObExecContext &ctx, const ObOptStatsGatherPieceMsg &pkt) { UNUSED(ctx); UNUSED(pkt); return common::OB_NOT_SUPPORTED; }
|
||||
// End Datahub processing
|
||||
ObPxCoordInfo &coord_info_;
|
||||
ObIPxCoordEventListener &listener_;
|
||||
};
|
||||
|
@ -59,7 +59,7 @@ public:
|
||||
void set_invalid(bool value) { is_invalid_ = value; }
|
||||
bool is_invalid() const { return is_invalid_; }
|
||||
bool is_processed() const { return is_processed_; }
|
||||
const obrpc::ObRpcResultCode get_ret_code() const { return rcode_; }
|
||||
const ObPxUserErrorMsg get_ret_code() const { return rcode_; }
|
||||
const common::ObAddr &get_dst() const { return dst_; }
|
||||
int64_t get_timeout() const { return timeout_; }
|
||||
// to string
|
||||
|
@ -449,14 +449,13 @@ int ObPxSQCProxy::report(int end_ret) const
|
||||
// 第一版暂不支持重试
|
||||
int sqc_ret = OB_SUCCESS;
|
||||
auto &tasks = sqc_ctx.get_tasks();
|
||||
update_error_code(sqc_ret, end_ret);
|
||||
ObSQLSessionInfo *session = NULL;
|
||||
CK(OB_NOT_NULL(sqc_arg.exec_ctx_) &&
|
||||
OB_NOT_NULL(session = GET_MY_SESSION(*sqc_arg.exec_ctx_)));
|
||||
for (int64_t i = 0; i < tasks.count(); ++i) {
|
||||
// overwrite ret
|
||||
ObPxTask &task = tasks.at(i);
|
||||
update_error_code(sqc_ret, task.get_result());
|
||||
ObPxErrorUtil::update_sqc_error_code(sqc_ret, task.get_result(), task.err_msg_, finish_msg.err_msg_);
|
||||
affected_rows += task.get_affected_rows();
|
||||
finish_msg.dml_row_info_.add_px_dml_row_info(task.dml_row_info_);
|
||||
finish_msg.temp_table_id_ = task.temp_table_id_;
|
||||
@ -481,6 +480,7 @@ int ObPxSQCProxy::report(int end_ret) const
|
||||
|
||||
OZ(append(finish_msg.interm_result_ids_, task.interm_result_ids_));
|
||||
}
|
||||
ObPxErrorUtil::update_error_code(sqc_ret, end_ret);
|
||||
if (OB_SUCCESS != ret && OB_SUCCESS == sqc_ret) {
|
||||
sqc_ret = ret;
|
||||
}
|
||||
|
@ -125,6 +125,8 @@ int ObPxTaskProcess::process()
|
||||
{
|
||||
ObActiveSessionGuard::get_stat().in_px_execution_ = true;
|
||||
int ret = OB_SUCCESS;
|
||||
common::ob_setup_default_tsi_warning_buffer();
|
||||
common::ob_reset_tsi_warning_buffer();
|
||||
enqueue_timestamp_ = ObTimeUtility::current_time();
|
||||
process_timestamp_ = enqueue_timestamp_;
|
||||
ObExecRecord exec_record;
|
||||
@ -456,6 +458,9 @@ int ObPxTaskProcess::do_process()
|
||||
if (OB_NOT_NULL(arg_.exec_ctx_)) {
|
||||
DAS_CTX(*arg_.exec_ctx_).get_location_router().refresh_location_cache(true, ret);
|
||||
}
|
||||
|
||||
// for forward warning msg and user error msg
|
||||
(void)record_user_error_msg(ret);
|
||||
// for transaction
|
||||
(void)record_tx_desc();
|
||||
// for exec feedback info
|
||||
@ -485,6 +490,35 @@ int ObPxTaskProcess::do_process()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxTaskProcess::record_user_error_msg(int retcode)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
CK (OB_NOT_NULL(arg_.sqc_task_ptr_));
|
||||
if (OB_SUCC(ret)) {
|
||||
ObPxUserErrorMsg &rcode = arg_.sqc_task_ptr_->get_err_msg();
|
||||
rcode.rcode_ = retcode;
|
||||
common::ObWarningBuffer *wb = common::ob_get_tsi_warning_buffer();
|
||||
if (wb) {
|
||||
if (retcode != common::OB_SUCCESS) {
|
||||
(void)snprintf(rcode.msg_, common::OB_MAX_ERROR_MSG_LEN, "%s", wb->get_err_msg());
|
||||
}
|
||||
//always add warning buffer
|
||||
bool not_null = true;
|
||||
for (uint32_t idx = 0; OB_SUCC(ret) && not_null && idx < wb->get_readable_warning_count(); idx++) {
|
||||
const common::ObWarningBuffer::WarningItem *item = wb->get_warning_item(idx);
|
||||
if (item != NULL) {
|
||||
if (OB_FAIL(rcode.warnings_.push_back(*item))) {
|
||||
RPC_OBRPC_LOG(WARN, "Failed to add warning", K(ret));
|
||||
}
|
||||
} else {
|
||||
not_null = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxTaskProcess::record_exec_feedback_info()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -115,6 +115,7 @@ private:
|
||||
{ ObExecStatUtils::record_exec_timestamp(*this, is_first, exec_timestamp); }
|
||||
int record_tx_desc();
|
||||
int record_exec_feedback_info();
|
||||
int record_user_error_msg(int retcode);
|
||||
void release();
|
||||
/* variables */
|
||||
const observer::ObGlobalContext &gctx_;
|
||||
|
@ -570,6 +570,73 @@ public:
|
||||
static bool is_in_blacklist(const common::ObAddr &addr, int64_t server_start_time);
|
||||
};
|
||||
|
||||
class ObPxErrorUtil
|
||||
{
|
||||
public:
|
||||
static inline void update_qc_error_code(int ¤t_error_code,
|
||||
const int new_error_code,
|
||||
const ObPxUserErrorMsg &from)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// **replace** error code & error msg
|
||||
if (new_error_code != ObPxTask::TASK_DEFAULT_RET_VALUE) {
|
||||
if ((OB_SUCCESS == current_error_code) ||
|
||||
((OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER == current_error_code ||
|
||||
OB_GOT_SIGNAL_ABORTING == current_error_code) &&
|
||||
OB_SUCCESS != new_error_code)) {
|
||||
current_error_code = new_error_code;
|
||||
FORWARD_USER_ERROR(new_error_code, from.msg_);
|
||||
}
|
||||
}
|
||||
// **append** warning msg
|
||||
for (int i = 0; i < from.warnings_.count(); ++i) {
|
||||
const common::ObWarningBuffer::WarningItem &warning_item = from.warnings_.at(i);
|
||||
if (ObLogger::USER_WARN == warning_item.log_level_) {
|
||||
FORWARD_USER_WARN(warning_item.code_, warning_item.msg_);
|
||||
} else if (ObLogger::USER_NOTE == warning_item.log_level_) {
|
||||
FORWARD_USER_NOTE(warning_item.code_, warning_item.msg_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static inline void update_sqc_error_code(int ¤t_error_code,
|
||||
const int new_error_code,
|
||||
const ObPxUserErrorMsg &from,
|
||||
ObPxUserErrorMsg &to)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// **replace** error code & error msg
|
||||
if (new_error_code != ObPxTask::TASK_DEFAULT_RET_VALUE) {
|
||||
if ((OB_SUCCESS == current_error_code) ||
|
||||
((OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER == current_error_code ||
|
||||
OB_GOT_SIGNAL_ABORTING == current_error_code) &&
|
||||
OB_SUCCESS != new_error_code)) {
|
||||
current_error_code = new_error_code;
|
||||
(void)snprintf(to.msg_, common::OB_MAX_ERROR_MSG_LEN, "%s", from.msg_);
|
||||
}
|
||||
}
|
||||
// **append** warning msg
|
||||
for (int i = 0; i < from.warnings_.count(); ++i) {
|
||||
if (OB_FAIL(to.warnings_.push_back(from.warnings_.at(i)))) {
|
||||
SQL_LOG(WARN, "Failed to add warning. ignore error & continue", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//update the error code if it is OB_HASH_NOT_EXIST or OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER
|
||||
static inline void update_error_code(int ¤t_error_code, const int new_error_code)
|
||||
{
|
||||
if (new_error_code != ObPxTask::TASK_DEFAULT_RET_VALUE) {
|
||||
if ((OB_SUCCESS == current_error_code) ||
|
||||
((OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER == current_error_code ||
|
||||
OB_GOT_SIGNAL_ABORTING == current_error_code) &&
|
||||
OB_SUCCESS != new_error_code)) {
|
||||
current_error_code = new_error_code;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template<class T>
|
||||
static int get_location_addrs(const T &locations,
|
||||
ObIArray<ObAddr> &addrs)
|
||||
|
Reference in New Issue
Block a user