[FEAT MERGE]: online optimizer stats gather.

Co-authored-by: obdev <obdev@oceanbase.com>
This commit is contained in:
Monk-Liu
2023-01-28 13:54:38 +08:00
committed by ob-robot
parent 735401c8d7
commit 9484175163
74 changed files with 3416 additions and 85 deletions

View File

@ -0,0 +1,156 @@
// Copyright 1999-2022 Alibaba Inc. All Rights Reserved.
// Author:
// liuqifan.lqf@oceanbase.com
//
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/px/datahub/components/ob_dh_opt_stats_gather.h"
#include "sql/engine/px/datahub/ob_dh_msg_ctx.h"
#include "sql/engine/px/ob_dfo.h"
#include "sql/engine/px/ob_px_util.h"
#include "sql/engine/px/datahub/ob_dh_msg.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/engine/opt_statistics/ob_optimizer_stats_gathering_op.h"
using namespace oceanbase::sql;
using namespace oceanbase::common;
OB_DEF_SERIALIZE(ObOptStatsGatherPieceMsg)
{
int ret = OB_SUCCESS;
ret = ObDatahubPieceMsg::serialize(buf, buf_len, pos);
if (OB_SUCC(ret)) {
OB_UNIS_ENCODE(table_stats_.count());
for (int64_t i = 0; i < table_stats_.count(); i++) {
OB_UNIS_ENCODE(*table_stats_.at(i));
}
OB_UNIS_ENCODE(column_stats_.count());
for (int64_t i = 0; i < column_stats_.count(); i++) {
OB_UNIS_ENCODE(column_stats_.at(i)->size());
OB_UNIS_ENCODE(*column_stats_.at(i));
}
OB_UNIS_ENCODE(target_osg_id_);
}
return ret;
}
OB_DEF_DESERIALIZE(ObOptStatsGatherPieceMsg)
{
int ret = OB_SUCCESS;
ret = ObDatahubPieceMsg::deserialize(buf, data_len, pos);
if (OB_SUCC(ret)) {
int64_t size = 0;
OB_UNIS_DECODE(size)
for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) {
void *tmp_buf = arena_.alloc(sizeof(ObOptTableStat));
if (OB_ISNULL(tmp_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else {
ObOptTableStat *tmp_stat = new (tmp_buf) ObOptTableStat;
if (OB_FAIL(tmp_stat->deserialize(buf, data_len, pos))) {
LOG_WARN("deserialize datum store failed", K(ret), K(i));
} else if (OB_FAIL(table_stats_.push_back(tmp_stat))) {
LOG_WARN("push back datum store failed", K(ret), K(i));
}
}
}
size = 0;
OB_UNIS_DECODE(size)
for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) {
int col_stat_size = 0;
OB_UNIS_DECODE(col_stat_size);
void *tmp_buf = arena_.alloc(col_stat_size);
if (OB_ISNULL(tmp_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else {
ObOptColumnStat *tmp_stat = new (tmp_buf) ObOptColumnStat(arena_);
if (OB_FAIL(tmp_stat->deserialize(buf, data_len, pos))) {
LOG_WARN("deserialize datum store failed", K(ret), K(i));
} else if (OB_FAIL(column_stats_.push_back(tmp_stat))) {
LOG_WARN("push back datum store failed", K(ret), K(i));
}
}
}
OB_UNIS_DECODE(target_osg_id_);
}
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObOptStatsGatherPieceMsg)
{
int64_t len = 0;
len += ObDatahubPieceMsg::get_serialize_size();
OB_UNIS_ADD_LEN(table_stats_.count());
for (int64_t i = 0; i < table_stats_.count(); i++) {
OB_UNIS_ADD_LEN(*table_stats_.at(i));
}
OB_UNIS_ADD_LEN(column_stats_.count());
for (int64_t i = 0; i < column_stats_.count(); i++) {
OB_UNIS_ADD_LEN(column_stats_.at(i)->size());
OB_UNIS_ADD_LEN(*column_stats_.at(i));
}
OB_UNIS_ADD_LEN(target_osg_id_);
return len;
}
OB_SERIALIZE_MEMBER((ObOptStatsGatherWholeMsg, ObDatahubWholeMsg), ready_state_);
int ObOptStatsGatherPieceMsgListener::on_message(
ObOptStatsGatherPieceMsgCtx &piece_ctx,
common::ObIArray<ObPxSqcMeta *> &sqcs,
const ObOptStatsGatherPieceMsg &pkt)
{
int ret = OB_SUCCESS;
if (pkt.op_id_ != piece_ctx.op_id_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected piece msg", K(ret), K(pkt), K(piece_ctx));
} else {
ObOptimizerStatsGatheringOp *osg_op = NULL;
piece_ctx.received_++;
// get the merge osg, pass piece msg to osg.
ObOperatorKit *kit = piece_ctx.osg_info_.op_kit_;
if (OB_ISNULL(kit) || OB_ISNULL(kit->op_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null op kit", K(ret), K(kit));
} else if (OB_UNLIKELY(PHY_OPTIMIZER_STATS_GATHERING != kit->op_->get_spec().type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected op", K(ret), K(kit->op_->get_spec().type_));
} else {
osg_op = static_cast<ObOptimizerStatsGatheringOp *>(kit->op_);
if (OB_FAIL(osg_op->on_piece_msg(pkt))) {
LOG_WARN("fail to call on piece msg", K(ret));
}
}
}
return ret;
}
int ObOptStatsGatherPieceMsgCtx::alloc_piece_msg_ctx(const ObOptStatsGatherPieceMsg &pkt,
ObPxCoordInfo &,
ObExecContext &ctx,
int64_t task_cnt,
ObPieceMsgCtx *&msg_ctx)
{
int ret = OB_SUCCESS;
void *buf = ctx.get_allocator().alloc(sizeof(ObOptStatsGatherPieceMsgCtx));
if (OB_ISNULL(ctx.get_physical_plan_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("physical plan ctx is null", K(ret));
} else if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
msg_ctx = new (buf) ObOptStatsGatherPieceMsgCtx(pkt.op_id_, task_cnt,
ctx.get_physical_plan_ctx()->get_timeout_timestamp());
if (OB_ISNULL(ctx.get_operator_kit(pkt.target_osg_id_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null operator kit", K(ret), K(pkt));
} else {
static_cast<ObOptStatsGatherPieceMsgCtx*>(msg_ctx)->osg_info_.op_kit_ = ctx.get_operator_kit(pkt.target_osg_id_);
}
}
return ret;
}

View File

@ -0,0 +1,137 @@
// Copyright 1999-2022 Alibaba Inc. All Rights Reserved.
// Author:
// liuqifan.lqf@oceanbase.com
//
#ifndef __OB_SQL_ENG_PX_DH_OPT_STATS_GATHER_H__
#define __OB_SQL_ENG_PX_DH_OPT_STATS_GATHER_H__
#include "sql/engine/px/datahub/ob_dh_msg.h"
#include "sql/engine/px/datahub/ob_dh_dtl_proc.h"
#include "sql/engine/px/datahub/ob_dh_msg_ctx.h"
#include "sql/engine/px/datahub/ob_dh_msg_provider.h"
#include "sql/engine/ob_exec_context.h"
namespace oceanbase {
namespace sql {
class ObOptStatsGatherPieceMsg;
class ObOptStatsGatherWholeMsg;
typedef ObPieceMsgP<ObOptStatsGatherPieceMsg> ObOptStatsGatherPieceMsgP;
typedef ObWholeMsgP<ObOptStatsGatherWholeMsg> ObOptStatsGatherWholeMsgP;
class ObOptStatsGatherPieceMsgListener;
class ObOptStatsGatherPieceMsgCtx;
class ObPxCoordInfo;
struct OSGInfo {
OSGInfo() = default;
~OSGInfo() = default;
ObOperatorKit *op_kit_;
};
class ObOptStatsGatherPieceMsg
: public ObDatahubPieceMsg<dtl::ObDtlMsgType::DH_OPT_STATS_GATHER_PIECE_MSG>
{
OB_UNIS_VERSION_V(1);
public:
using PieceMsgListener = ObOptStatsGatherPieceMsgListener;
using PieceMsgCtx = ObOptStatsGatherPieceMsgCtx;
public:
ObOptStatsGatherPieceMsg() :
column_stats_(),
table_stats_(),
arena_("ObOSGPieceMsg"),
target_osg_id_(0) {};
~ObOptStatsGatherPieceMsg() { reset(); };
void reset() {
for (int64_t i = 0; i < column_stats_.count(); i++) {
if (OB_NOT_NULL(column_stats_.at(i))) {
// to clear memory allocate by inner_alloc in ObOptColumnStat
column_stats_.at(i)->reset();
column_stats_.at(i) = NULL;
}
}
column_stats_.reset();
table_stats_.reset();
arena_.reset();
};
int assign(const ObOptStatsGatherPieceMsg &other) {
// do assign
return common::OB_SUCCESS;
}
INHERIT_TO_STRING_KV("meta", ObDatahubPieceMsg<dtl::ObDtlMsgType::DH_OPT_STATS_GATHER_PIECE_MSG>,
K_(op_id),
K_(column_stats),
K_(table_stats),
K_(target_osg_id));
inline void set_osg_id(uint64_t target_id) { target_osg_id_ = target_id; };
inline uint64_t get_osg_id() { return target_osg_id_; };
inline void set_tenant_id(uint64_t tenant_id) { arena_.set_tenant_id(tenant_id); };
common::ObSEArray<ObOptColumnStat*, 4> column_stats_;
common::ObSEArray<ObOptTableStat*, 4> table_stats_;
// to restore history
ObArenaAllocator arena_;
uint64_t target_osg_id_;
private:
DISALLOW_COPY_AND_ASSIGN(ObOptStatsGatherPieceMsg);
};
// whole msg is useless, since we don't need the feed back. px only pass the piece msg to osg.
class ObOptStatsGatherWholeMsg
: public ObDatahubWholeMsg<dtl::ObDtlMsgType::DH_OPT_STATS_GATHER_WHOLE_MSG>
{
OB_UNIS_VERSION_V(1);
public:
using WholeMsgProvider = ObWholeMsgProvider<ObOptStatsGatherWholeMsg>;
public:
ObOptStatsGatherWholeMsg() : ready_state_(0) {};
~ObOptStatsGatherWholeMsg() = default;
int assign(const ObOptStatsGatherWholeMsg &other)
{
ready_state_ = other.ready_state_;
return common::OB_SUCCESS;
}
void reset()
{
ready_state_ = 0;
}
VIRTUAL_TO_STRING_KV(K_(ready_state));
int ready_state_;
};
class ObOptStatsGatherPieceMsgCtx : public ObPieceMsgCtx
{
public:
ObOptStatsGatherPieceMsgCtx(uint64_t op_id, int64_t task_cnt, int64_t timeout_ts)
: ObPieceMsgCtx(op_id, task_cnt, timeout_ts), received_(0), osg_info_() {}
~ObOptStatsGatherPieceMsgCtx() = default;
static int alloc_piece_msg_ctx(const ObOptStatsGatherPieceMsg &pkt,
ObPxCoordInfo &coord_info,
ObExecContext &ctx,
int64_t task_cnt,
ObPieceMsgCtx *&msg_ctx);
INHERIT_TO_STRING_KV("meta", ObPieceMsgCtx, K_(received));
int received_;
OSGInfo osg_info_; // store the op_kit for OSG_MERGE. So that we can call API in OSG.
private:
DISALLOW_COPY_AND_ASSIGN(ObOptStatsGatherPieceMsgCtx);
};
class ObOptStatsGatherPieceMsgListener
{
public:
ObOptStatsGatherPieceMsgListener() = default;
~ObOptStatsGatherPieceMsgListener() = default;
static int on_message(
ObOptStatsGatherPieceMsgCtx &piece_ctx,
common::ObIArray<ObPxSqcMeta *> &sqcs,
const ObOptStatsGatherPieceMsg &pkt);
private:
DISALLOW_COPY_AND_ASSIGN(ObOptStatsGatherPieceMsgListener);
};
}
}
#endif

View File

@ -48,7 +48,8 @@ ObPxFifoCoordOp::ObPxFifoCoordOp(ObExecContext &exec_ctx, const ObOpSpec &spec,
interrupt_proc_(exec_ctx, msg_proc_),
sample_piece_msg_proc_(exec_ctx, msg_proc_),
rollup_key_piece_msg_proc_(exec_ctx, msg_proc_),
rd_wf_piece_msg_proc_(exec_ctx, msg_proc_)
rd_wf_piece_msg_proc_(exec_ctx, msg_proc_),
opt_stats_gather_piece_msg_proc_(exec_ctx, msg_proc_)
{}
int ObPxFifoCoordOp::inner_open()
@ -96,6 +97,7 @@ int ObPxFifoCoordOp::setup_loop_proc()
.register_processor(sample_piece_msg_proc_)
.register_processor(rollup_key_piece_msg_proc_)
.register_processor(rd_wf_piece_msg_proc_)
.register_processor(opt_stats_gather_piece_msg_proc_)
.register_interrupt_processor(interrupt_proc_);
return ret;
}
@ -205,6 +207,7 @@ int ObPxFifoCoordOp::fetch_rows(const int64_t row_cnt)
case ObDtlMsgType::DH_DYNAMIC_SAMPLE_PIECE_MSG:
case ObDtlMsgType::DH_ROLLUP_KEY_PIECE_MSG:
case ObDtlMsgType::DH_RANGE_DIST_WF_PIECE_MSG:
case ObDtlMsgType::DH_OPT_STATS_GATHER_PIECE_MSG:
// all message processed in callback
break;
default:

View File

@ -101,6 +101,7 @@ private:
ObDynamicSamplePieceMsgP sample_piece_msg_proc_;
ObRollupKeyPieceMsgP rollup_key_piece_msg_proc_;
ObRDWFPieceMsgP rd_wf_piece_msg_proc_;
ObOptStatsGatherPieceMsgP opt_stats_gather_piece_msg_proc_;
};
} // end namespace sql

View File

@ -94,6 +94,7 @@ ObPxMSCoordOp::ObPxMSCoordOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOp
sample_piece_msg_proc_(exec_ctx, msg_proc_),
rollup_key_piece_msg_proc_(exec_ctx, msg_proc_),
rd_wf_piece_msg_proc_(exec_ctx, msg_proc_),
opt_stats_gather_piece_msg_proc_(exec_ctx, msg_proc_),
store_rows_(),
last_pop_row_(nullptr),
row_heap_(),
@ -168,6 +169,7 @@ int ObPxMSCoordOp::setup_loop_proc()
.register_processor(sample_piece_msg_proc_)
.register_processor(rollup_key_piece_msg_proc_)
.register_processor(rd_wf_piece_msg_proc_)
.register_processor(opt_stats_gather_piece_msg_proc_)
.register_interrupt_processor(interrupt_proc_);
msg_loop_.set_tenant_id(ctx_.get_my_session()->get_effective_tenant_id());
return ret;
@ -407,6 +409,7 @@ int ObPxMSCoordOp::inner_get_next_row()
case ObDtlMsgType::DH_DYNAMIC_SAMPLE_PIECE_MSG:
case ObDtlMsgType::DH_ROLLUP_KEY_PIECE_MSG:
case ObDtlMsgType::DH_RANGE_DIST_WF_PIECE_MSG:
case ObDtlMsgType::DH_OPT_STATS_GATHER_PIECE_MSG:
// 这几种消息都在 process 回调函数里处理了
break;
default:

View File

@ -156,6 +156,7 @@ private:
ObDynamicSamplePieceMsgP sample_piece_msg_proc_;
ObRollupKeyPieceMsgP rollup_key_piece_msg_proc_;
ObRDWFPieceMsgP rd_wf_piece_msg_proc_;
ObOptStatsGatherPieceMsgP opt_stats_gather_piece_msg_proc_;
// 存储merge sort的每一路的当前行
ObArray<ObChunkDatumStore::LastStoredRow*> store_rows_;
ObChunkDatumStore::LastStoredRow* last_pop_row_;

View File

@ -50,6 +50,7 @@ ObPxOrderedCoordOp::ObPxOrderedCoordOp(ObExecContext &exec_ctx, const ObOpSpec &
sample_piece_msg_proc_(exec_ctx, msg_proc_),
rollup_key_piece_msg_proc_(exec_ctx, msg_proc_),
rd_wf_piece_msg_proc_(exec_ctx, msg_proc_),
opt_stats_gather_piece_msg_proc_(exec_ctx, msg_proc_),
readers_(NULL),
receive_order_(),
reader_cnt_(0),
@ -110,6 +111,7 @@ int ObPxOrderedCoordOp::setup_loop_proc()
.register_processor(sample_piece_msg_proc_)
.register_processor(rollup_key_piece_msg_proc_)
.register_processor(rd_wf_piece_msg_proc_)
.register_processor(opt_stats_gather_piece_msg_proc_)
.register_interrupt_processor(interrupt_proc_);
return ret;
}
@ -204,6 +206,7 @@ int ObPxOrderedCoordOp::inner_get_next_row()
case ObDtlMsgType::DH_DYNAMIC_SAMPLE_PIECE_MSG:
case ObDtlMsgType::DH_ROLLUP_KEY_PIECE_MSG:
case ObDtlMsgType::DH_RANGE_DIST_WF_PIECE_MSG:
case ObDtlMsgType::DH_OPT_STATS_GATHER_PIECE_MSG:
// 这几种消息都在 process 回调函数里处理了
break;
default:

View File

@ -139,6 +139,7 @@ private:
ObDynamicSamplePieceMsgP sample_piece_msg_proc_;
ObRollupKeyPieceMsgP rollup_key_piece_msg_proc_;
ObRDWFPieceMsgP rd_wf_piece_msg_proc_;
ObOptStatsGatherPieceMsgP opt_stats_gather_piece_msg_proc_;
ObReceiveRowReader *readers_;
ObOrderedReceiveFilter receive_order_;
int64_t reader_cnt_;

View File

@ -125,3 +125,10 @@ int ObPxSubCoordMsgProc::on_whole_msg(
ObDhWholeeMsgProc<ObRDWFWholeMsg> proc;
return proc.on_whole_msg(sqc_ctx_, pkt);
}
int ObPxSubCoordMsgProc::on_whole_msg(
const ObOptStatsGatherWholeMsg &pkt) const
{
ObDhWholeeMsgProc<ObOptStatsGatherWholeMsg> proc;
return proc.on_whole_msg(sqc_ctx_, pkt);
}

View File

@ -31,6 +31,8 @@ class ObDynamicSamplePieceMsg;
class ObDynamicSampleWholeMsg;
class ObRDWFPieceMsg;
class ObRDWFWholeMsg;
class ObOptStatsGatherPieceMsg;
class ObOptStatsGatherWholeMsg;
// 抽象出本接口类的目的是为了 MsgProc 和 ObPxCoord 解耦
class ObIPxCoordMsgProc
{
@ -46,6 +48,7 @@ public:
virtual int on_piece_msg(ObExecContext &ctx, const ObDynamicSamplePieceMsg &pkt) = 0;
virtual int on_piece_msg(ObExecContext &ctx, const ObRollupKeyPieceMsg &pkt) = 0;
virtual int on_piece_msg(ObExecContext &ctx, const ObRDWFPieceMsg &pkt) = 0;
virtual int on_piece_msg(ObExecContext &ctx, const ObOptStatsGatherPieceMsg &pkt) = 0;
};
class ObIPxSubCoordMsgProc
@ -69,6 +72,8 @@ public:
const ObRollupKeyWholeMsg &pkt) const = 0;
virtual int on_whole_msg(
const ObRDWFWholeMsg &pkt) const = 0;
virtual int on_whole_msg(
const ObOptStatsGatherWholeMsg &pkt) const = 0;
// SQC 被中断
virtual int on_interrupted(const ObInterruptCode &ic) const = 0;
};
@ -99,6 +104,8 @@ public:
const ObRollupKeyWholeMsg &pkt) const;
virtual int on_whole_msg(
const ObRDWFWholeMsg &pkt) const;
virtual int on_whole_msg(
const ObOptStatsGatherWholeMsg &pkt) const;
private:
ObSqcCtx &sqc_ctx_;
};

