diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt
index bb9c66e776..16d208d8bc 100644
--- a/src/sql/CMakeLists.txt
+++ b/src/sql/CMakeLists.txt
@@ -638,7 +638,7 @@ ob_set_subtarget(ob_sql engine_px
   engine/px/exchange/ob_row_heap.cpp
   engine/px/exchange/ob_transmit_op.cpp
   engine/px/datahub/components/ob_dh_barrier.cpp
-  engine/px/datahub/components/ob_dh_rolluo_key.cpp
+  engine/px/datahub/components/ob_dh_rollup_key.cpp
   engine/px/datahub/components/ob_dh_sample.cpp
   engine/px/datahub/components/ob_dh_winbuf.cpp
   engine/px/datahub/components/ob_dh_range_dist_wf.cpp
diff --git a/src/sql/engine/px/datahub/components/ob_dh_rolluo_key.cpp b/src/sql/engine/px/datahub/components/ob_dh_rolluo_key.cpp
index d0e2f4cca0..e69de29bb2 100644
--- a/src/sql/engine/px/datahub/components/ob_dh_rolluo_key.cpp
+++ b/src/sql/engine/px/datahub/components/ob_dh_rolluo_key.cpp
@@ -1,168 +0,0 @@
-/**
- * Copyright (c) 2021 OceanBase
- * OceanBase CE is licensed under Mulan PubL v2.
- * You can use this software according to the terms and conditions of the Mulan PubL v2.
- * You may obtain a copy of Mulan PubL v2 at:
- *          http://license.coscl.org.cn/MulanPubL-2.0
- * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
- * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
- * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
- * See the Mulan PubL v2 for more details.
- */
-
-#define USING_LOG_PREFIX SQL_ENG
-#include "sql/engine/px/datahub/components/ob_dh_barrier.h"
-#include "sql/engine/px/datahub/ob_dh_msg_ctx.h"
-#include "sql/engine/px/ob_dfo.h"
-#include "sql/engine/px/ob_px_util.h"
-#include "sql/engine/px/datahub/ob_dh_msg.h"
-#include "sql/engine/px/datahub/components/ob_dh_rollup_key.h"
-
-using namespace oceanbase::sql;
-using namespace oceanbase::common;
-
-OB_SERIALIZE_MEMBER(ObRollupNDVInfo, ndv_, n_keys_, dop_, max_keys_);
-OB_SERIALIZE_MEMBER((ObRollupKeyPieceMsg, ObDatahubPieceMsg), rollup_ndv_);
-OB_SERIALIZE_MEMBER((ObRollupKeyWholeMsg, ObDatahubWholeMsg), rollup_ndv_);
-
-int ObRollupKeyPieceMsgListener::on_message(
-    ObRollupKeyPieceMsgCtx &ctx,
-    common::ObIArray<ObPxSqcMeta *> &sqcs,
-    const ObRollupKeyPieceMsg &pkt)
-{
-  int ret = OB_SUCCESS;
-  if (pkt.op_id_ != ctx.op_id_) {
-    ret = OB_ERR_UNEXPECTED;
-    LOG_WARN("unexpected piece msg", K(pkt), K(ctx));
-  } else if (ctx.received_ >= ctx.task_cnt_) {
-    ret = OB_ERR_UNEXPECTED;
-    LOG_WARN("should not receive any more pkt. already get all pkt expected",
-             K(pkt), K(ctx));
-  } else if (OB_FAIL(ctx.received_msgs_.push_back(pkt))) {
-    LOG_WARN("failed to push back pkt", K(pkt), K(ret));
-  }
-  if (OB_SUCC(ret)) {
-    ctx.received_++;
-    LOG_TRACE("got a win buf picece msg", "all_got", ctx.received_, "expected", ctx.task_cnt_);
-  }
-  if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) {
-    // all piece msg has been received
-    ctx.whole_msg_.op_id_ = ctx.op_id_;
-    if (OB_FAIL(ctx.process_ndv())) {
-      LOG_WARN("failed to process ndv", K(ret));
-    }
-    ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
-      dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
-      if (OB_ISNULL(ch)) {
-        ret = OB_ERR_UNEXPECTED;
-        LOG_WARN("null expected", K(ret));
-      } else if (OB_FAIL(ch->send(ctx.whole_msg_, ctx.timeout_ts_))) {
-        LOG_WARN("fail push data to channel", K(ret));
-      } else if (OB_FAIL(ch->flush(true, false))) {
-        LOG_WARN("fail flush dtl data", K(ret));
-      } else {
-        LOG_DEBUG("dispatched winbuf whole msg",
-                  K(idx), K(cnt), K(ctx.whole_msg_), K(*ch));
-      }
-      if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
-        LOG_WARN("failed to wait response", K(ret));
-      }
-    }
-    ctx.destroy();
-  }
-  return ret;
-}
-
-// find keys that ndv >> dop
-int ObRollupKeyPieceMsgCtx::process_ndv()
-{
-  int ret = OB_SUCCESS;
-  // analyze all rollup keys and get optimal keys that make the data evenly distributed
-  int64_t dop = 0;
-  ObRollupNDVInfo optimal_rollup_ndv;
-  ObRollupNDVInfo max_rollup_ndv;
-  optimal_rollup_ndv.n_keys_ = INT64_MAX;
-  max_rollup_ndv.n_keys_ = 0;
-  for (int64_t i = 0; OB_SUCC(ret) && i < received_msgs_.count(); ++i) {
-    ObRollupNDVInfo &rollup_ndv = received_msgs_.at(i).rollup_ndv_;
-    if (0 == dop) {
-      dop = rollup_ndv.dop_;
-    } else if (dop != rollup_ndv.dop_) {
-      LOG_WARN("unexpected status: dop is not match", K(dop), K(rollup_ndv.dop_));
-    }
-    if (rollup_ndv.ndv_ >= rollup_ndv.dop_ * FAR_GREATER_THAN_RATIO &&
-        optimal_rollup_ndv.n_keys_ > rollup_ndv.n_keys_) {
-      optimal_rollup_ndv.n_keys_ = rollup_ndv.n_keys_;
-      optimal_rollup_ndv.ndv_ = rollup_ndv.ndv_;
-      optimal_rollup_ndv.dop_ = rollup_ndv.dop_;
-      optimal_rollup_ndv.max_keys_ = rollup_ndv.max_keys_;
-    }
-    // set max
-    if (max_rollup_ndv.n_keys_ < rollup_ndv.n_keys_) {
-      max_rollup_ndv.n_keys_ = rollup_ndv.n_keys_;
-    }
-    if (max_rollup_ndv.ndv_ < rollup_ndv.ndv_) {
-      max_rollup_ndv.ndv_ = rollup_ndv.ndv_;
-    }
-    if (max_rollup_ndv.dop_ < rollup_ndv.dop_) {
-      max_rollup_ndv.dop_ = rollup_ndv.dop_;
-    }
-    if (max_rollup_ndv.max_keys_ < rollup_ndv.max_keys_) {
-      max_rollup_ndv.max_keys_ = rollup_ndv.max_keys_;
-    }
-  }
-  if (INT64_MAX == optimal_rollup_ndv.n_keys_) {
-    // can't found ndv that ndv >> dop
-    optimal_rollup_ndv = max_rollup_ndv;
-  }
-  if (0 == optimal_rollup_ndv.n_keys_) {
-    // it may has no data
-    optimal_rollup_ndv = max_rollup_ndv;
-  }
-  whole_msg_.rollup_ndv_ = optimal_rollup_ndv;
-  if (OB_SUCC(ret)) {
-    // set partial rollup keys
-    ret = E(EventTable::EN_ROLLUP_ADAPTIVE_KEY_NUM) ret;
-    if (OB_FAIL(ret)) {
-      whole_msg_.rollup_ndv_.n_keys_ = (-ret);
-    }
-    ret = OB_SUCCESS;
-  }
-  // FIXME: now use max_keys
-  // three stage only use max_keys
-  if (0 < max_rollup_ndv.max_keys_) {
-    whole_msg_.rollup_ndv_.n_keys_ = max_rollup_ndv.max_keys_;
-  }
-  return ret;
-}
-
-int ObRollupKeyPieceMsgCtx::alloc_piece_msg_ctx(const ObRollupKeyPieceMsg &pkt,
-                                                ObPxCoordInfo &,
-                                                ObExecContext &ctx,
-                                                int64_t task_cnt,
-                                                ObPieceMsgCtx *&msg_ctx)
-{
-  int ret = OB_SUCCESS;
-  if (OB_ISNULL(ctx.get_my_session()) ||
-      OB_ISNULL(ctx.get_physical_plan_ctx())) {
-    ret = OB_ERR_UNEXPECTED;
-    LOG_WARN("session is null or physical plan ctx is null", K(ret));
-  } else {
-    void *buf = ctx.get_allocator().alloc(sizeof(ObRollupKeyPieceMsgCtx));
-    if (OB_ISNULL(buf)) {
-      ret = OB_ALLOCATE_MEMORY_FAILED;
-    } else {
-      msg_ctx = new (buf) ObRollupKeyPieceMsgCtx(pkt.op_id_, task_cnt,
-          ctx.get_physical_plan_ctx()->get_timeout_timestamp(),
-          ctx.get_my_session()->get_effective_tenant_id());
-    }
-  }
-  return ret;
-}
-
-int ObRollupKeyWholeMsg::assign(const ObRollupKeyWholeMsg &other)
-{
-  int ret = OB_SUCCESS;
-  rollup_ndv_ = other.rollup_ndv_;
-  return ret;
-}
diff --git a/src/sql/engine/px/datahub/components/ob_dh_rollup_key.cpp b/src/sql/engine/px/datahub/components/ob_dh_rollup_key.cpp
new file mode 100644
index 0000000000..d6e1b00c9c
--- /dev/null
+++ b/src/sql/engine/px/datahub/components/ob_dh_rollup_key.cpp
@@ -0,0 +1,167 @@
+/**
+ * Copyright (c) 2021 OceanBase
+ * OceanBase CE is licensed under Mulan PubL v2.
+ * You can use this software according to the terms and conditions of the Mulan PubL v2.
+ * You may obtain a copy of Mulan PubL v2 at:
+ *          http://license.coscl.org.cn/MulanPubL-2.0
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PubL v2 for more details.
+ */
+
+#define USING_LOG_PREFIX SQL_ENG
+#include "sql/engine/px/datahub/components/ob_dh_barrier.h"
+#include "sql/engine/px/datahub/ob_dh_msg_ctx.h"
+#include "sql/engine/px/ob_dfo.h"
+#include "sql/engine/px/ob_px_util.h"
+#include "sql/engine/px/datahub/ob_dh_msg.h"
+#include "sql/engine/px/datahub/components/ob_dh_rollup_key.h"
+
+using namespace oceanbase::sql;
+using namespace oceanbase::common;
+
+OB_SERIALIZE_MEMBER(ObRollupNDVInfo, ndv_, n_keys_, dop_, max_keys_);
+OB_SERIALIZE_MEMBER((ObRollupKeyPieceMsg, ObDatahubPieceMsg), rollup_ndv_);
+OB_SERIALIZE_MEMBER((ObRollupKeyWholeMsg, ObDatahubWholeMsg), rollup_ndv_);
+
+int ObRollupKeyPieceMsgListener::on_message(
+    ObRollupKeyPieceMsgCtx &ctx,
+    common::ObIArray<ObPxSqcMeta *> &sqcs,
+    const ObRollupKeyPieceMsg &pkt)
+{
+  int ret = OB_SUCCESS;
+  if (pkt.op_id_ != ctx.op_id_) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("unexpected piece msg", K(pkt), K(ctx));
+  } else if (ctx.received_ >= ctx.task_cnt_) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("should not receive any more pkt. already get all pkt expected",
+             K(pkt), K(ctx));
+  } else if (OB_FAIL(ctx.received_msgs_.push_back(pkt))) {
+    LOG_WARN("failed to push back pkt", K(pkt), K(ret));
+  }
+  if (OB_SUCC(ret)) {
+    ctx.received_++;
+    LOG_TRACE("got a win buf picece msg", "all_got", ctx.received_, "expected", ctx.task_cnt_);
+  }
+  if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) {
+    // all piece msg has been received
+    ctx.whole_msg_.op_id_ = ctx.op_id_;
+    if (OB_FAIL(ctx.process_ndv())) {
+      LOG_WARN("failed to process ndv", K(ret));
+    }
+    ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
+      dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
+      if (OB_ISNULL(ch)) {
+        ret = OB_ERR_UNEXPECTED;
+        LOG_WARN("null expected", K(ret));
+      } else if (OB_FAIL(ch->send(ctx.whole_msg_, ctx.timeout_ts_))) {
+        LOG_WARN("fail push data to channel", K(ret));
+      } else if (OB_FAIL(ch->flush(true, false))) {
+        LOG_WARN("fail flush dtl data", K(ret));
+      } else {
+        LOG_DEBUG("dispatched winbuf whole msg",
+                  K(idx), K(cnt), K(ctx.whole_msg_), K(*ch));
+      }
+      if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
+        LOG_WARN("failed to wait response", K(ret));
+      }
+    }
+  }
+  return ret;
+}
+
+// find keys that ndv >> dop
+int ObRollupKeyPieceMsgCtx::process_ndv()
+{
+  int ret = OB_SUCCESS;
+  // analyze all rollup keys and get optimal keys that make the data evenly distributed
+  int64_t dop = 0;
+  ObRollupNDVInfo optimal_rollup_ndv;
+  ObRollupNDVInfo max_rollup_ndv;
+  optimal_rollup_ndv.n_keys_ = INT64_MAX;
+  max_rollup_ndv.n_keys_ = 0;
+  for (int64_t i = 0; OB_SUCC(ret) && i < received_msgs_.count(); ++i) {
+    ObRollupNDVInfo &rollup_ndv = received_msgs_.at(i).rollup_ndv_;
+    if (0 == dop) {
+      dop = rollup_ndv.dop_;
+    } else if (dop != rollup_ndv.dop_) {
+      LOG_WARN("unexpected status: dop is not match", K(dop), K(rollup_ndv.dop_));
+    }
+    if (rollup_ndv.ndv_ >= rollup_ndv.dop_ * FAR_GREATER_THAN_RATIO &&
+        optimal_rollup_ndv.n_keys_ > rollup_ndv.n_keys_) {
+      optimal_rollup_ndv.n_keys_ = rollup_ndv.n_keys_;
+      optimal_rollup_ndv.ndv_ = rollup_ndv.ndv_;
+      optimal_rollup_ndv.dop_ = rollup_ndv.dop_;
+      optimal_rollup_ndv.max_keys_ = rollup_ndv.max_keys_;
+    }
+    // set max
+    if (max_rollup_ndv.n_keys_ < rollup_ndv.n_keys_) {
+      max_rollup_ndv.n_keys_ = rollup_ndv.n_keys_;
+    }
+    if (max_rollup_ndv.ndv_ < rollup_ndv.ndv_) {
+      max_rollup_ndv.ndv_ = rollup_ndv.ndv_;
+    }
+    if (max_rollup_ndv.dop_ < rollup_ndv.dop_) {
+      max_rollup_ndv.dop_ = rollup_ndv.dop_;
+    }
+    if (max_rollup_ndv.max_keys_ < rollup_ndv.max_keys_) {
+      max_rollup_ndv.max_keys_ = rollup_ndv.max_keys_;
+    }
+  }
+  if (INT64_MAX == optimal_rollup_ndv.n_keys_) {
+    // can't found ndv that ndv >> dop
+    optimal_rollup_ndv = max_rollup_ndv;
+  }
+  if (0 == optimal_rollup_ndv.n_keys_) {
+    // it may has no data
+    optimal_rollup_ndv = max_rollup_ndv;
+  }
+  whole_msg_.rollup_ndv_ = optimal_rollup_ndv;
+  if (OB_SUCC(ret)) {
+    // set partial rollup keys
+    ret = E(EventTable::EN_ROLLUP_ADAPTIVE_KEY_NUM) ret;
+    if (OB_FAIL(ret)) {
+      whole_msg_.rollup_ndv_.n_keys_ = (-ret);
+    }
+    ret = OB_SUCCESS;
+  }
+  // FIXME: now use max_keys
+  // three stage only use max_keys
+  if (0 < max_rollup_ndv.max_keys_) {
+    whole_msg_.rollup_ndv_.n_keys_ = max_rollup_ndv.max_keys_;
+  }
+  return ret;
+}
+
+int ObRollupKeyPieceMsgCtx::alloc_piece_msg_ctx(const ObRollupKeyPieceMsg &pkt,
+                                                ObPxCoordInfo &,
+                                                ObExecContext &ctx,
+                                                int64_t task_cnt,
+                                                ObPieceMsgCtx *&msg_ctx)
+{
+  int ret = OB_SUCCESS;
+  if (OB_ISNULL(ctx.get_my_session()) ||
+      OB_ISNULL(ctx.get_physical_plan_ctx())) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("session is null or physical plan ctx is null", K(ret));
+  } else {
+    void *buf = ctx.get_allocator().alloc(sizeof(ObRollupKeyPieceMsgCtx));
+    if (OB_ISNULL(buf)) {
+      ret = OB_ALLOCATE_MEMORY_FAILED;
+    } else {
+      msg_ctx = new (buf) ObRollupKeyPieceMsgCtx(pkt.op_id_, task_cnt,
+          ctx.get_physical_plan_ctx()->get_timeout_timestamp(),
+          ctx.get_my_session()->get_effective_tenant_id());
+    }
+  }
+  return ret;
+}
+
+int ObRollupKeyWholeMsg::assign(const ObRollupKeyWholeMsg &other)
+{
+  int ret = OB_SUCCESS;
+  rollup_ndv_ = other.rollup_ndv_;
+  return ret;
+}
diff --git a/src/sql/engine/px/datahub/components/ob_dh_rollup_key.h b/src/sql/engine/px/datahub/components/ob_dh_rollup_key.h
index 6e456f9543..e767e1f567 100644
--- a/src/sql/engine/px/datahub/components/ob_dh_rollup_key.h
+++ b/src/sql/engine/px/datahub/components/ob_dh_rollup_key.h
@@ -94,7 +94,7 @@ public:
     : ObPieceMsgCtx(op_id, task_cnt, timeout_ts), received_(0), tenant_id_(tenant_id),
       whole_msg_(), received_msgs_() {}
   ~ObRollupKeyPieceMsgCtx() = default;
-  void destroy()
+  virtual void destroy()
   {
     received_msgs_.reset();
   }
diff --git a/src/sql/engine/px/datahub/components/ob_dh_sample.cpp b/src/sql/engine/px/datahub/components/ob_dh_sample.cpp
index 5e0d812384..00a9e09963 100644
--- a/src/sql/engine/px/datahub/components/ob_dh_sample.cpp
+++ b/src/sql/engine/px/datahub/components/ob_dh_sample.cpp
@@ -595,7 +595,6 @@ int ObDynamicSamplePieceMsgCtx::on_message(
         LOG_WARN("failed to wait response", K(ret));
       }
     }
-    IGNORE_RETURN destroy();
   }
   return ret;
 }
diff --git a/src/sql/engine/px/datahub/components/ob_dh_sample.h b/src/sql/engine/px/datahub/components/ob_dh_sample.h
index bef83e7014..83a8f618ba 100644
--- a/src/sql/engine/px/datahub/components/ob_dh_sample.h
+++ b/src/sql/engine/px/datahub/components/ob_dh_sample.h
@@ -111,7 +111,7 @@ public:
       const SortDef &sort_def);
   virtual ~ObDynamicSamplePieceMsgCtx() = default;
   int init(const ObIArray &tablet_ids);
-  void destroy();
+  virtual void destroy();
   int process_piece(const ObDynamicSamplePieceMsg &piece);
   int split_range(
       const ObChunkDatumStore *sample_store,
diff --git a/src/sql/engine/px/datahub/ob_dh_msg_ctx.h b/src/sql/engine/px/datahub/ob_dh_msg_ctx.h
index 9009c54c49..2f25c2b8f9 100644
--- a/src/sql/engine/px/datahub/ob_dh_msg_ctx.h
+++ b/src/sql/engine/px/datahub/ob_dh_msg_ctx.h
@@ -26,6 +26,7 @@ public:
   ObPieceMsgCtx(uint64_t op_id, int64_t task_cnt, int64_t timeout_ts)
     : op_id_(op_id), task_cnt_(task_cnt), timeout_ts_(timeout_ts) {}
   VIRTUAL_TO_STRING_KV(K_(op_id), K_(task_cnt));
+  virtual void destroy() {}
   uint64_t op_id_; // which operator uses the datahub service
   int64_t task_cnt_; // number of tasks actually executed under this dfo, i.e. the number of piece msgs expected
   int64_t timeout_ts_; // timeout timestamp, used when sending messages over DTL
@@ -38,6 +39,11 @@ public:
   ~ObPieceMsgCtxMgr() = default;
   void reset()
   {
+    for (int i = 0; i < ctxs_.count(); ++i) {
+      if (OB_NOT_NULL(ctxs_[i])) {
+        ctxs_[i]->destroy();
+      }
+    }
     ctxs_.reset();
   }
   int find_piece_ctx(uint64_t op_id, ObPieceMsgCtx *&ctx)
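
Below is a minimal, self-contained C++ sketch (not OceanBase code; PieceCtxBase, RollupKeyCtx and CtxMgr are hypothetical stand-ins for ObPieceMsgCtx, ObRollupKeyPieceMsgCtx and ObPieceMsgCtxMgr) of the cleanup pattern this patch moves to: piece-message contexts no longer tear themselves down inside on_message; instead the base context exposes a virtual destroy() and the manager calls it for every stored context when it is reset, so the call dispatches to the derived override through a base-class pointer.

#include <iostream>
#include <vector>

// Hypothetical stand-in for ObPieceMsgCtx: the base context declares a
// virtual destroy() with an empty default body, as the patch does.
struct PieceCtxBase {
  virtual ~PieceCtxBase() = default;
  virtual void destroy() {}  // derived contexts override this to release resources
};

// Hypothetical stand-in for ObRollupKeyPieceMsgCtx: overrides destroy()
// to drop its buffered piece messages.
struct RollupKeyCtx : PieceCtxBase {
  void destroy() override { std::cout << "rollup key ctx released\n"; }
};

// Hypothetical stand-in for ObPieceMsgCtxMgr::reset(): walks the stored
// base-class pointers and calls destroy() before clearing the container.
struct CtxMgr {
  std::vector<PieceCtxBase *> ctxs_;
  void reset() {
    for (PieceCtxBase *ctx : ctxs_) {
      if (ctx != nullptr) {
        ctx->destroy();  // virtual dispatch reaches RollupKeyCtx::destroy()
      }
    }
    ctxs_.clear();
  }
};

int main() {
  RollupKeyCtx ctx;
  CtxMgr mgr;
  mgr.ctxs_.push_back(&ctx);
  mgr.reset();  // prints "rollup key ctx released"; with a non-virtual
                // destroy() the derived cleanup would be silently skipped
  return 0;
}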