Add QC thread id and sql text for PX execution.

This commit is contained in:
qianchanger
2023-05-26 02:47:24 +00:00
committed by ob-robot
parent 69bd1a6d04
commit 62b9b3b829
4 changed files with 55 additions and 1 deletions

View File

@ -29,6 +29,8 @@ OB_SERIALIZE_MEMBER(ObP2PDhMapInfo,
p2p_sequence_ids_,
target_addrs_);
OB_SERIALIZE_MEMBER(ObQCMonitoringInfo, sql_, qc_tid_);
OB_SERIALIZE_MEMBER(ObPxSqcMeta,
execution_id_,
qc_id_,
@ -63,7 +65,8 @@ OB_SERIALIZE_MEMBER(ObPxSqcMeta,
access_external_table_files_,
px_detectable_ids_,
p2p_dh_map_info_,
sqc_count_);
sqc_count_,
monitoring_info_);
OB_SERIALIZE_MEMBER(ObPxTask,
qc_id_,
dfo_id_,
@ -93,6 +96,29 @@ OB_SERIALIZE_MEMBER(ObSqcTableLocationKey,
OB_SERIALIZE_MEMBER(ObPxCleanDtlIntermResInfo, ch_total_info_, sqc_id_, task_count_);
OB_SERIALIZE_MEMBER(ObPxCleanDtlIntermResArgs, info_, batch_size_);
int ObQCMonitoringInfo::init(const ObExecContext &exec_ctx) {
int ret = OB_SUCCESS;
qc_tid_ = GETTID();
if (OB_NOT_NULL(exec_ctx.get_sql_ctx())) {
sql_ = exec_ctx.get_sql_ctx()->cur_sql_;
}
if (sql_.length() > ObQCMonitoringInfo::LIMIT_LENGTH) {
sql_.set_length(ObQCMonitoringInfo::LIMIT_LENGTH);
}
return ret;
}
int ObQCMonitoringInfo::assign(const ObQCMonitoringInfo &other) {
int ret = OB_SUCCESS;
sql_ = other.sql_;
qc_tid_ = other.qc_tid_;
return ret;
}
void ObQCMonitoringInfo::reset() {
sql_.reset();
}
int ObPxSqcMeta::assign(const ObPxSqcMeta &other)
{
int ret = OB_SUCCESS;
@ -121,6 +147,8 @@ int ObPxSqcMeta::assign(const ObPxSqcMeta &other)
LOG_WARN("failed to assgin to table location keys.", K(ret));
} else if (OB_FAIL(p2p_dh_map_info_.assign(other.p2p_dh_map_info_))) {
LOG_WARN("fail to assign p2p dh map info", K(ret));
} else if (OB_FAIL(monitoring_info_.assign(other.monitoring_info_))) {
LOG_WARN("fail to assign qc monitoring info", K(ret));
} else {
execution_id_ = other.execution_id_;
qc_id_ = other.qc_id_;

View File

@ -182,6 +182,21 @@ public:
TO_STRING_KV(K_(p2p_sequence_ids), K_(target_addrs));
};
struct ObQCMonitoringInfo {
OB_UNIS_VERSION(1);
public:
int init(const ObExecContext &exec_ctx);
int assign(const ObQCMonitoringInfo &other);
void reset();
public:
common::ObString sql_;
// in nested px situation, it is the current px coordinator's thread id
int64_t qc_tid_;
// no need to deserialize
static constexpr int64_t LIMIT_LENGTH = 100;
TO_STRING_KV(K_(sql), K_(qc_tid));
};
// PX 端描述每个 SQC 的数据结构
class ObPxSqcMeta
{
@ -317,6 +332,7 @@ public:
partition_pruning_table_locations_.reset();
access_external_table_files_.reset();
allocator_.reset();
monitoring_info_.reset();
}
// SQC 端收到 InitSQC 消息后通过 data_channel 信息是否为空
// 来判断 data channel 是否已经预分配好,是否要走轻量调度
@ -347,6 +363,8 @@ public:
ObP2PDhMapInfo &get_p2p_dh_map_info() { return p2p_dh_map_info_;};
void set_sqc_count(int64_t sqc_cnt) { sqc_count_ = sqc_cnt; }
int64_t get_sqc_count() const { return sqc_count_;}
ObQCMonitoringInfo &get_monitoring_info() { return monitoring_info_; }
const ObQCMonitoringInfo &get_monitoring_info() const { return monitoring_info_; }
TO_STRING_KV(K_(need_report), K_(execution_id), K_(qc_id), K_(sqc_id), K_(dfo_id), K_(exec_addr), K_(qc_addr),
K_(qc_ch_info), K_(sqc_ch_info),
K_(task_count), K_(max_task_count), K_(min_task_count),
@ -358,6 +376,7 @@ private:
uint64_t qc_id_;
int64_t sqc_id_;
int64_t dfo_id_;
ObQCMonitoringInfo monitoring_info_;
// The partition location information of the all table_scan op and dml op
// used for px worker execution
// no need serialize

View File

@ -439,6 +439,8 @@ int ObPxTaskProcess::do_process()
if (OB_SUCC(ret)) {
if (nullptr != arg_.op_spec_root_) {
const ObPxSqcMeta &sqc_meta = arg_.sqc_handler_->get_sqc_init_arg().sqc_;
// show monitoring information from qc
LOG_DEBUG("receive monitoring information", K(sqc_meta.get_monitoring_info()));
ObExtraServerAliveCheck qc_alive_checker(sqc_meta.get_qc_addr(),
arg_.exec_ctx_->get_my_session()->get_process_query_time());
ObExtraServerAliveCheck::Guard check_guard(*arg_.exec_ctx_, qc_alive_checker);

View File

@ -488,6 +488,7 @@ int ObPXServerAddrUtil::build_dfo_sqc(ObExecContext &ctx,
sqc.set_qc_server_id(dfo.get_qc_server_id());
sqc.set_parent_dfo_id(dfo.get_parent_dfo_id());
sqc.set_single_tsc_leaf_dfo(dfo.is_single_tsc_leaf_dfo());
sqc.get_monitoring_info().init(ctx);
if (OB_SUCC(ret)) {
if (!dfo.get_p2p_dh_map_info().is_empty()) {
if (OB_FAIL(sqc.get_p2p_dh_map_info().assign(dfo.get_p2p_dh_map_info()))) {
@ -603,6 +604,7 @@ int ObPXServerAddrUtil::alloc_by_temp_child_distribution_inner(ObExecContext &ex
sqc.set_fulltree(child.is_fulltree());
sqc.set_qc_server_id(child.get_qc_server_id());
sqc.set_parent_dfo_id(child.get_parent_dfo_id());
sqc.get_monitoring_info().init(exec_ctx);
if (OB_SUCC(ret)) {
if (!child.get_p2p_dh_map_info().is_empty()) {
if (OB_FAIL(sqc.get_p2p_dh_map_info().assign(child.get_p2p_dh_map_info()))) {
@ -688,6 +690,7 @@ int ObPXServerAddrUtil::alloc_by_child_distribution(const ObDfo &child, ObDfo &p
sqc.set_fulltree(parent.is_fulltree());
sqc.set_qc_server_id(parent.get_qc_server_id());
sqc.set_parent_dfo_id(parent.get_parent_dfo_id());
sqc.get_monitoring_info().assign(child_sqc.get_monitoring_info());
if (OB_FAIL(parent.add_sqc(sqc))) {
LOG_WARN("fail add sqc", K(sqc), K(ret));
}
@ -776,6 +779,7 @@ int ObPXServerAddrUtil::alloc_by_random_distribution(ObExecContext &exec_ctx,
sqc.set_fulltree(parent.is_fulltree());
sqc.set_qc_server_id(parent.get_qc_server_id());
sqc.set_parent_dfo_id(parent.get_parent_dfo_id());
sqc.get_monitoring_info().init(exec_ctx);
if (OB_SUCC(ret)) {
if (!parent.get_p2p_dh_map_info().is_empty()) {
if (OB_FAIL(sqc.get_p2p_dh_map_info().assign(parent.get_p2p_dh_map_info()))) {
@ -824,6 +828,7 @@ int ObPXServerAddrUtil::alloc_by_local_distribution(ObExecContext &exec_ctx,
sqc.set_fulltree(dfo.is_fulltree());
sqc.set_parent_dfo_id(dfo.get_parent_dfo_id());
sqc.set_qc_server_id(dfo.get_qc_server_id());
sqc.get_monitoring_info().init(exec_ctx);
if (!dfo.get_p2p_dh_map_info().is_empty()) {
OZ(sqc.get_p2p_dh_map_info().assign(dfo.get_p2p_dh_map_info()));
}