From c2fea4f4d0d9588f51fec8125a1effd3bf874fca Mon Sep 17 00:00:00 2001 From: qianchanger Date: Thu, 2 Mar 2023 16:55:01 +0000 Subject: [PATCH] Fix dh window function report -4016 --- .../px/datahub/components/ob_dh_barrier.cpp | 51 ++++--- .../px/datahub/components/ob_dh_barrier.h | 2 + .../datahub/components/ob_dh_init_channel.cpp | 59 +++++--- .../datahub/components/ob_dh_init_channel.h | 4 +- .../components/ob_dh_opt_stats_gather.h | 1 + .../components/ob_dh_range_dist_wf.cpp | 126 ++++++++++-------- .../datahub/components/ob_dh_range_dist_wf.h | 3 +- .../datahub/components/ob_dh_rollup_key.cpp | 52 +++++--- .../px/datahub/components/ob_dh_rollup_key.h | 3 +- .../px/datahub/components/ob_dh_sample.cpp | 57 +++++--- .../px/datahub/components/ob_dh_sample.h | 2 + .../ob_dh_second_stage_reporting_wf.cpp | 54 +++++--- .../ob_dh_second_stage_reporting_wf.h | 2 + .../px/datahub/components/ob_dh_winbuf.cpp | 57 +++++--- .../px/datahub/components/ob_dh_winbuf.h | 2 + src/sql/engine/px/datahub/ob_dh_msg_ctx.h | 2 + 16 files changed, 295 insertions(+), 182 deletions(-) diff --git a/src/sql/engine/px/datahub/components/ob_dh_barrier.cpp b/src/sql/engine/px/datahub/components/ob_dh_barrier.cpp index 9f590c0f5f..3b202e3315 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_barrier.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_barrier.cpp @@ -47,29 +47,44 @@ int ObBarrierPieceMsgListener::on_message( // 已经收到所有 piece,发送 sqc 个 whole // 各个 sqc 广播给各自 task if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) { - ObBarrierWholeMsg whole; - whole.op_id_ = ctx.op_id_; - ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { - dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); - if (OB_ISNULL(ch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("null expected", K(ret)); - } else if (OB_FAIL(ch->send(whole, ctx.timeout_ts_))) { - LOG_WARN("fail push data to channel", K(ret)); - } else if (OB_FAIL(ch->flush(true, false))) { - LOG_WARN("fail flush dtl data", K(ret)); - } else { - LOG_DEBUG("dispatched barrier whole msg", - K(idx), K(cnt), K(whole), K(*ch)); - } - } - if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { - LOG_WARN("failed to wait response", K(ret)); + if (OB_FAIL(ctx.send_whole_msg(sqcs))) { + LOG_WARN("fail to send whole msg", K(ret)); } + IGNORE_RETURN ctx.reset_resource(); } return ret; } +int ObBarrierPieceMsgCtx::send_whole_msg(common::ObIArray &sqcs) +{ + int ret = OB_SUCCESS; + ObBarrierWholeMsg whole; + whole.op_id_ = op_id_; + ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { + dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); + if (OB_ISNULL(ch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null expected", K(ret)); + } else if (OB_FAIL(ch->send(whole, timeout_ts_))) { + LOG_WARN("fail push data to channel", K(ret)); + } else if (OB_FAIL(ch->flush(true, false))) { + LOG_WARN("fail flush dtl data", K(ret)); + } else { + LOG_DEBUG("dispatched barrier whole msg", + K(idx), K(cnt), K(whole), K(*ch)); + } + } + if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { + LOG_WARN("failed to wait response", K(ret)); + } + return ret; +} + +void ObBarrierPieceMsgCtx::reset_resource() +{ + received_ = 0; +} + int ObBarrierPieceMsgCtx::alloc_piece_msg_ctx(const ObBarrierPieceMsg &pkt, ObPxCoordInfo &, ObExecContext &ctx, diff --git a/src/sql/engine/px/datahub/components/ob_dh_barrier.h b/src/sql/engine/px/datahub/components/ob_dh_barrier.h index bd37c263c1..99441c2d2e 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_barrier.h +++ b/src/sql/engine/px/datahub/components/ob_dh_barrier.h @@ -83,6 +83,8 @@ public: ObBarrierPieceMsgCtx(uint64_t op_id, int64_t task_cnt, int64_t timeout_ts) : ObPieceMsgCtx(op_id, task_cnt, timeout_ts), received_(0) {} ~ObBarrierPieceMsgCtx() = default; + virtual int send_whole_msg(common::ObIArray &sqcs) override; + virtual void reset_resource() override; static int alloc_piece_msg_ctx(const ObBarrierPieceMsg &pkt, ObPxCoordInfo &coord_info, ObExecContext &ctx, diff --git a/src/sql/engine/px/datahub/components/ob_dh_init_channel.cpp b/src/sql/engine/px/datahub/components/ob_dh_init_channel.cpp index 2b5fc7145d..d7e773efc1 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_init_channel.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_init_channel.cpp @@ -33,28 +33,10 @@ int ObInitChannelPieceMsgListener::on_message( // have received all piece from px receive // send whole msg to px transmit if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) { - // get child transmit op's id. - // in current impl. the paired transmit op id = receive op id + 1 - //TODO : https://aone.alibaba-inc.com/task/43312101 - ctx.whole_msg_.op_id_ = ctx.op_id_ + 1; - ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { - dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); - if (OB_ISNULL(ch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("null expected", K(ret)); - } else if (OB_FAIL(ch->send(ctx.whole_msg_, ctx.timeout_ts_))) { - LOG_WARN("fail push data to channel", K(ret)); - } else if (OB_FAIL(ch->flush(true, false))) { - LOG_WARN("fail flush dtl data", K(ret)); - } else { - LOG_DEBUG("dispatched winbuf whole msg", - K(idx), K(cnt), K(ctx.whole_msg_), K(*ch)); - } + if (OB_FAIL(ctx.send_whole_msg(sqcs))) { + LOG_WARN("fail to send whole msg", K(ret)); } - if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { - LOG_WARN("failed to wait response", K(ret)); - } - ctx.whole_msg_.reset(); + IGNORE_RETURN ctx.reset_resource(); } return ret; } @@ -91,4 +73,37 @@ int ObInitChannelWholeMsg::assign(const ObInitChannelWholeMsg &other, common::Ob return ret; } -bool ObInitChannelPieceMsgCtx::enable_dh_channel_sync(const bool channel_sync_enabled) { return channel_sync_enabled; } \ No newline at end of file +bool ObInitChannelPieceMsgCtx::enable_dh_channel_sync(const bool channel_sync_enabled) { return channel_sync_enabled; } + +int ObInitChannelPieceMsgCtx::send_whole_msg(common::ObIArray &sqcs) +{ + int ret = OB_SUCCESS; + // get child transmit op's id. + // in current impl. the paired transmit op id = receive op id + 1 + //TODO : https://aone.alibaba-inc.com/task/43312101 + whole_msg_.op_id_ = op_id_ + 1; + ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { + dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); + if (OB_ISNULL(ch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null expected", K(ret)); + } else if (OB_FAIL(ch->send(whole_msg_, timeout_ts_))) { + LOG_WARN("fail push data to channel", K(ret)); + } else if (OB_FAIL(ch->flush(true, false))) { + LOG_WARN("fail flush dtl data", K(ret)); + } else { + LOG_DEBUG("dispatched winbuf whole msg", + K(idx), K(cnt), K(whole_msg_), K(*ch)); + } + } + if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { + LOG_WARN("failed to wait response", K(ret)); + } + return ret; +} + +void ObInitChannelPieceMsgCtx::reset_resource() +{ + whole_msg_.reset(); + received_ = 0; +} diff --git a/src/sql/engine/px/datahub/components/ob_dh_init_channel.h b/src/sql/engine/px/datahub/components/ob_dh_init_channel.h index 1dcc1acd2a..e9727b7f4d 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_init_channel.h +++ b/src/sql/engine/px/datahub/components/ob_dh_init_channel.h @@ -71,6 +71,8 @@ public: tenant_id_(tenant_id)/*, whole_msg_()*/ {} ~ObInitChannelPieceMsgCtx() = default; INHERIT_TO_STRING_KV("meta", ObPieceMsgCtx, K_(received)); + virtual int send_whole_msg(common::ObIArray &sqcs) override; + virtual void reset_resource() override; static int alloc_piece_msg_ctx(const ObInitChannelPieceMsg &pkt, ObPxCoordInfo &coord_info, ObExecContext &ctx, @@ -100,4 +102,4 @@ private: } } -#endif /* __OB_SQL_ENG_PX_DH_INIT_CHANNEL_H__ */ \ No newline at end of file +#endif /* __OB_SQL_ENG_PX_DH_INIT_CHANNEL_H__ */ diff --git a/src/sql/engine/px/datahub/components/ob_dh_opt_stats_gather.h b/src/sql/engine/px/datahub/components/ob_dh_opt_stats_gather.h index 27773a849a..cd46dc6157 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_opt_stats_gather.h +++ b/src/sql/engine/px/datahub/components/ob_dh_opt_stats_gather.h @@ -107,6 +107,7 @@ public: ObOptStatsGatherPieceMsgCtx(uint64_t op_id, int64_t task_cnt, int64_t timeout_ts) : ObPieceMsgCtx(op_id, task_cnt, timeout_ts), received_(0), osg_info_() {} ~ObOptStatsGatherPieceMsgCtx() = default; + virtual void reset_resource() {}; static int alloc_piece_msg_ctx(const ObOptStatsGatherPieceMsg &pkt, ObPxCoordInfo &coord_info, ObExecContext &ctx, diff --git a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp index 8c915e61d9..638edc7791 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp @@ -234,6 +234,72 @@ int ObRDWFPieceMsgCtx::formalize_store_row() return ret; } +int ObRDWFPieceMsgCtx::send_whole_msg(common::ObIArray &sqcs) +{ + int ret = OB_SUCCESS; + ObOperatorKit *op_kit = exec_ctx_.get_operator_kit(op_id_); + if (NULL == op_kit || NULL == op_kit->spec_ || PHY_WINDOW_FUNCTION != op_kit->spec_->type_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("no window function operator", K(ret), KP(op_kit), K(op_id_)); + } else { + auto wf = static_cast(op_kit->spec_); + if (OB_FAIL(wf->rd_generate_patch(*this))) { + LOG_WARN("calculate range distribution window function final res failed", K(ret)); + } else if (formalize_store_row()) { + LOG_WARN("formalize store row failed", K(ret)); + } else { + LOG_DEBUG("after formalize", K(infos_)); + } + } + ObRDWFWholeMsg *responses = NULL; + if (OB_SUCC(ret)) { + responses = static_cast( + arena_alloc_.alloc(sizeof(ObRDWFWholeMsg) * sqcs.count())); + OV(NULL != responses, OB_ALLOCATE_MEMORY_FAILED); + for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); i++) { + new (&responses[i])ObRDWFWholeMsg(); + } + } + if (OB_SUCC(ret)) { + // order by sqc_id_, thread_id_ + std::sort(infos_.begin(), infos_.end(), [](ObRDWFPartialInfo *l, + ObRDWFPartialInfo *r) { + return std::tie(l->sqc_id_, l->thread_id_) < std::tie(r->sqc_id_, r->thread_id_); + }); + for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); i++) { + auto &sqc = *sqcs.at(i); + auto &msg = responses[i]; + msg.op_id_ = op_id_; + auto it = std::lower_bound(infos_.begin(), infos_.end(), sqc.get_sqc_id(), + [&](ObRDWFPartialInfo *info, int64_t id) + { return info->sqc_id_ < id; }); + if (it == infos_.end() || (*it)->sqc_id_ != sqc.get_sqc_id()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sqc not found", K(ret), K(sqc)); + } else { + while (OB_SUCC(ret) && it != infos_.end() && (*it)->sqc_id_ == sqc.get_sqc_id()) { + OZ(msg.infos_.push_back(*it)); + it++; + } + } + auto ch = sqc.get_qc_channel(); + CK(NULL != ch); + OZ(ch->send(msg, timeout_ts_)); + OZ(ch->flush(true /* wait */, false /* wait response */)); + } + OZ(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs)); + } + for (int64_t i = 0; NULL != responses && i < sqcs.count(); i++) { + responses[i].~ObRDWFWholeMsg(); + } + return ret; +} + +void ObRDWFPieceMsgCtx::reset_resource() +{ + received_ = 0; +} + int ObRDWFWholeMsg::assign(const ObRDWFWholeMsg &msg) { int ret = OB_SUCCESS; @@ -263,64 +329,10 @@ int ObRDWFPieceMsgListener::on_message(ObRDWFPieceMsgCtx &ctx, } if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) { - ObOperatorKit *op_kit = ctx.exec_ctx_.get_operator_kit(ctx.op_id_); - if (NULL == op_kit || NULL == op_kit->spec_ || PHY_WINDOW_FUNCTION != op_kit->spec_->type_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("no window function operator", K(ret), KP(op_kit), K(ctx.op_id_)); - } else { - auto wf = static_cast(op_kit->spec_); - if (OB_FAIL(wf->rd_generate_patch(ctx))) { - LOG_WARN("calculate range distribution window function final res failed", K(ret)); - } else if (ctx.formalize_store_row()) { - LOG_WARN("formalize store row failed", K(ret)); - } else { - LOG_DEBUG("after formalize", K(ctx.infos_)); - } - } - - ObRDWFWholeMsg *responses = NULL; - if (OB_SUCC(ret)) { - responses = static_cast( - ctx.arena_alloc_.alloc(sizeof(ObRDWFWholeMsg) * sqcs.count())); - OV(NULL != responses, OB_ALLOCATE_MEMORY_FAILED); - for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); i++) { - new (&responses[i])ObRDWFWholeMsg(); - } - } - - if (OB_SUCC(ret)) { - // order by sqc_id_, thread_id_ - std::sort(ctx.infos_.begin(), ctx.infos_.end(), [](ObRDWFPartialInfo *l, - ObRDWFPartialInfo *r) { - return std::tie(l->sqc_id_, l->thread_id_) < std::tie(r->sqc_id_, r->thread_id_); - }); - for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); i++) { - auto &sqc = *sqcs.at(i); - auto &msg = responses[i]; - msg.op_id_ = ctx.op_id_; - auto it = std::lower_bound(ctx.infos_.begin(), ctx.infos_.end(), sqc.get_sqc_id(), - [&](ObRDWFPartialInfo *info, int64_t id) - { return info->sqc_id_ < id; }); - if (it == ctx.infos_.end() || (*it)->sqc_id_ != sqc.get_sqc_id()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("sqc not found", K(ret), K(sqc)); - } else { - while (OB_SUCC(ret) && it != ctx.infos_.end() && (*it)->sqc_id_ == sqc.get_sqc_id()) { - OZ(msg.infos_.push_back(*it)); - it++; - } - } - auto ch = sqc.get_qc_channel(); - CK(NULL != ch); - OZ(ch->send(msg, ctx.timeout_ts_)); - OZ(ch->flush(true /* wait */, false /* wait response */)); - } - OZ(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs)); - } - - for (int64_t i = 0; NULL != responses && i < sqcs.count(); i++) { - responses[i].~ObRDWFWholeMsg(); + if (OB_FAIL(ctx.send_whole_msg(sqcs))) { + LOG_WARN("fail to send whole msg", K(ret)); } + IGNORE_RETURN ctx.reset_resource(); } return ret; } diff --git a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.h b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.h index f76dcaf24d..42e1d36f29 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.h +++ b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.h @@ -122,7 +122,8 @@ public: exec_ctx_(exec_ctx), eval_ctx_(exec_ctx) { } - + virtual int send_whole_msg(common::ObIArray &sqcs) override; + virtual void reset_resource() override; static int alloc_piece_msg_ctx(const ObRDWFPieceMsg &pkt, ObPxCoordInfo &coord_info, ObExecContext &ctx, diff --git a/src/sql/engine/px/datahub/components/ob_dh_rollup_key.cpp b/src/sql/engine/px/datahub/components/ob_dh_rollup_key.cpp index 1d705c9793..3c9a1ec182 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_rollup_key.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_rollup_key.cpp @@ -46,28 +46,12 @@ int ObRollupKeyPieceMsgListener::on_message( LOG_TRACE("got a win buf picece msg", "all_got", ctx.received_, "expected", ctx.task_cnt_); } if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) { - // all piece msg has been received - ctx.whole_msg_.op_id_ = ctx.op_id_; if (OB_FAIL(ctx.process_ndv())) { LOG_WARN("failed to process ndv", K(ret)); + } else if (OB_FAIL(ctx.send_whole_msg(sqcs))) { + LOG_WARN("fail to send whole msg", K(ret)); } - ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { - dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); - if (OB_ISNULL(ch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("null expected", K(ret)); - } else if (OB_FAIL(ch->send(ctx.whole_msg_, ctx.timeout_ts_))) { - LOG_WARN("fail push data to channel", K(ret)); - } else if (OB_FAIL(ch->flush(true, false))) { - LOG_WARN("fail flush dtl data", K(ret)); - } else { - LOG_DEBUG("dispatched winbuf whole msg", - K(idx), K(cnt), K(ctx.whole_msg_), K(*ch)); - } - if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { - LOG_WARN("failed to wait response", K(ret)); - } - } + IGNORE_RETURN ctx.reset_resource(); } return ret; } @@ -159,6 +143,36 @@ int ObRollupKeyPieceMsgCtx::alloc_piece_msg_ctx(const ObRollupKeyPieceMsg &pkt, return ret; } +int ObRollupKeyPieceMsgCtx::send_whole_msg(common::ObIArray &sqcs) +{ + int ret = OB_SUCCESS; + // all piece msg has been received + whole_msg_.op_id_ = op_id_; + ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { + dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); + if (OB_ISNULL(ch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null expected", K(ret)); + } else if (OB_FAIL(ch->send(whole_msg_, timeout_ts_))) { + LOG_WARN("fail push data to channel", K(ret)); + } else if (OB_FAIL(ch->flush(true, false))) { + LOG_WARN("fail flush dtl data", K(ret)); + } else { + LOG_DEBUG("dispatched winbuf whole msg", + K(idx), K(cnt), K(whole_msg_), K(*ch)); + } + if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { + LOG_WARN("failed to wait response", K(ret)); + } + } + return ret; +} + +void ObRollupKeyPieceMsgCtx::reset_resource() +{ + received_ = 0; +} + int ObRollupKeyWholeMsg::assign(const ObRollupKeyWholeMsg &other) { int ret = OB_SUCCESS; diff --git a/src/sql/engine/px/datahub/components/ob_dh_rollup_key.h b/src/sql/engine/px/datahub/components/ob_dh_rollup_key.h index e767e1f567..5c1cf9cc9a 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_rollup_key.h +++ b/src/sql/engine/px/datahub/components/ob_dh_rollup_key.h @@ -99,7 +99,8 @@ public: received_msgs_.reset(); } INHERIT_TO_STRING_KV("meta", ObPieceMsgCtx, K_(received)); - + virtual int send_whole_msg(common::ObIArray &sqcs) override; + virtual void reset_resource() override; static int alloc_piece_msg_ctx(const ObRollupKeyPieceMsg &pkt, ObPxCoordInfo &coord_info, ObExecContext &ctx, diff --git a/src/sql/engine/px/datahub/components/ob_dh_sample.cpp b/src/sql/engine/px/datahub/components/ob_dh_sample.cpp index 41ac416910..beb24a898c 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_sample.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_sample.cpp @@ -574,33 +574,48 @@ int ObDynamicSamplePieceMsgCtx::on_message( // send whole message when all piece received if (OB_SUCC(ret) && received_ == task_cnt_) { - SMART_VAR(ObDynamicSampleWholeMsg, whole) { - whole.op_id_ = op_id_; - if (OB_FAIL(build_whole_msg(whole))) { - LOG_WARN("build sample whole message failed", K(ret), K(*this)); - } - ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { - dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); - if (OB_ISNULL(ch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("null expected", K(ret)); - } else if (OB_FAIL(ch->send(whole, timeout_ts_))) { - LOG_WARN("fail push data to channel", K(ret)); - } else if (OB_FAIL(ch->flush(true, false))) { - LOG_WARN("fail flush dtl data", K(ret)); - } else { - LOG_TRACE("dispatched sample whole msg", - K(idx), K(cnt), K(whole), K(*ch)); - } - } - if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { - LOG_WARN("failed to wait response", K(ret)); + if (OB_FAIL(send_whole_msg(sqcs))) { + LOG_WARN("fail to send whole msg", K(ret)); + } + IGNORE_RETURN reset_resource(); + } + return ret; +} + +int ObDynamicSamplePieceMsgCtx::send_whole_msg(common::ObIArray &sqcs) +{ + int ret = OB_SUCCESS; + SMART_VAR(ObDynamicSampleWholeMsg, whole) { + whole.op_id_ = op_id_; + if (OB_FAIL(build_whole_msg(whole))) { + LOG_WARN("build sample whole message failed", K(ret), K(*this)); + } + ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { + dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); + if (OB_ISNULL(ch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null expected", K(ret)); + } else if (OB_FAIL(ch->send(whole, timeout_ts_))) { + LOG_WARN("fail push data to channel", K(ret)); + } else if (OB_FAIL(ch->flush(true, false))) { + LOG_WARN("fail flush dtl data", K(ret)); + } else { + LOG_TRACE("dispatched sample whole msg", + K(idx), K(cnt), K(whole), K(*ch)); } } + if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { + LOG_WARN("failed to wait response", K(ret)); + } } return ret; } +void ObDynamicSamplePieceMsgCtx::reset_resource() +{ + received_ = 0; +} + int ObDynamicSamplePieceMsgListener::on_message( ObDynamicSamplePieceMsgCtx &ctx, common::ObIArray &sqcs, diff --git a/src/sql/engine/px/datahub/components/ob_dh_sample.h b/src/sql/engine/px/datahub/components/ob_dh_sample.h index 6669af4d7d..a405e986f3 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_sample.h +++ b/src/sql/engine/px/datahub/components/ob_dh_sample.h @@ -111,6 +111,8 @@ public: const SortDef &sort_def); virtual ~ObDynamicSamplePieceMsgCtx() = default; int init(const ObIArray &tablet_ids); + virtual int send_whole_msg(common::ObIArray &sqcs) override; + virtual void reset_resource() override; virtual void destroy(); int process_piece(const ObDynamicSamplePieceMsg &piece); int split_range( diff --git a/src/sql/engine/px/datahub/components/ob_dh_second_stage_reporting_wf.cpp b/src/sql/engine/px/datahub/components/ob_dh_second_stage_reporting_wf.cpp index 3a88c604e6..c03c87ae6a 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_second_stage_reporting_wf.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_second_stage_reporting_wf.cpp @@ -121,6 +121,37 @@ int ObReportingWFPieceMsgCtx::alloc_piece_msg_ctx(const ObReportingWFPieceMsg &p return ret; } +int ObReportingWFPieceMsgCtx::send_whole_msg(common::ObIArray &sqcs) +{ + int ret = OB_SUCCESS; + // datahub already received all pieces, will send whole msg to sqc + whole_msg_.op_id_ = op_id_; + // no need to sort here, will use pby_hash_value_array_ to build hash map later + ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { + dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); + if (OB_ISNULL(ch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null expected", K(ret)); + } else if (OB_FAIL(ch->send(whole_msg_, timeout_ts_))) { + LOG_WARN("fail push data to channel", K(ret)); + } else if (OB_FAIL(ch->flush(true /* wait */, false /* wait response */))) { + LOG_WARN("fail flush dtl data", K(ret)); + } else { + LOG_DEBUG("dispatched winbuf whole msg", K(idx), K(cnt), K(whole_msg_), K(*ch)); + } + } + if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { + LOG_WARN("failed to wait response", K(ret)); + } + return ret; +} + +void ObReportingWFPieceMsgCtx::reset_resource() +{ + whole_msg_.reset(); + received_ = 0; +} + int ObReportingWFPieceMsgListener::on_message( ObReportingWFPieceMsgCtx &ctx, common::ObIArray &sqcs, @@ -148,27 +179,10 @@ int ObReportingWFPieceMsgListener::on_message( LOG_TRACE("got a win buf picece msg", "all_got", ctx.received_, "expected", ctx.task_cnt_); } if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) { - // datahub already received all pieces, will send whole msg to sqc - ctx.whole_msg_.op_id_ = ctx.op_id_; - // no need to sort here, will use pby_hash_value_array_ to build hash map later - ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { - dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); - if (OB_ISNULL(ch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("null expected", K(ret)); - } else if (OB_FAIL(ch->send(ctx.whole_msg_, ctx.timeout_ts_))) { - LOG_WARN("fail push data to channel", K(ret)); - } else if (OB_FAIL(ch->flush(true /* wait */, false /* wait response */))) { - LOG_WARN("fail flush dtl data", K(ret)); - } else { - LOG_DEBUG("dispatched winbuf whole msg", K(idx), K(cnt), K(ctx.whole_msg_), K(*ch)); - } + if (OB_FAIL(ctx.send_whole_msg(sqcs))) { + LOG_WARN("fail to send whole msg", K(ret)); } - if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { - LOG_WARN("failed to wait response", K(ret)); - } - ctx.received_ = 0; - ctx.whole_msg_.reset(); + IGNORE_RETURN ctx.reset_resource(); } return ret; } diff --git a/src/sql/engine/px/datahub/components/ob_dh_second_stage_reporting_wf.h b/src/sql/engine/px/datahub/components/ob_dh_second_stage_reporting_wf.h index 6eff4fdb2a..4c4347d8d0 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_second_stage_reporting_wf.h +++ b/src/sql/engine/px/datahub/components/ob_dh_second_stage_reporting_wf.h @@ -82,6 +82,8 @@ public: : ObPieceMsgCtx(op_id, task_cnt, timeout_ts), received_(0), tenant_id_(tenant_id), whole_msg_() {} virtual ~ObReportingWFPieceMsgCtx() = default; + virtual int send_whole_msg(common::ObIArray &sqcs) override; + virtual void reset_resource() override; INHERIT_TO_STRING_KV("meta", ObPieceMsgCtx, K_(received)); static int alloc_piece_msg_ctx(const ObReportingWFPieceMsg &pkt, ObPxCoordInfo &coord_info, diff --git a/src/sql/engine/px/datahub/components/ob_dh_winbuf.cpp b/src/sql/engine/px/datahub/components/ob_dh_winbuf.cpp index 54cebe5197..b0b1c06fbd 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_winbuf.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_winbuf.cpp @@ -61,29 +61,10 @@ int ObWinbufPieceMsgListener::on_message( // 已经收到所有 piece,发送 sqc 个 whole // 各个 sqc 广播给各自 task if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) { - ctx.whole_msg_.is_datum_ = pkt.is_datum_; - ctx.whole_msg_.op_id_ = ctx.op_id_; - ctx.whole_msg_.is_empty_ = (!ctx.whole_msg_.row_store_.is_inited()) && - (!ctx.whole_msg_.datum_store_.is_inited()); - ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { - dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); - if (OB_ISNULL(ch)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("null expected", K(ret)); - } else if (OB_FAIL(ch->send(ctx.whole_msg_, ctx.timeout_ts_))) { - LOG_WARN("fail push data to channel", K(ret)); - } else if (OB_FAIL(ch->flush(true, false))) { - LOG_WARN("fail flush dtl data", K(ret)); - } else { - LOG_DEBUG("dispatched winbuf whole msg", - K(idx), K(cnt), K(ctx.whole_msg_), K(*ch)); - } + if (OB_FAIL(ctx.send_whole_msg(sqcs))) { + LOG_WARN("fail to send whole msg", K(ret)); } - if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { - LOG_WARN("failed to wait response", K(ret)); - } - ctx.whole_msg_.reset(); - ctx.received_ = 0; + IGNORE_RETURN ctx.reset_resource(); } return ret; } @@ -112,6 +93,38 @@ int ObWinbufPieceMsgCtx::alloc_piece_msg_ctx(const ObWinbufPieceMsg &pkt, return ret; } +int ObWinbufPieceMsgCtx::send_whole_msg(common::ObIArray &sqcs) +{ + int ret = OB_SUCCESS; + whole_msg_.is_datum_ = true; + whole_msg_.op_id_ = op_id_; + whole_msg_.is_empty_ = (!whole_msg_.datum_store_.is_inited()); + ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) { + dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel(); + if (OB_ISNULL(ch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null expected", K(ret)); + } else if (OB_FAIL(ch->send(whole_msg_, timeout_ts_))) { + LOG_WARN("fail push data to channel", K(ret)); + } else if (OB_FAIL(ch->flush(true, false))) { + LOG_WARN("fail flush dtl data", K(ret)); + } else { + LOG_DEBUG("dispatched winbuf whole msg", + K(idx), K(cnt), K(whole_msg_), K(*ch)); + } + } + if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) { + LOG_WARN("failed to wait response", K(ret)); + } + return ret; +} + +void ObWinbufPieceMsgCtx::reset_resource() +{ + whole_msg_.reset(); + received_ = 0; +} + namespace ob_dh_winbuf { template diff --git a/src/sql/engine/px/datahub/components/ob_dh_winbuf.h b/src/sql/engine/px/datahub/components/ob_dh_winbuf.h index 512bcda8fa..55d221db02 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_winbuf.h +++ b/src/sql/engine/px/datahub/components/ob_dh_winbuf.h @@ -112,6 +112,8 @@ public: ObExecContext &ctx, int64_t task_cnt, ObPieceMsgCtx *&msg_ctx); + virtual int send_whole_msg(common::ObIArray &sqcs) override; + virtual void reset_resource() override; int received_; // 已经收到的 piece 数量 int64_t tenant_id_; ObWinbufWholeMsg whole_msg_; diff --git a/src/sql/engine/px/datahub/ob_dh_msg_ctx.h b/src/sql/engine/px/datahub/ob_dh_msg_ctx.h index 831d53f028..dad7cfcfab 100644 --- a/src/sql/engine/px/datahub/ob_dh_msg_ctx.h +++ b/src/sql/engine/px/datahub/ob_dh_msg_ctx.h @@ -26,6 +26,8 @@ public: ObPieceMsgCtx(uint64_t op_id, int64_t task_cnt, int64_t timeout_ts) : op_id_(op_id), task_cnt_(task_cnt), timeout_ts_(timeout_ts) {} virtual ~ObPieceMsgCtx() {} + virtual int send_whole_msg(common::ObIArray &sqcs) { return OB_SUCCESS; }; + virtual void reset_resource() = 0; VIRTUAL_TO_STRING_KV(K_(op_id), K_(task_cnt)); virtual void destroy() {} uint64_t op_id_; // 哪个算子使用 datahub 服务