fix runtime filter bug in pwj and pkey-none scene

This commit is contained in:
obdev 2024-03-25 08:16:22 +00:00 committed by ob-robot
parent 45d42b517c
commit 02791e55d3
3 changed files with 22 additions and 15 deletions

View File

@ -201,7 +201,8 @@ int ObJoinFilterOpInput::init_shared_msgs(
} else if (OB_FAIL(msg_ptr->init(spec.rf_infos_.at(i).p2p_datahub_id_,
px_sequence_id_, 0/*task_id*/, tenant_id, timeout_ts, register_dm_info_))) {
LOG_WARN("fail to init msg", K(ret));
} else if (OB_FAIL(construct_msg_details(spec, sqc_proxy, config_, *msg_ptr, sqc_count))) {
} else if (OB_FAIL(construct_msg_details(spec, sqc_proxy, config_, *msg_ptr, sqc_count,
spec.filter_len_))) {
LOG_WARN("fail to construct msg details", K(ret), K(tenant_id));
}
}
@ -232,7 +233,8 @@ int ObJoinFilterOpInput::construct_msg_details(
ObPxSQCProxy *sqc_proxy,
ObJoinFilterRuntimeConfig &config,
ObP2PDatahubMsgBase &msg,
int64_t sqc_count)
int64_t sqc_count,
int64_t estimated_rows)
{
int ret = OB_SUCCESS;
switch(msg.get_msg_type()) {
@ -244,7 +246,7 @@ int ObJoinFilterOpInput::construct_msg_details(
ObSArray<ObAddr> *target_addrs = nullptr;
ObRFBloomFilterMsg &bf_msg = static_cast<ObRFBloomFilterMsg &>(msg);
ObPxSQCProxy::SQCP2PDhMap &dh_map = sqc_proxy->get_p2p_dh_map();
if (OB_FAIL(bf_msg.bloom_filter_.init(spec.filter_len_,
if (OB_FAIL(bf_msg.bloom_filter_.init(estimated_rows,
bf_msg.get_allocator(),
bf_msg.get_tenant_id(),
config.bloom_filter_ratio_,
@ -959,7 +961,7 @@ int ObJoinFilterOp::open_join_filter_create()
filter_input->px_sequence_id_, filter_input->task_id_, tenant_id, timeout_ts, filter_input->register_dm_info_))) {
LOG_WARN("fail to init msg", K(ret));
} else if (OB_FAIL(ObJoinFilterOpInput::construct_msg_details(MY_SPEC, sqc_proxy,
filter_input->config_, *msg_ptr, 1))) {
filter_input->config_, *msg_ptr, 1, filter_len))) {
LOG_WARN("fail to construct msg details", K(ret));
} else if (OB_FAIL(lucky_devil_champions_.push_back(false))) {
LOG_WARN("fail to push back flag", K(ret));
@ -1140,8 +1142,9 @@ int ObJoinFilterOp::init_local_msg_from_shared_msg(ObP2PDatahubMsgBase &msg)
msg.get_px_seq_id(), 0/*task_id*/, msg.get_tenant_id(),
msg.get_timeout_ts(), filter_input->register_dm_info_))) {
LOG_WARN("fail to init msg", K(ret));
} else if (OB_FAIL(ObJoinFilterOpInput::construct_msg_details(MY_SPEC,
sqc_proxy, filter_input->config_, *range_ptr, msg.get_msg_receive_expect_cnt()))) {
} else if (OB_FAIL(ObJoinFilterOpInput::construct_msg_details(
MY_SPEC, sqc_proxy, filter_input->config_, *range_ptr,
msg.get_msg_receive_expect_cnt(), MY_SPEC.filter_len_))) {
LOG_WARN("fail to construct msg details", K(ret));
}
break;
@ -1161,8 +1164,9 @@ int ObJoinFilterOp::init_local_msg_from_shared_msg(ObP2PDatahubMsgBase &msg)
msg.get_px_seq_id(), 0/*task_id*/, msg.get_tenant_id(),
msg.get_timeout_ts(), filter_input->register_dm_info_))) {
LOG_WARN("fail to init msg", K(ret));
} else if (OB_FAIL(ObJoinFilterOpInput::construct_msg_details(MY_SPEC,
sqc_proxy, filter_input->config_, *range_ptr, msg.get_msg_receive_expect_cnt()))) {
} else if (OB_FAIL(ObJoinFilterOpInput::construct_msg_details(
MY_SPEC, sqc_proxy, filter_input->config_, *range_ptr,
msg.get_msg_receive_expect_cnt(), MY_SPEC.filter_len_))) {
LOG_WARN("fail to construct msg details", K(ret));
}
break;
@ -1182,8 +1186,9 @@ int ObJoinFilterOp::init_local_msg_from_shared_msg(ObP2PDatahubMsgBase &msg)
msg.get_px_seq_id(), 0/*task_id*/, msg.get_tenant_id(),
msg.get_timeout_ts(), filter_input->register_dm_info_))) {
LOG_WARN("fail to init msg", K(ret));
} else if (OB_FAIL(ObJoinFilterOpInput::construct_msg_details(MY_SPEC,
sqc_proxy, filter_input->config_, *in_ptr, msg.get_msg_receive_expect_cnt()))) {
} else if (OB_FAIL(ObJoinFilterOpInput::construct_msg_details(
MY_SPEC, sqc_proxy, filter_input->config_, *in_ptr,
msg.get_msg_receive_expect_cnt(), MY_SPEC.filter_len_))) {
LOG_WARN("fail to construct msg details", K(ret));
}
break;
@ -1203,8 +1208,9 @@ int ObJoinFilterOp::init_local_msg_from_shared_msg(ObP2PDatahubMsgBase &msg)
msg.get_px_seq_id(), 0/*task_id*/, msg.get_tenant_id(),
msg.get_timeout_ts(), filter_input->register_dm_info_))) {
LOG_WARN("fail to init msg", K(ret));
} else if (OB_FAIL(ObJoinFilterOpInput::construct_msg_details(MY_SPEC,
sqc_proxy, filter_input->config_, *in_ptr, msg.get_msg_receive_expect_cnt()))) {
} else if (OB_FAIL(ObJoinFilterOpInput::construct_msg_details(
MY_SPEC, sqc_proxy, filter_input->config_, *in_ptr,
msg.get_msg_receive_expect_cnt(), MY_SPEC.filter_len_))) {
LOG_WARN("fail to construct msg details", K(ret));
}
break;

