diff --git a/src/sql/engine/aggregate/ob_merge_groupby_op.cpp b/src/sql/engine/aggregate/ob_merge_groupby_op.cpp
index 2d50e2265b..4d988dd1b0 100644
--- a/src/sql/engine/aggregate/ob_merge_groupby_op.cpp
+++ b/src/sql/engine/aggregate/ob_merge_groupby_op.cpp
@@ -482,7 +482,7 @@ int ObMergeGroupByOp::process_parallel_rollup_key(ObRollupNDVInfo &ndv_info)
     piece.source_dfo_id_ = proxy.get_dfo_id();
     piece.target_dfo_id_ = proxy.get_dfo_id();
     piece.rollup_ndv_ = ndv_info;
-    if (OB_FAIL(proxy.get_dh_msg(MY_SPEC.id_,
+    if (OB_FAIL(proxy.get_dh_msg_sync(MY_SPEC.id_,
                                  dtl::DH_ROLLUP_KEY_WHOLE_MSG,
                                  piece,
                                  temp_whole_msg,
diff --git a/src/sql/engine/pdml/static/ob_pdml_op_data_driver.cpp b/src/sql/engine/pdml/static/ob_pdml_op_data_driver.cpp
index ec694f8fd0..ee8f983a96 100644
--- a/src/sql/engine/pdml/static/ob_pdml_op_data_driver.cpp
+++ b/src/sql/engine/pdml/static/ob_pdml_op_data_driver.cpp
@@ -324,11 +324,11 @@ int ObPDMLOpDataDriver::barrier(ObExecContext &ctx)
     piece.op_id_ = op_id_;
     piece.thread_id_ = GETTID();
     const ObBarrierWholeMsg *whole = nullptr;
-    if (OB_FAIL(proxy.get_dh_msg(op_id_,
-                                 dtl::DH_BARRIER_WHOLE_MSG,
-                                 piece,
-                                 whole,
-                                 ctx.get_physical_plan_ctx()->get_timeout_timestamp()))) {
+    if (OB_FAIL(proxy.get_dh_msg_sync(op_id_,
+                                      dtl::DH_BARRIER_WHOLE_MSG,
+                                      piece,
+                                      whole,
+                                      ctx.get_physical_plan_ctx()->get_timeout_timestamp()))) {
       LOG_WARN("fail get barrier msg", K(ret));
     }
   }
diff --git a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp
index 638edc7791..2bbaffca63 100644
--- a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp
+++ b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp
@@ -298,6 +298,8 @@ int ObRDWFPieceMsgCtx::send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs)
 void ObRDWFPieceMsgCtx::reset_resource()
 {
   received_ = 0;
+  infos_.reset();
+  arena_alloc_.reset();
 }
 
 int ObRDWFWholeMsg::assign(const ObRDWFWholeMsg &msg)
diff --git a/src/sql/engine/px/datahub/ob_dh_msg_ctx.h b/src/sql/engine/px/datahub/ob_dh_msg_ctx.h
index dad7cfcfab..717f3d0b78 100644
--- a/src/sql/engine/px/datahub/ob_dh_msg_ctx.h
+++ b/src/sql/engine/px/datahub/ob_dh_msg_ctx.h
@@ -49,12 +49,13 @@ public:
       }
     }
     ctxs_.reset();
+    types_.reset();
   }
-  int find_piece_ctx(uint64_t op_id, ObPieceMsgCtx *&ctx)
+  int find_piece_ctx(uint64_t op_id, dtl::ObDtlMsgType type, ObPieceMsgCtx *&ctx)
   {
     int ret = common::OB_ENTRY_NOT_EXIST;;
     for (int i = 0; i < ctxs_.count(); ++i) {
-      if (ctxs_.at(i)->op_id_ == op_id) {
+      if (ctxs_.at(i)->op_id_ == op_id && types_.at(i) == type) {
         ret = common::OB_SUCCESS;
         ctx = ctxs_.at(i);
         break;
@@ -62,12 +63,17 @@ public:
       }
     }
     return ret;
   }
-  int add_piece_ctx(ObPieceMsgCtx *ctx)
+  int add_piece_ctx(ObPieceMsgCtx *ctx, dtl::ObDtlMsgType type)
   {
-    return ctxs_.push_back(ctx);
+    int ret = OB_SUCCESS;
+    if (OB_FAIL(ctxs_.push_back(ctx))) {
+    } else if (OB_FAIL(types_.push_back(type))) {
+    }
+    return ret;
   }
 private:
   common::ObSEArray ctxs_;
+  common::ObSEArray types_;
 };
 }
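The ob_dh_msg_ctx.h change above keys the QC-side piece-context lookup on the pair (op_id, msg_type) instead of op_id alone, by keeping a types_ array parallel to ctxs_. Below is a minimal standalone sketch of that lookup, with std::vector and simplified types standing in for ObSEArray, ObPieceMsgCtx and dtl::ObDtlMsgType; all names here are illustrative, not the OceanBase API. The window-function call sites later in this patch appear to be the motivating case: one operator id can be associated with more than one datahub message type.

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

enum class DhMsgType { BARRIER, ROLLUP_KEY, RANGE_DIST_WF };  // stand-in for dtl::ObDtlMsgType

struct PieceCtx { uint64_t op_id_; };                         // stand-in for ObPieceMsgCtx

class PieceCtxMgr {
public:
  // ctxs_ and types_ grow together, so index i always pairs a context with its type.
  void add(PieceCtx *ctx, DhMsgType type) {
    ctxs_.push_back(ctx);
    types_.push_back(type);
  }
  // A context matches only when BOTH the operator id and the message type match:
  // the same operator id may own piece contexts for several datahub message types.
  PieceCtx *find(uint64_t op_id, DhMsgType type) const {
    for (size_t i = 0; i < ctxs_.size(); ++i) {
      if (ctxs_[i]->op_id_ == op_id && types_[i] == type) {
        return ctxs_[i];
      }
    }
    return nullptr;
  }
private:
  std::vector<PieceCtx *> ctxs_;
  std::vector<DhMsgType> types_;
};

int main() {
  PieceCtx a{7}, b{7};  // same operator id, two different message types
  PieceCtxMgr mgr;
  mgr.add(&a, DhMsgType::BARRIER);
  mgr.add(&b, DhMsgType::ROLLUP_KEY);
  std::printf("barrier ctx found: %d\n", mgr.find(7, DhMsgType::BARRIER) == &a);
  return 0;
}
```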
diff --git a/src/sql/engine/px/datahub/ob_dh_msg_provider.h b/src/sql/engine/px/datahub/ob_dh_msg_provider.h
index 5df3d86846..70008b045d 100644
--- a/src/sql/engine/px/datahub/ob_dh_msg_provider.h
+++ b/src/sql/engine/px/datahub/ob_dh_msg_provider.h
@@ -25,20 +25,27 @@ namespace sql
 class ObPxDatahubDataProvider
 {
 public:
+  ObPxDatahubDataProvider()
+    : op_id_(-1), msg_type_(dtl::TESTING), msg_set_(false), dh_msg_cnt_(0), rescan_cnt_(0)
+  {
+  }
   virtual int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts) = 0;
   virtual void reset() {}
   TO_STRING_KV(K_(op_id), K_(msg_type));
   uint64_t op_id_; // id of the operator that registered this provider; used to locate the corresponding provider in the provider array
   dtl::ObDtlMsgType msg_type_;
+  bool msg_set_;
+  volatile int64_t dh_msg_cnt_;
+  volatile int64_t rescan_cnt_;
 };
 
 template <typename T>
 class ObWholeMsgProvider : public ObPxDatahubDataProvider
 {
 public:
-  ObWholeMsgProvider() : msg_set_(false) {}
+  ObWholeMsgProvider() {}
   virtual ~ObWholeMsgProvider() = default;
-  virtual void reset() override { msg_.reset(); msg_set_ = false; }
+  virtual void reset() override { msg_.reset();}
   int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts)
   {
     int ret = OB_SUCCESS;
@@ -83,7 +90,6 @@ private:
     return ret;
   }
 private:
-  bool msg_set_;
   T msg_;
   common::ObThreadCond msg_ready_cond_;
 };
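In ob_dh_msg_provider.h, msg_set_ and the two counters move from the templated ObWholeMsgProvider&lt;T&gt; into the non-template ObPxDatahubDataProvider base. That is what lets the new SQC-proxy synchronization (sync_wait_all, further down) count arrivals and clear a provider through a plain base reference, without knowing the concrete whole-msg type. A rough standalone model of that split, with simplified classes that are not the real ones:

```cpp
#include <cstdint>
#include <cstdio>

// Type-erased base: everything the SQC-side synchronization needs to touch.
struct DataProviderBase {
  virtual ~DataProviderBase() = default;
  virtual void reset() = 0;          // clears the typed payload in the subclass
  bool msg_set_ = false;             // has a whole message been published?
  volatile int64_t dh_msg_cnt_ = 0;  // arrivals across all rescans
  volatile int64_t rescan_cnt_ = 0;  // completed rescan generations
};

// Typed part: only the payload itself depends on the whole-msg type.
template <typename WholeMsg>
struct WholeMsgProvider : DataProviderBase {
  void reset() override { msg_ = WholeMsg(); }
  WholeMsg msg_{};
};

// A sync_wait_all-style routine can now clear any provider through the base
// reference, without knowing which whole-msg type it carries.
void reset_for_next_rescan(DataProviderBase &p) {
  p.msg_set_ = false;
  p.reset();
}

int main() {
  WholeMsgProvider<int> provider;
  provider.msg_ = 42;
  provider.msg_set_ = true;
  reset_for_next_rescan(provider);
  std::printf("msg=%d msg_set=%d\n", provider.msg_, (int)provider.msg_set_);
  return 0;
}
```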
diff --git a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp
index a33538dd10..a9e03ce3a2 100644
--- a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp
+++ b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp
@@ -1025,7 +1025,7 @@ int ObPxTransmitOp::do_datahub_dynamic_sample(int64_t op_id, ObDynamicSamplePieceMsg &piece_msg)
   bool send_piece = true;
   if (OB_FAIL(proxy.make_sqc_sample_piece_msg(piece_msg, send_piece))) {
     LOG_WARN("fail to make sqc sample piece msg", K(ret));
-  } else if (OB_FAIL(proxy.get_dh_msg(op_id,
+  } else if (OB_FAIL(proxy.get_dh_msg_sync(op_id,
                                       DH_DYNAMIC_SAMPLE_WHOLE_MSG,
                                       proxy.get_piece_sample_msg(),
                                       temp_whole_msg,
diff --git a/src/sql/engine/px/ob_px_scheduler.cpp b/src/sql/engine/px/ob_px_scheduler.cpp
index c44a9efc25..3d8b418356 100644
--- a/src/sql/engine/px/ob_px_scheduler.cpp
+++ b/src/sql/engine/px/ob_px_scheduler.cpp
@@ -76,7 +76,7 @@ public:
     } else if (OB_ISNULL(source_dfo) || OB_ISNULL(target_dfo)) {
       ret = OB_ERR_UNEXPECTED;
       LOG_WARN("NULL ptr or null session ptr", KP(source_dfo), KP(target_dfo), K(pkt), K(ret));
-    } else if (OB_FAIL(coord_info.piece_msg_ctx_mgr_.find_piece_ctx(pkt.op_id_, piece_ctx))) {
+    } else if (OB_FAIL(coord_info.piece_msg_ctx_mgr_.find_piece_ctx(pkt.op_id_, pkt.type(), piece_ctx))) {
      // create a ctx if none is found
      // NOTE: creating a new piece_ctx here cannot race,
      // because the QC message loop is single-threaded and handles SQC messages one by one
@@ -86,7 +86,7 @@
           source_dfo->get_total_task_count(), piece_ctx))) {
         LOG_WARN("fail to alloc piece msg", K(ret));
       } else if (nullptr != piece_ctx) {
-        if (OB_FAIL(coord_info.piece_msg_ctx_mgr_.add_piece_ctx(piece_ctx))) {
+        if (OB_FAIL(coord_info.piece_msg_ctx_mgr_.add_piece_ctx(piece_ctx, pkt.type()))) {
           LOG_WARN("fail add barrier piece ctx", K(ret));
         }
       }
diff --git a/src/sql/engine/px/ob_px_sqc_proxy.cpp b/src/sql/engine/px/ob_px_sqc_proxy.cpp
index 76cad037f0..e5818653c0 100644
--- a/src/sql/engine/px/ob_px_sqc_proxy.cpp
+++ b/src/sql/engine/px/ob_px_sqc_proxy.cpp
@@ -25,6 +25,8 @@ using namespace oceanbase::common;
 using namespace oceanbase::sql;
 using namespace oceanbase::sql::dtl;
 
+#define BREAK_TASK_CNT(a) ((a) + 1)
+
 int ObBloomFilterSendCtx::generate_filter_indexes(
     int64_t each_group_size,
     int64_t channel_count)
@@ -638,3 +640,32 @@ int64_t ObPxSQCProxy::get_query_timeout_ts()
 }
 
 int64_t ObPxSQCProxy::get_task_count() const { return sqc_ctx_.get_task_count(); }
+
+int ObPxSQCProxy::sync_wait_all(ObPxDatahubDataProvider &provider)
+{
+  int ret = OB_SUCCESS;
+  const int64_t task_cnt = get_task_count();
+  const int64_t idx = ATOMIC_AAF(&provider.dh_msg_cnt_, 1);
+  const int64_t curr_rescan_cnt = provider.rescan_cnt_ + 1;
+  int64_t loop_cnt = 0;
+  // The whole message must be reset before the next rescan; we reset it after the last piece msg.
+  // First, sync wait until all piece threads have entered the loop.
+  do {
+    ++loop_cnt;
+    if (task_cnt == idx % (BREAK_TASK_CNT(task_cnt))) { // last thread
+      provider.msg_set_ = false;
+      provider.reset(); // reset whole message
+      ATOMIC_AAF(&provider.rescan_cnt_, 1);
+      ATOMIC_AAF(&provider.dh_msg_cnt_, 1); // to break the loop
+    } else {
+      ob_usleep(1000);
+      if (0 == loop_cnt % 64) {
+        if (OB_FAIL(THIS_WORKER.check_status())) {
+          LOG_WARN("failed to sync wait", K(ret), K(task_cnt), K(provider.dh_msg_cnt_));
+        }
+      }
+    }
+  } while (OB_SUCC(ret) && provider.dh_msg_cnt_ < BREAK_TASK_CNT(task_cnt) * curr_rescan_cnt);
+
+  return ret;
+}
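sync_wait_all is a small generation-counting barrier: with task_cnt worker threads, every arrival bumps dh_msg_cnt_ once; the last arrival of a generation (idx % (task_cnt + 1) == task_cnt) clears the whole-message state, bumps rescan_cnt_, and bumps dh_msg_cnt_ a second time, so the counter reaches (task_cnt + 1) * generation and every waiter's loop condition turns false. A standalone model of that arithmetic using std::atomic and std::thread (illustrative only; the patch itself uses ATOMIC_AAF, ob_usleep and THIS_WORKER.check_status()):

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

std::atomic<int64_t> dh_msg_cnt{0};  // total arrivals, plus one extra bump per generation
std::atomic<int64_t> rescan_cnt{0};  // completed rescan generations
std::atomic<int>     resets{0};      // how many times the shared "whole msg" was cleared

// One rendezvous, mirroring the BREAK_TASK_CNT(task_cnt) == task_cnt + 1 arithmetic.
void sync_point(int64_t task_cnt) {
  // Note: this demo samples the generation before the arrival bump so it cannot
  // overshoot; the patch samples rescan_cnt_ right after ATOMIC_AAF instead.
  const int64_t curr_gen = rescan_cnt.load() + 1;
  const int64_t idx = ++dh_msg_cnt;
  const int64_t break_cnt = task_cnt + 1;
  if (idx % break_cnt == task_cnt) {  // last arrival of this generation
    ++resets;                         // stands in for provider.reset() / msg_set_ = false
    ++rescan_cnt;
    ++dh_msg_cnt;                     // push the counter to a multiple of break_cnt
  }
  while (dh_msg_cnt.load() < break_cnt * curr_gen) {
    std::this_thread::yield();        // the patch sleeps and checks worker status here
  }
}

int main() {
  const int64_t task_cnt = 4;
  const int generations = 3;
  std::vector<std::thread> workers;
  for (int64_t t = 0; t < task_cnt; ++t) {
    workers.emplace_back([&] {
      for (int g = 0; g < generations; ++g) {
        sync_point(task_cnt);
      }
    });
  }
  for (auto &w : workers) {
    w.join();
  }
  std::printf("resets=%d (expected %d)\n", resets.load(), generations);
  return 0;
}
```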
diff --git a/src/sql/engine/px/ob_px_sqc_proxy.h b/src/sql/engine/px/ob_px_sqc_proxy.h
index c63f30fc4f..dc50cc2d3e 100644
--- a/src/sql/engine/px/ob_px_sqc_proxy.h
+++ b/src/sql/engine/px/ob_px_sqc_proxy.h
@@ -126,10 +126,8 @@ public:
       int64_t timeout_ts,
       bool is_transmit);
 
-  // for peek datahub whole msg
   template <class PieceMsg, class WholeMsg>
-  int get_dh_msg(
-      uint64_t op_id,
+  int get_dh_msg_sync(uint64_t op_id,
       dtl::ObDtlMsgType msg_type,
       const PieceMsg &piece,
       const WholeMsg *&whole,
@@ -137,6 +135,16 @@
       bool send_piece = true,
       bool need_wait_whole_msg = true);
 
+  template <class PieceMsg, class WholeMsg>
+  int get_dh_msg(uint64_t op_id,
+      dtl::ObDtlMsgType msg_type,
+      const PieceMsg &piece,
+      const WholeMsg *&whole,
+      int64_t timeout_ts,
+      bool send_piece = true,
+      bool need_wait_whole_msg = true);
+
+
   // used by workers to report execution results
   int report_task_finish_status(int64_t task_idx, int rc);
 
@@ -168,6 +176,9 @@ public:
   int append_bf_send_ctx(int64_t &bf_send_ctx_idx);
   int64_t get_task_count() const;
   common::ObThreadCond &get_msg_ready_cond() { return msg_ready_cond_; }
+  int64_t get_dh_msg_cnt() const;
+  void atomic_inc_dh_msg_cnt();
+  int64_t atomic_add_and_fetch_dh_msg_cnt();
 private:
   /* functions */
   int setup_loop_proc(ObSqcCtx &sqc_ctx);
@@ -180,6 +191,18 @@ private:
   int get_whole_msg_provider(uint64_t op_id, dtl::ObDtlMsgType msg_type, ObPxDatahubDataProvider *&provider);
   int64_t get_process_query_time();
   int64_t get_query_timeout_ts();
+  int sync_wait_all(ObPxDatahubDataProvider &provider);
+  // for peek datahub whole msg
+  template <class PieceMsg, class WholeMsg>
+  int inner_get_dh_msg(
+      uint64_t op_id,
+      dtl::ObDtlMsgType msg_type,
+      const PieceMsg &piece,
+      const WholeMsg *&whole,
+      int64_t timeout_ts,
+      bool need_sync,
+      bool send_piece,
+      bool need_wait_whole_msg);
   /* variables */
 public:
   ObSqcCtx &sqc_ctx_;
@@ -201,12 +224,41 @@
 template <class PieceMsg, class WholeMsg>
-int ObPxSQCProxy::get_dh_msg(
+int ObPxSQCProxy::get_dh_msg_sync(uint64_t op_id,
+    dtl::ObDtlMsgType msg_type,
+    const PieceMsg &piece,
+    const WholeMsg *&whole,
+    int64_t timeout_ts,
+    bool send_piece,
+    bool need_wait_whole_msg)
+{
+  return inner_get_dh_msg(op_id, msg_type, piece, whole,
+                          timeout_ts, true, send_piece,
+                          need_wait_whole_msg);
+}
+
+template <class PieceMsg, class WholeMsg>
+int ObPxSQCProxy::get_dh_msg(uint64_t op_id,
+    dtl::ObDtlMsgType msg_type,
+    const PieceMsg &piece,
+    const WholeMsg *&whole,
+    int64_t timeout_ts,
+    bool send_piece,
+    bool need_wait_whole_msg)
+{
+  return inner_get_dh_msg(op_id, msg_type, piece, whole,
+                          timeout_ts, false, send_piece,
+                          need_wait_whole_msg);
+}
+
+template <class PieceMsg, class WholeMsg>
+int ObPxSQCProxy::inner_get_dh_msg(
     uint64_t op_id,
     dtl::ObDtlMsgType msg_type,
     const PieceMsg &piece,
     const WholeMsg *&whole,
     int64_t timeout_ts,
+    bool need_sync,
     bool send_piece /*= true*/,
     bool need_wait_whole_msg /*= true*/)
 {
@@ -214,6 +266,8 @@ int ObPxSQCProxy::get_dh_msg(
   ObPxDatahubDataProvider *provider = nullptr;
   if (OB_FAIL(get_whole_msg_provider(op_id, msg_type, provider))) {
     SQL_LOG(WARN, "fail get provider", K(ret));
+  } else if (need_sync && OB_FAIL(sync_wait_all(*provider))) {
+    SQL_LOG(WARN, "failed to sync wait", K(ret));
   } else {
     if (send_piece) {
       ObLockGuard lock_guard(dtl_lock_);
@@ -238,7 +292,7 @@ int ObPxSQCProxy::get_dh_msg(
       ret = process_dtl_msg(timeout_ts);
       SQL_LOG(DEBUG, "process dtl msg done", K(ret));
     }
-    if (OB_SUCC(ret)) {
+    if (OB_EAGAIN == ret || OB_SUCCESS == ret) {
       const dtl::ObDtlMsg *msg = nullptr;
       if (OB_FAIL(p->get_msg_nonblock(msg, timeout_ts))) {
         SQL_LOG(TRACE, "fail get msg", K(timeout_ts), K(ret));
diff --git a/src/sql/engine/px/ob_sqc_ctx.cpp b/src/sql/engine/px/ob_sqc_ctx.cpp
index 02ae590900..9525342ecd 100644
--- a/src/sql/engine/px/ob_sqc_ctx.cpp
+++ b/src/sql/engine/px/ob_sqc_ctx.cpp
@@ -36,7 +36,7 @@ ObSqcCtx::ObSqcCtx(ObPxRpcInitSqcArgs &sqc_arg) : msg_loop_(),
     interrupted_(false),
     bf_ch_provider_(sqc_proxy_.get_msg_ready_cond()),
     px_bloom_filter_msg_proc_(msg_proc_),
-    opt_stats_gather_whole_msg_proc_(msg_proc_) {}
+    opt_stats_gather_whole_msg_proc_(msg_proc_){}
 
 int ObSqcCtx::add_whole_msg_provider(uint64_t op_id, dtl::ObDtlMsgType msg_type, ObPxDatahubDataProvider &provider)
 {
diff --git a/src/sql/engine/window_function/ob_window_function_op.cpp b/src/sql/engine/window_function/ob_window_function_op.cpp
index ba3f1d04b8..d97439681f 100644
--- a/src/sql/engine/window_function/ob_window_function_op.cpp
+++ b/src/sql/engine/window_function/ob_window_function_op.cpp
@@ -2368,7 +2368,7 @@ int ObWindowFunctionOp::rd_fetch_patch()
   const ObRDWFWholeMsg *whole_msg = NULL;
   if (OB_FAIL(ret)) {
-  } else if (OB_FAIL(handler->get_sqc_proxy().get_dh_msg(
+  } else if (OB_FAIL(handler->get_sqc_proxy().get_dh_msg_sync(
       MY_SPEC.id_, dtl::DH_RANGE_DIST_WF_PIECE_MSG,
       piece_msg, whole_msg,
       ctx_.get_physical_plan_ctx()->get_timeout_timestamp()))) {
     LOG_WARN("get range distribute window function msg failed", K(ret));
@@ -3169,7 +3169,7 @@ int ObWindowFunctionOp::get_participator_whole_msg(
   piece.target_dfo_id_ = proxy.get_dfo_id();
   piece.pby_hash_value_array_ = pby_hash_value_array;
   if (OB_SUCC(ret)) {
-    if (OB_FAIL(proxy.get_dh_msg(MY_SPEC.id_, dtl::DH_SECOND_STAGE_REPORTING_WF_WHOLE_MSG,
+    if (OB_FAIL(proxy.get_dh_msg_sync(MY_SPEC.id_, dtl::DH_SECOND_STAGE_REPORTING_WF_WHOLE_MSG,
         piece, temp_whole_msg, ctx_.get_physical_plan_ctx()->get_timeout_timestamp()))) {
       LOG_WARN("fail to get reporting wf whole msg", K(ret), K(piece), KPC(temp_whole_msg));
     } else if (OB_ISNULL(temp_whole_msg)) {
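The last behavioural change in inner_get_dh_msg is the condition guarding the non-blocking peek: OB_EAGAIN from process_dtl_msg (nothing pending in the DTL loop right now) no longer skips get_msg_nonblock, presumably so that a whole message that has already been received and cached is still picked up. A self-contained toy illustrating just that condition change; the return codes, helpers and control flow below are simplified stand-ins, not the real proxy code:

```cpp
#include <cstdio>

// Illustrative stand-ins; the real error codes and provider state live in
// ob_errno.h and ObWholeMsgProvider -- nothing here is the actual control flow.
constexpr int OB_SUCCESS = 0;
constexpr int OB_EAGAIN  = -1;  // illustrative value only

static bool g_whole_msg_published = true;  // another worker already cached the whole msg

// Drain of the DTL loop: pretend there is nothing pending right now.
static int process_dtl_msg() { return OB_EAGAIN; }

// Non-blocking peek at the provider cache.
static int get_msg_nonblock(const char *&msg) {
  if (g_whole_msg_published) { msg = "whole msg"; return OB_SUCCESS; }
  return OB_EAGAIN;
}

int main() {
  const char *whole = nullptr;
  int ret = process_dtl_msg();
  // Before the patch this branch required OB_SUCCESS == ret, so an empty DTL
  // queue (OB_EAGAIN) skipped the peek even when the whole message was already
  // cached. The patched condition falls through to the peek.
  if (OB_EAGAIN == ret || OB_SUCCESS == ret) {
    ret = get_msg_nonblock(whole);
  }
  std::printf("ret=%d whole=%s\n", ret, whole ? whole : "(null)");
  return 0;
}
```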