View File

@ -664,6 +664,7 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
ObRollupKeyPieceMsgP rollup_key_piece_msg_proc(ctx_, terminate_msg_proc);
ObRDWFPieceMsgP rd_wf_piece_msg_proc(ctx_, terminate_msg_proc);
ObPxQcInterruptedP interrupt_proc(ctx_, terminate_msg_proc);
ObOptStatsGatherPieceMsgP opt_stats_gather_piece_msg_proc(ctx_, terminate_msg_proc);
// 这个注册会替换掉旧的proc.
(void)msg_loop_.clear_all_proc();
@ -676,7 +677,8 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
.register_processor(winbuf_piece_msg_proc)
.register_processor(sample_piece_msg_proc)
.register_processor(rollup_key_piece_msg_proc)
.register_processor(rd_wf_piece_msg_proc);
.register_processor(rd_wf_piece_msg_proc)
.register_processor(opt_stats_gather_piece_msg_proc);
loop.ignore_interrupt();
ObPxControlChannelProc control_channels;
@ -735,6 +737,7 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
case ObDtlMsgType::DH_DYNAMIC_SAMPLE_PIECE_MSG:
case ObDtlMsgType::DH_ROLLUP_KEY_PIECE_MSG:
case ObDtlMsgType::DH_RANGE_DIST_WF_PIECE_MSG:
case ObDtlMsgType::DH_OPT_STATS_GATHER_PIECE_MSG:
break;
default:
ret = OB_ERR_UNEXPECTED;