View File

@ -123,7 +123,7 @@ public:
static int construct_msg_details(const ObJoinFilterSpec &spec,
ObPxSQCProxy *sqc_proxy,
ObJoinFilterRuntimeConfig &config,
ObP2PDatahubMsgBase &msg, int64_t sqc_count);
ObP2PDatahubMsgBase &msg, int64_t sqc_count, int64_t estimated_rows);
void set_task_id(int64_t task_id) { task_id_ = task_id; }
inline void set_bf_idx_at_sqc_proxy(int64_t idx) { bf_idx_at_sqc_proxy_ = idx; }

View File

@ -5165,6 +5165,7 @@ int ObLogicalOperator::allocate_partition_join_filter(const ObIArray<JoinFilterI
ObLogJoinFilter *join_filter_create = NULL;
ObLogOperatorFactory &factory = get_plan()->get_log_op_factory();
CK(LOG_JOIN == get_type());
DistAlgo join_dist_algo = static_cast<ObLogJoin*>(this)->get_join_distributed_method();
for (int i = 0; i < infos.count() && OB_SUCC(ret); ++i) {
filter_create = NULL;
bool right_has_exchange = false;
@ -5212,7 +5213,7 @@ int ObLogicalOperator::allocate_partition_join_filter(const ObIArray<JoinFilterI
set_child(first_child, join_filter_create);
join_filter_create->set_filter_length(info.sharding_->get_part_cnt() * 2);
join_filter_create->set_is_use_filter_shuffle(right_has_exchange);
if (is_partition_wise_ && !right_has_exchange) {
if ((is_partition_wise_ || DistAlgo::DIST_PARTITION_NONE == join_dist_algo) && !right_has_exchange) {
join_filter_create->set_is_no_shared_partition_join_filter();
} else {
join_filter_create->set_is_shared_partition_join_filter();
@ -5305,7 +5306,7 @@ int ObLogicalOperator::allocate_normal_join_filter(const ObIArray<JoinFilterInfo
join_filter_create->set_is_use_filter_shuffle(true);
join_filter_use->set_is_use_filter_shuffle(true);
}
if (is_partition_wise_ && !right_has_exchange) {
if ((is_partition_wise_ || DistAlgo::DIST_PARTITION_NONE == join_dist_algo) && !right_has_exchange) {
join_filter_create->set_is_non_shared_join_filter();
join_filter_use->set_is_non_shared_join_filter();
} else {