[FEAT MERGE] 4_1_sql_feature

Co-authored-by: leslieyuchen <leslieyuchen@gmail.com>
Co-authored-by: Charles0429 <xiezhenjiang@gmail.com>
Co-authored-by: raywill <hustos@gmail.com>
This commit is contained in:
obdev
2023-01-28 16:01:26 +08:00
committed by ob-robot
parent 3080f2b66f
commit 2d19a9d8f5
846 changed files with 161957 additions and 116661 deletions

View File

@ -64,14 +64,18 @@ public:
{
int ret = OB_SUCCESS;
ObArray<ObPxSqcMeta *> sqcs;
ObDfo *dfo = nullptr;
ObDfo *source_dfo = nullptr;
ObDfo *target_dfo = nullptr;
ObPieceMsgCtx *piece_ctx = nullptr;
ObDfo *child_dfo = nullptr;
// FIXME (TODO xiaochu):这个 dfo id 不是必须的,本地可以维护一个 op_id 到 dfo id 的映射
if (OB_FAIL(coord_info.dfo_mgr_.find_dfo_edge(pkt.dfo_id_, dfo))) {
if (OB_FAIL(coord_info.dfo_mgr_.find_dfo_edge(pkt.source_dfo_id_, source_dfo))) {
LOG_WARN("fail find dfo", K(pkt), K(ret));
} else if (OB_ISNULL(dfo)) {
} else if (OB_FAIL(coord_info.dfo_mgr_.find_dfo_edge(pkt.target_dfo_id_, target_dfo))) {
LOG_WARN("fail find dfo", K(pkt), K(ret));
} else if (OB_ISNULL(source_dfo) || OB_ISNULL(target_dfo)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr or null session ptr", KP(dfo), K(pkt), K(ret));
LOG_WARN("NULL ptr or null session ptr", KP(source_dfo), KP(target_dfo), K(pkt), K(ret));
} else if (OB_FAIL(coord_info.piece_msg_ctx_mgr_.find_piece_ctx(pkt.op_id_, piece_ctx))) {
// 如果找不到则创建一个 ctx
// NOTE: 这里新建一个 piece_ctx 的方式不会出现并发问题,
@ -79,7 +83,7 @@ public:
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("fail get ctx", K(pkt), K(ret));
} else if (OB_FAIL(PieceMsg::PieceMsgCtx::alloc_piece_msg_ctx(pkt, coord_info, ctx,
dfo->get_total_task_count(), piece_ctx))) {
source_dfo->get_total_task_count(), piece_ctx))) {
LOG_WARN("fail to alloc piece msg", K(ret));
} else if (nullptr != piece_ctx) {
if (OB_FAIL(coord_info.piece_msg_ctx_mgr_.add_piece_ctx(piece_ctx))) {
@ -90,7 +94,7 @@ public:
if (OB_SUCC(ret)) {
typename PieceMsg::PieceMsgCtx *ctx = static_cast<typename PieceMsg::PieceMsgCtx *>(piece_ctx);
if (OB_FAIL(dfo->get_sqcs(sqcs))) {
if (OB_FAIL(target_dfo->get_sqcs(sqcs))) {
LOG_WARN("fail get qc-sqc channel for QC", K(ret));
} else if (OB_FAIL(PieceMsg::PieceMsgListener::on_message(*ctx, sqcs, pkt))) {
LOG_WARN("fail process piece msg", K(pkt), K(ret));
@ -418,6 +422,22 @@ int ObPxMsgProc::on_piece_msg(
return proc.on_piece_msg(coord_info_, ctx, pkt);
}
// QC-side entry for an init-channel datahub piece message: forward the
// packet to the type-specialized datahub piece-message processor, which
// resolves the piece context and routes the message on to the SQCs.
int ObPxMsgProc::on_piece_msg(
    ObExecContext &ctx,
    const ObInitChannelPieceMsg &pkt)
{
  // A fresh processor instance is stateless here, so a temporary suffices.
  return ObDhPieceMsgProc<ObInitChannelPieceMsg>().on_piece_msg(coord_info_, ctx, pkt);
}
// QC-side entry for a reporting-window-function datahub piece message:
// forward the packet to the type-specialized datahub piece-message
// processor, which resolves the piece context and routes it to the SQCs.
int ObPxMsgProc::on_piece_msg(
    ObExecContext &ctx,
    const ObReportingWFPieceMsg &pkt)
{
  // A fresh processor instance is stateless here, so a temporary suffices.
  return ObDhPieceMsgProc<ObReportingWFPieceMsg>().on_piece_msg(coord_info_, ctx, pkt);
}
int ObPxMsgProc::on_piece_msg(
ObExecContext &ctx,
const ObOptStatsGatherPieceMsg &pkt)
@ -476,86 +496,76 @@ int ObPxMsgProc::on_dfo_pair_thread_inited(ObExecContext &ctx, ObDfo &child, ObD
return ret;
}
int ObPxMsgProc::mark_rpc_filter(ObExecContext &ctx)
int ObPxMsgProc::mark_rpc_filter(ObExecContext &ctx,
ObJoinFilterDataCtx &bf_ctx,
int64_t &each_group_size)
{
int ret = OB_SUCCESS;
ObJoinFilterDataCtx &bf_ctx = ctx.get_bf_ctx();
ObPhysicalPlanCtx *phy_plan_ctx = GET_PHY_PLAN_CTX(ctx);
bf_ctx.filter_ready_ = false;
common::ObArray<dtl::ObDtlChannel *> channels;
uint64_t each_group_size = 0;
bf_ctx.filter_ready_ = false; // make sure there is only one thread of this sqc can mark_rpc_filter
if (OB_ISNULL(bf_ctx.filter_data_) ||
OB_ISNULL(phy_plan_ctx) ||
OB_ISNULL(ctx.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the filter data or phy plan ctx is null", K(ret));
} else if (0 == bf_ctx.ch_provider_ptr_) {
// root dfo
if (bf_ctx.ch_set_.count() <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the count of bloom filter chsets is unexpected",
K(bf_ctx.ch_set_.count()), K(ret));
} else if (OB_FAIL(ObDtlChannelUtil::link_ch_set(bf_ctx.ch_set_, channels,
[&](ObDtlChannel *ch) {
ch->set_join_filter_owner();
ch->set_thread_id(GETTID());
}))) {
LOG_WARN("fail to link ch channel", K(ret));
} else {
bf_ctx.filter_data_->bloom_filter_count_ = 1;
}
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support root dfo send bloom filter", K(ret));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "root dfo send bloom filter");
} else {
int64_t sqc_count = 0;
int64_t tenant_id = ctx.get_my_session()->get_effective_tenant_id();
ObPxSQCProxy *ch_provider = reinterpret_cast<ObPxSQCProxy *>(bf_ctx.ch_provider_ptr_);
// get channel info(addr) between receive op that will send bloom filter and child dfo sqc thread
if (OB_FAIL(ch_provider->get_bloom_filter_ch(bf_ctx.ch_set_,
sqc_count, phy_plan_ctx->get_timeout_timestamp(), false))) {
LOG_WARN("fail get data ch sets from provider", K(ret));
} else if (OB_FAIL(ObDtlChannelUtil::link_ch_set(bf_ctx.ch_set_, channels,
[&](ObDtlChannel *ch) {
ch->set_join_filter_owner();
ch->set_thread_id(GETTID());
}))) {
LOG_WARN("fail to link ch channel", K(ret));
} else if (channels.count() <= 0) {
} else if (bf_ctx.ch_set_.count() <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("channels count is unexpected", K(ret));
LOG_WARN("ch_info_set count is unexpected", K(ret));
} else {
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
if (OB_LIKELY(tenant_config.is_valid())) {
const char *ptr = NULL;
if (OB_ISNULL(ptr = tenant_config->_px_bloom_filter_group_size.get_value())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("each group size ptr is null", K(ret));
} else if (0 == ObString::make_string("auto").case_compare(ptr)) {
each_group_size = sqrt(channels.count());
} else {
char *end_ptr = nullptr;
each_group_size = strtoull(ptr, &end_ptr, 10);
if (*end_ptr != '\0') {
if (0 == each_group_size) { // only need calc once
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
if (OB_LIKELY(tenant_config.is_valid())) {
const char *ptr = NULL;
if (OB_ISNULL(ptr = tenant_config->_px_bloom_filter_group_size.get_value())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("each group size ptr is unexpected", K(ret));
LOG_WARN("each group size ptr is null", K(ret));
} else if (0 == ObString::make_string("auto").case_compare(ptr)) {
each_group_size = sqrt(bf_ctx.ch_set_.count()); // auto calc group size
} else {
char *end_ptr = nullptr;
each_group_size = strtoull(ptr, &end_ptr, 10); // get group size from tenant config
if (*end_ptr != '\0') {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("each group size ptr is unexpected", K(ret));
}
}
}
each_group_size = (each_group_size <= 0 ? 1 : each_group_size);
}
each_group_size = (each_group_size <= 0 ? 1 : each_group_size);
int64_t send_size = GCONF._send_bloom_filter_size * 125;
int64_t send_count = ceil(bf_ctx.filter_data_->filter_.get_bits_array_length() / (double)send_size);
bf_ctx.filter_data_->bloom_filter_count_ = sqc_count * send_count;
ch_provider->set_filter_data(bf_ctx.filter_data_);
OZ(ch_provider->assign_bloom_filter_channels(channels));
OZ(ch_provider->assign_bf_ch_set(bf_ctx.ch_set_));
OZ(ch_provider->generate_filter_indexes(each_group_size, channels.count()));
if (OB_SUCC(ret)) {
ch_provider->set_bf_compress_type(ctx.get_bf_ctx().compressor_type_);
ch_provider->set_per_channel_bf_count(send_count);
ch_provider->set_bloom_filter_ready(true);
int64_t send_size = GCONF._send_bloom_filter_size * 125; // 125 = 1000/8, send size means how many byte of partial bloom filter will be send at once
int64_t send_count = ceil(bf_ctx.filter_data_->filter_.get_bits_array_length() / (double)send_size); // how many piece of partial bloom filter will be send by all threads(sqc level)
bf_ctx.filter_data_->bloom_filter_count_ = sqc_count * send_count; // count of piece of partial bloom filter that child dfo will get
common::ObIArray<ObBloomFilterSendCtx> &sqc_bf_send_ctx_array = ch_provider->get_bf_send_ctx_array();
int64_t bf_idx_at_sqc_proxy = bf_ctx.bf_idx_at_sqc_proxy_;
if (0 > bf_idx_at_sqc_proxy || bf_idx_at_sqc_proxy >= sqc_bf_send_ctx_array.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid bf_idx_at_sqc_proxy", K(bf_idx_at_sqc_proxy), K(sqc_bf_send_ctx_array.count()), K(ret));
} else {
ObBloomFilterSendCtx &bf_send_ctx = sqc_bf_send_ctx_array.at(bf_idx_at_sqc_proxy);
bf_send_ctx.set_filter_data(bf_ctx.filter_data_);
if (OB_FAIL(bf_send_ctx.assign_bf_ch_set(bf_ctx.ch_set_))) {
LOG_WARN("failed to assign bloom filter ch_set", K(ret));
} else if (OB_FAIL(bf_send_ctx.generate_filter_indexes(each_group_size, bf_ctx.ch_set_.count()))) {
LOG_WARN("failed to generate filter indexs", K(ret));
} else {
bf_send_ctx.set_bf_compress_type(bf_ctx.compressor_type_);
bf_send_ctx.set_per_channel_bf_count(send_count);
bf_send_ctx.set_bloom_filter_ready(true); // means bloom filter is ready to be sent
}
}
}
if (OB_FAIL(ret)) {
// if failed , need unlink chset
IGNORE_RETURN ObPxChannelUtil::unlink_ch_set(bf_ctx.ch_set_, nullptr, false);
}
}
return ret;
}
@ -801,6 +811,20 @@ int ObPxTerminateMsgProc::on_piece_msg(
return common::OB_SUCCESS;
}
// Terminate-phase handler: once the query is being torn down, an incoming
// init-channel piece message carries no actionable work, so it is silently
// discarded and success is reported to keep the shutdown path moving.
int ObPxTerminateMsgProc::on_piece_msg(
ObExecContext &,
const ObInitChannelPieceMsg &)
{
return common::OB_SUCCESS;
}
// Terminate-phase handler: a reporting-window-function piece message that
// arrives during query teardown is intentionally ignored; returning success
// lets the termination sequence proceed without raising spurious errors.
int ObPxTerminateMsgProc::on_piece_msg(
ObExecContext &,
const ObReportingWFPieceMsg &)
{
return common::OB_SUCCESS;
}
int ObPxTerminateMsgProc::on_piece_msg(
ObExecContext &,
const ObOptStatsGatherPieceMsg &)
@ -808,6 +832,5 @@ int ObPxTerminateMsgProc::on_piece_msg(
return common::OB_SUCCESS;
}
} // end namespace sql
} // end namespace oceanbase