View File

@ -14,6 +14,7 @@
#include "sql/engine/px/ob_px_scheduler.h"
#include "sql/engine/px/ob_dfo_scheduler.h"
#include "sql/engine/px/ob_dfo_mgr.h"
#include "lib/random/ob_random.h"
#include "share/ob_rpc_share.h"
#include "share/schema/ob_part_mgr_util.h"
@ -99,8 +100,6 @@ public:
}
};
int ObPxMsgProc::on_process_end(ObExecContext &ctx)
{
UNUSED(ctx);
@ -419,6 +418,14 @@ int ObPxMsgProc::on_piece_msg(
return proc.on_piece_msg(coord_info_, ctx, pkt);
}
int ObPxMsgProc::on_piece_msg(
ObExecContext &ctx,
const ObOptStatsGatherPieceMsg &pkt)
{
ObDhPieceMsgProc<ObOptStatsGatherPieceMsg> proc;
return proc.on_piece_msg(coord_info_, ctx, pkt);
}
int ObPxMsgProc::on_eof_row(ObExecContext &ctx)
{
int ret = OB_SUCCESS;
@ -794,5 +801,13 @@ int ObPxTerminateMsgProc::on_piece_msg(
return common::OB_SUCCESS;
}
int ObPxTerminateMsgProc::on_piece_msg(
ObExecContext &,
const ObOptStatsGatherPieceMsg &)
{
return common::OB_SUCCESS;
}
} // end namespace sql
} // end namespace oceanbase

