diff --git a/src/sql/engine/px/ob_dfo.cpp b/src/sql/engine/px/ob_dfo.cpp index 1368749d40..511b1089c6 100644 --- a/src/sql/engine/px/ob_dfo.cpp +++ b/src/sql/engine/px/ob_dfo.cpp @@ -29,6 +29,8 @@ OB_SERIALIZE_MEMBER(ObP2PDhMapInfo, p2p_sequence_ids_, target_addrs_); +OB_SERIALIZE_MEMBER(ObQCMonitoringInfo, sql_, qc_tid_); + OB_SERIALIZE_MEMBER(ObPxSqcMeta, execution_id_, qc_id_, @@ -63,7 +65,8 @@ OB_SERIALIZE_MEMBER(ObPxSqcMeta, access_external_table_files_, px_detectable_ids_, p2p_dh_map_info_, - sqc_count_); + sqc_count_, + monitoring_info_); OB_SERIALIZE_MEMBER(ObPxTask, qc_id_, dfo_id_, @@ -93,6 +96,29 @@ OB_SERIALIZE_MEMBER(ObSqcTableLocationKey, OB_SERIALIZE_MEMBER(ObPxCleanDtlIntermResInfo, ch_total_info_, sqc_id_, task_count_); OB_SERIALIZE_MEMBER(ObPxCleanDtlIntermResArgs, info_, batch_size_); +int ObQCMonitoringInfo::init(const ObExecContext &exec_ctx) { + int ret = OB_SUCCESS; + qc_tid_ = GETTID(); + if (OB_NOT_NULL(exec_ctx.get_sql_ctx())) { + sql_ = exec_ctx.get_sql_ctx()->cur_sql_; + } + if (sql_.length() > ObQCMonitoringInfo::LIMIT_LENGTH) { + sql_.set_length(ObQCMonitoringInfo::LIMIT_LENGTH); + } + return ret; +} + +int ObQCMonitoringInfo::assign(const ObQCMonitoringInfo &other) { + int ret = OB_SUCCESS; + sql_ = other.sql_; + qc_tid_ = other.qc_tid_; + return ret; +} + +void ObQCMonitoringInfo::reset() { + sql_.reset(); +} + int ObPxSqcMeta::assign(const ObPxSqcMeta &other) { int ret = OB_SUCCESS; @@ -121,6 +147,8 @@ int ObPxSqcMeta::assign(const ObPxSqcMeta &other) LOG_WARN("failed to assgin to table location keys.", K(ret)); } else if (OB_FAIL(p2p_dh_map_info_.assign(other.p2p_dh_map_info_))) { LOG_WARN("fail to assign p2p dh map info", K(ret)); + } else if (OB_FAIL(monitoring_info_.assign(other.monitoring_info_))) { + LOG_WARN("fail to assign qc monitoring info", K(ret)); } else { execution_id_ = other.execution_id_; qc_id_ = other.qc_id_; diff --git a/src/sql/engine/px/ob_dfo.h b/src/sql/engine/px/ob_dfo.h index ba7e919838..1c56c1c6c3 100644 --- a/src/sql/engine/px/ob_dfo.h +++ b/src/sql/engine/px/ob_dfo.h @@ -182,6 +182,21 @@ public: TO_STRING_KV(K_(p2p_sequence_ids), K_(target_addrs)); }; +struct ObQCMonitoringInfo { + OB_UNIS_VERSION(1); +public: + int init(const ObExecContext &exec_ctx); + int assign(const ObQCMonitoringInfo &other); + void reset(); +public: + common::ObString sql_; + // in nested px situation, it is the current px coordinator's thread id + int64_t qc_tid_; + // no need to deserialize + static constexpr int64_t LIMIT_LENGTH = 100; + TO_STRING_KV(K_(sql), K_(qc_tid)); +}; + // PX 端描述每个 SQC 的数据结构 class ObPxSqcMeta { @@ -317,6 +332,7 @@ public: partition_pruning_table_locations_.reset(); access_external_table_files_.reset(); allocator_.reset(); + monitoring_info_.reset(); } // SQC 端收到 InitSQC 消息后通过 data_channel 信息是否为空 // 来判断 data channel 是否已经预分配好,是否要走轻量调度 @@ -347,6 +363,8 @@ public: ObP2PDhMapInfo &get_p2p_dh_map_info() { return p2p_dh_map_info_;}; void set_sqc_count(int64_t sqc_cnt) { sqc_count_ = sqc_cnt; } int64_t get_sqc_count() const { return sqc_count_;} + ObQCMonitoringInfo &get_monitoring_info() { return monitoring_info_; } + const ObQCMonitoringInfo &get_monitoring_info() const { return monitoring_info_; } TO_STRING_KV(K_(need_report), K_(execution_id), K_(qc_id), K_(sqc_id), K_(dfo_id), K_(exec_addr), K_(qc_addr), K_(qc_ch_info), K_(sqc_ch_info), K_(task_count), K_(max_task_count), K_(min_task_count), @@ -358,6 +376,7 @@ private: uint64_t qc_id_; int64_t sqc_id_; int64_t dfo_id_; + ObQCMonitoringInfo monitoring_info_; // The partition location information of the all table_scan op and dml op // used for px worker execution // no need serialize diff --git a/src/sql/engine/px/ob_px_task_process.cpp b/src/sql/engine/px/ob_px_task_process.cpp index 70f60b9cef..eb1c99135b 100644 --- a/src/sql/engine/px/ob_px_task_process.cpp +++ b/src/sql/engine/px/ob_px_task_process.cpp @@ -439,6 +439,8 @@ int ObPxTaskProcess::do_process() if (OB_SUCC(ret)) { if (nullptr != arg_.op_spec_root_) { const ObPxSqcMeta &sqc_meta = arg_.sqc_handler_->get_sqc_init_arg().sqc_; + // show monitoring information from qc + LOG_DEBUG("receive monitoring information", K(sqc_meta.get_monitoring_info())); ObExtraServerAliveCheck qc_alive_checker(sqc_meta.get_qc_addr(), arg_.exec_ctx_->get_my_session()->get_process_query_time()); ObExtraServerAliveCheck::Guard check_guard(*arg_.exec_ctx_, qc_alive_checker); diff --git a/src/sql/engine/px/ob_px_util.cpp b/src/sql/engine/px/ob_px_util.cpp index 7220faff09..fc3c5f3626 100644 --- a/src/sql/engine/px/ob_px_util.cpp +++ b/src/sql/engine/px/ob_px_util.cpp @@ -488,6 +488,7 @@ int ObPXServerAddrUtil::build_dfo_sqc(ObExecContext &ctx, sqc.set_qc_server_id(dfo.get_qc_server_id()); sqc.set_parent_dfo_id(dfo.get_parent_dfo_id()); sqc.set_single_tsc_leaf_dfo(dfo.is_single_tsc_leaf_dfo()); + sqc.get_monitoring_info().init(ctx); if (OB_SUCC(ret)) { if (!dfo.get_p2p_dh_map_info().is_empty()) { if (OB_FAIL(sqc.get_p2p_dh_map_info().assign(dfo.get_p2p_dh_map_info()))) { @@ -603,6 +604,7 @@ int ObPXServerAddrUtil::alloc_by_temp_child_distribution_inner(ObExecContext &ex sqc.set_fulltree(child.is_fulltree()); sqc.set_qc_server_id(child.get_qc_server_id()); sqc.set_parent_dfo_id(child.get_parent_dfo_id()); + sqc.get_monitoring_info().init(exec_ctx); if (OB_SUCC(ret)) { if (!child.get_p2p_dh_map_info().is_empty()) { if (OB_FAIL(sqc.get_p2p_dh_map_info().assign(child.get_p2p_dh_map_info()))) { @@ -688,6 +690,7 @@ int ObPXServerAddrUtil::alloc_by_child_distribution(const ObDfo &child, ObDfo &p sqc.set_fulltree(parent.is_fulltree()); sqc.set_qc_server_id(parent.get_qc_server_id()); sqc.set_parent_dfo_id(parent.get_parent_dfo_id()); + sqc.get_monitoring_info().assign(child_sqc.get_monitoring_info()); if (OB_FAIL(parent.add_sqc(sqc))) { LOG_WARN("fail add sqc", K(sqc), K(ret)); } @@ -776,6 +779,7 @@ int ObPXServerAddrUtil::alloc_by_random_distribution(ObExecContext &exec_ctx, sqc.set_fulltree(parent.is_fulltree()); sqc.set_qc_server_id(parent.get_qc_server_id()); sqc.set_parent_dfo_id(parent.get_parent_dfo_id()); + sqc.get_monitoring_info().init(exec_ctx); if (OB_SUCC(ret)) { if (!parent.get_p2p_dh_map_info().is_empty()) { if (OB_FAIL(sqc.get_p2p_dh_map_info().assign(parent.get_p2p_dh_map_info()))) { @@ -824,6 +828,7 @@ int ObPXServerAddrUtil::alloc_by_local_distribution(ObExecContext &exec_ctx, sqc.set_fulltree(dfo.is_fulltree()); sqc.set_parent_dfo_id(dfo.get_parent_dfo_id()); sqc.set_qc_server_id(dfo.get_qc_server_id()); + sqc.get_monitoring_info().init(exec_ctx); if (!dfo.get_p2p_dh_map_info().is_empty()) { OZ(sqc.get_p2p_dh_map_info().assign(dfo.get_p2p_dh_map_info())); }