View File

@ -27,6 +27,7 @@
#include "sql/engine/px/datahub/components/ob_dh_rollup_key.h"
#include "sql/engine/px/datahub/components/ob_dh_barrier.h"
#include "sql/engine/px/datahub/components/ob_dh_range_dist_wf.h"
#include "sql/engine/px/datahub/components/ob_dh_opt_stats_gather.h"
namespace oceanbase
{
@ -34,6 +35,7 @@ namespace sql
{
class ObPxCoordOp;
class ObPxObDfoMgr;
class ObPxRootDfoAction
{
public:
@ -121,6 +123,7 @@ public:
int on_piece_msg(ObExecContext &ctx, const ObDynamicSamplePieceMsg &pkt);
int on_piece_msg(ObExecContext &ctx, const ObRollupKeyPieceMsg &pkt);
int on_piece_msg(ObExecContext &ctx, const ObRDWFPieceMsg &pkt);
int on_piece_msg(ObExecContext &ctx, const ObOptStatsGatherPieceMsg &pkt);
// end DATAHUB msg processing
ObPxCoordInfo &coord_info_;
@ -155,6 +158,7 @@ public:
int on_piece_msg(ObExecContext &ctx, const ObDynamicSamplePieceMsg &pkt);
int on_piece_msg(ObExecContext &ctx, const ObRollupKeyPieceMsg &pkt);
int on_piece_msg(ObExecContext &ctx, const ObRDWFPieceMsg &pkt);
int on_piece_msg(ObExecContext &ctx, const ObOptStatsGatherPieceMsg &pkt);
// end DATAHUB msg processing
private:
int do_cleanup_dfo(ObDfo &dfo);
@ -170,7 +174,6 @@ private:
ObDfoSchedulerBasic *scheduler_;
};
} // end namespace sql
} // end namespace oceanbase

View File

@ -99,6 +99,7 @@ int ObPxSQCProxy::setup_loop_proc(ObSqcCtx &sqc_ctx) const
.register_processor(sqc_ctx.sample_whole_msg_proc_)
.register_processor(sqc_ctx.rollup_key_whole_msg_proc_)
.register_processor(sqc_ctx.rd_wf_whole_msg_proc_)
.register_processor(sqc_ctx.opt_stats_gather_whole_msg_proc_)
.register_interrupt_processor(sqc_ctx.interrupt_proc_);
}
return ret;

View File

@ -27,6 +27,7 @@
#include "sql/engine/px/datahub/components/ob_dh_rollup_key.h"
#include "sql/engine/px/datahub/components/ob_dh_sample.h"
#include "sql/engine/px/datahub/components/ob_dh_range_dist_wf.h"
#include "sql/engine/px/datahub/components/ob_dh_opt_stats_gather.h"
namespace oceanbase
{
namespace sql
@ -50,7 +51,8 @@ public:
sqc_proxy_(*this, sqc_arg),
all_tasks_finish_(false),
interrupted_(false),
px_bloom_filter_msg_proc_(msg_proc_)
px_bloom_filter_msg_proc_(msg_proc_),
opt_stats_gather_whole_msg_proc_(msg_proc_)
{}
~ObSqcCtx() { reset(); }
common::ObIArray<ObPxTask> &get_tasks() { return tasks_; }
@ -111,6 +113,7 @@ public:
common::ObSEArray<ObPxTabletInfo, 8> partitions_info_;
ObPxBloomfilterChProvider bf_ch_provider_;
ObPxCreateBloomFilterChannelMsgP px_bloom_filter_msg_proc_;
ObOptStatsGatherWholeMsgP opt_stats_gather_whole_msg_proc_;
// 用于 datahub 中保存 whole msg provider,一般情况下一个子计划里不会
// 超过一个算子会使用 datahub,所以大小默认为 1 即可
common::ObSEArray<ObPxDatahubDataProvider *, 1> whole_msg_provider_list_;