Runtime Filter Optimization

This commit is contained in:
obdev
2023-08-18 12:11:53 +08:00
committed by ob-robot
parent 3c3b68a7f1
commit 3bd00fdbb4
19 changed files with 502 additions and 115 deletions

View File

@ -55,6 +55,7 @@ ObExprJoinFilter::ObExprJoinFilterContext::~ObExprJoinFilterContext()
}
hash_funcs_.reset();
cmp_funcs_.reset();
cur_row_.reset();
}
void ObExprJoinFilter::ObExprJoinFilterContext::reset_monitor_info()
@ -227,6 +228,51 @@ void ObExprJoinFilter::check_need_dynamic_diable_bf(
}
}
void ObExprJoinFilter::collect_sample_info_batch(
ObExprJoinFilter::ObExprJoinFilterContext &join_filter_ctx,
int64_t filter_count, int64_t total_count)
{
if (!join_filter_ctx.dynamic_disable()) {
join_filter_ctx.partial_filter_count_ += filter_count;
join_filter_ctx.partial_total_count_ += total_count;
}
check_need_dynamic_diable_bf_batch(join_filter_ctx);
}
void ObExprJoinFilter::check_need_dynamic_diable_bf_batch(
ObExprJoinFilter::ObExprJoinFilterContext &join_filter_ctx)
{
if (join_filter_ctx.cur_pos_ >= join_filter_ctx.next_check_start_pos_
&& join_filter_ctx.need_reset_sample_info_) {
join_filter_ctx.partial_total_count_ = 0;
join_filter_ctx.partial_filter_count_ = 0;
join_filter_ctx.need_reset_sample_info_ = false;
if (join_filter_ctx.dynamic_disable()) {
join_filter_ctx.dynamic_disable_ = false;
}
} else if (join_filter_ctx.cur_pos_ >=
join_filter_ctx.next_check_start_pos_ + join_filter_ctx.window_size_) {
if (join_filter_ctx.partial_total_count_ -
join_filter_ctx.partial_filter_count_ <
join_filter_ctx.partial_filter_count_) {
// partial_filter_count_ / partial_total_count_ > 0.5
// The optimizer choose the bloom filter when the filter threshold is larger than 0.6
// 0.5 is a acceptable value
// if enabled, the slide window not needs to expand
join_filter_ctx.window_cnt_ = 0;
join_filter_ctx.next_check_start_pos_ = join_filter_ctx.cur_pos_;
} else {
join_filter_ctx.window_cnt_++;
join_filter_ctx.next_check_start_pos_ = join_filter_ctx.cur_pos_ +
(join_filter_ctx.window_size_ * join_filter_ctx.window_cnt_);
join_filter_ctx.dynamic_disable_ = true;
}
join_filter_ctx.partial_total_count_ = 0;
join_filter_ctx.partial_filter_count_ = 0;
join_filter_ctx.need_reset_sample_info_ = true;
}
}
int ObExprJoinFilter::eval_bloom_filter(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res)
{
return eval_filter_internal(expr, ctx, res);

View File

@ -40,8 +40,10 @@ public:
n_times_(0), ready_ts_(0), next_check_start_pos_(0),
window_cnt_(0), window_size_(0),
partial_filter_count_(0), partial_total_count_(0),
cur_pos_(total_count_), flag_(0)
cur_pos_(total_count_), need_reset_sample_info_(false), flag_(0),
cur_row_()
{
cur_row_.set_attr(ObMemAttr(MTL_ID(), "RfCurRow"));
need_wait_rf_ = true;
is_first_ = true;
}
@ -70,6 +72,7 @@ public:
int64_t partial_filter_count_;
int64_t partial_total_count_;
int64_t &cur_pos_;
bool need_reset_sample_info_; // use for check_need_dynamic_diable_bf_batch
union {
uint64_t flag_;
struct {
@ -81,6 +84,7 @@ public:
int32_t reserved_:28;
};
};
ObTMArray<ObDatum> cur_row_;
};
ObExprJoinFilter();
explicit ObExprJoinFilter(common::ObIAllocator& alloc);
@ -118,6 +122,9 @@ public:
static void collect_sample_info(
ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx,
bool is_match);
static void collect_sample_info_batch(
ObExprJoinFilter::ObExprJoinFilterContext &join_filter_ctx,
int64_t filter_count, int64_t total_count);
private:
static int check_rf_ready(
ObExecContext &exec_ctx,
@ -125,6 +132,8 @@ private:
static void check_need_dynamic_diable_bf(
ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx);
static void check_need_dynamic_diable_bf_batch(
ObExprJoinFilter::ObExprJoinFilterContext &join_filter_ctx);
private:
static const int64_t CHECK_TIMES = 127;
private:

View File

@ -440,7 +440,9 @@ int ObJoinFilterOp::mark_not_need_send_bf_msg()
} else {
// only the msg is a shared and shuffled BLOOM_FILTER_MSG
// let other worker threads stop trying send join_filter
shared_bf_msg->need_send_msg_ = false;
if (!*shared_bf_msg->create_finish_) {
shared_bf_msg->need_send_msg_ = false;
}
}
}
}
@ -448,12 +450,17 @@ int ObJoinFilterOp::mark_not_need_send_bf_msg()
return ret;
}
// need add mark_not_need_send_bf_msg for shared shuffled bloom filter
// for create mode, need add mark_not_need_send_bf_msg for shared shuffled bloom filter
// for use mode, update_plan_monitor_info cause get_next_batch may not get iter end
int ObJoinFilterOp::inner_drain_exch()
{
int ret = OB_SUCCESS;
if (MY_SPEC.is_create_mode()) {
if (row_reach_end_ || batch_reach_end_) {
// already iter end, not need to mark not send or update_plan_monitor_info again (already done in get_next_row/batch)
} else if (MY_SPEC.is_create_mode()) {
ret = mark_not_need_send_bf_msg();
} else if (MY_SPEC.is_use_mode()) {
ret = update_plan_monitor_info();
}
return ret;
}
@ -764,6 +771,8 @@ int ObJoinFilterOp::update_plan_monitor_info()
op_monitor_info_.otherstat_5_value_ = MY_SPEC.filter_id_;
op_monitor_info_.otherstat_6_id_ = ObSqlMonitorStatIds::JOIN_FILTER_LENGTH;
op_monitor_info_.otherstat_6_value_ = MY_SPEC.filter_len_;
int64_t check_count = 0;
int64_t total_count = 0;
for (int i = 0; i < MY_SPEC.rf_infos_.count() && OB_SUCC(ret); ++i) {
if (OB_INVALID_ID != MY_SPEC.rf_infos_.at(i).filter_expr_id_) {
ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx = NULL;
@ -772,12 +781,16 @@ int ObJoinFilterOp::update_plan_monitor_info()
LOG_TRACE("join filter expr ctx is null");
} else {
op_monitor_info_.otherstat_1_value_ += join_filter_ctx->filter_count_;
op_monitor_info_.otherstat_2_value_ += join_filter_ctx->total_count_;
op_monitor_info_.otherstat_3_value_ += join_filter_ctx->check_count_;
total_count = max(total_count, join_filter_ctx->total_count_);
check_count = max(check_count, join_filter_ctx->check_count_);
op_monitor_info_.otherstat_4_value_ = max(join_filter_ctx->ready_ts_, op_monitor_info_.otherstat_4_value_);
}
}
}
if (OB_SUCC(ret)) {
op_monitor_info_.otherstat_2_value_ += total_count;
op_monitor_info_.otherstat_3_value_ += check_count;
}
return ret;
}
@ -854,6 +867,8 @@ int ObJoinFilterOp::open_join_filter_use()
LOG_WARN("failed to assign hash_func");
} else if (OB_FAIL(join_filter_ctx->cmp_funcs_.assign(MY_SPEC.cmp_funcs_))) {
LOG_WARN("failed to assign cmp_funcs_");
} else if (OB_FAIL(join_filter_ctx->cur_row_.reserve(MY_SPEC.cmp_funcs_.count()))) {
LOG_WARN("failed to reserve cur_row_");
}
}
} else {

View File

@ -498,7 +498,8 @@ public:
p2p_dh_loc_(nullptr),
need_p2p_info_(false),
p2p_dh_map_info_(),
coord_info_ptr_(nullptr)
coord_info_ptr_(nullptr),
force_bushy_(false)
{
}

View File

@ -419,6 +419,9 @@ int ObDfoMgr::init(ObExecContext &exec_ctx,
} else if (OB_ISNULL(root_dfo_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL dfo unexpected", K(ret));
} else if (!px_coord_info.rf_dpd_info_.is_empty()
&& OB_FAIL(px_coord_info.rf_dpd_info_.describe_dependency(root_dfo_))) {
LOG_WARN("failed to describe rf dependency");
} else if (OB_FAIL(ObDfoSchedOrderGenerator::generate_sched_order(*this))) {
LOG_WARN("fail init dfo mgr", K(ret));
} else if (OB_FAIL(ObDfoSchedDepthGenerator::generate_sched_depth(exec_ctx, *this))) {
@ -521,10 +524,12 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
}
} else if (IS_PX_JOIN_FILTER(phy_op->get_type()) && NULL != parent_dfo) {
const ObJoinFilterSpec *filter_spec = static_cast<const ObJoinFilterSpec *>(phy_op);
if (filter_spec->is_create_mode() && !parent_dfo->force_bushy()) {
parent_dfo->set_force_bushy(true);
if (filter_spec->is_create_mode() && OB_FAIL(px_coord_info.rf_dpd_info_.rf_create_ops_.push_back(phy_op))) {
LOG_WARN("failed to push back create op");
} else if (filter_spec->is_use_mode() && OB_FAIL(px_coord_info.rf_dpd_info_.rf_use_ops_.push_back(phy_op))) {
LOG_WARN("failed to push back use op");
}
if(filter_spec->is_shared_join_filter() && filter_spec->is_shuffle_) {
if (OB_SUCC(ret) && filter_spec->is_shared_join_filter() && filter_spec->is_shuffle_) {
ObP2PDfoMapNode node;
node.target_dfo_id_ = parent_dfo->get_dfo_id();
for (int i = 0; i < filter_spec->rf_infos_.count() && OB_SUCC(ret); ++i) {

View File

@ -39,6 +39,7 @@ ObGIOpInput::ObGIOpInput(ObExecContext &ctx, const ObOpSpec &spec)
worker_id_(common::OB_INVALID_INDEX),
pump_(nullptr),
px_sequence_id_(OB_INVALID_ID),
rf_max_wait_time_(0),
deserialize_allocator_(nullptr)
{}
@ -176,6 +177,7 @@ ObGranuleIteratorOp::ObGranuleIteratorOp(ObExecContext &exec_ctx, const ObOpSpec
total_count_(0),
rf_msg_(NULL),
rf_key_(),
rf_start_wait_time_(0),
tablet2part_id_map_(),
real_child_(NULL),
is_parallel_runtime_filtered_(false)
@ -645,7 +647,9 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool &partition_pruning)
LOG_WARN("fail to fetch rescan pw task infos", K(ret));
}
} else if (OB_FAIL(fetch_normal_pw_task_infos(*op_ids_pointer, gi_prepare_map, gi_task_infos))) {
LOG_WARN("fail to fetch normal pw task infos", K(ret));
if (OB_ITER_END != ret) {
LOG_WARN("fail to fetch normal pw task infos", K(ret));
}
}
}
@ -663,13 +667,12 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool &partition_pruning)
return ret;
}
int ObGranuleIteratorOp::do_join_filter_partition_pruning(
int64_t tablet_id,
bool &partition_pruning)
{
int ObGranuleIteratorOp::wait_runtime_ready(bool &partition_pruning) {
int ret = OB_SUCCESS;
bool is_match = false;
if (0 == rf_start_wait_time_) {
rf_start_wait_time_ = ObTimeUtility::current_time();
}
ObGIOpInput *gi_input = static_cast<ObGIOpInput*>(input_);
while (OB_SUCC(ret) && (OB_ISNULL(rf_msg_) || !rf_msg_->check_ready())) {
if (OB_ISNULL(rf_msg_) && OB_FAIL(PX_P2P_DH.atomic_get_msg(rf_key_, rf_msg_))) {
if (OB_HASH_NOT_EXIST == ret) {
@ -686,13 +689,34 @@ int ObGranuleIteratorOp::do_join_filter_partition_pruning(
if (OB_SUCC(ret)) {
if (OB_ISNULL(rf_msg_) || !rf_msg_->check_ready()) {
if (MY_SPEC.bf_info_.is_shuffle_) {
// For shuffled msg, no waiting time is required.
partition_pruning = false;
break;
} else {
// For local messages, the maximum waiting time does not exceed rf_max_wait_time.
int64_t cur_time = ObTimeUtility::current_time();
if (cur_time - rf_start_wait_time_ > gi_input->get_rf_max_wait_time() * 1000) {
partition_pruning = false;
break;
} else {
ob_usleep(100);
}
}
}
}
}
return ret;
}
int ObGranuleIteratorOp::do_join_filter_partition_pruning(
int64_t tablet_id,
bool &partition_pruning)
{
int ret = OB_SUCCESS;
bool is_match = false;
if (OB_FAIL(wait_runtime_ready(partition_pruning))) {
LOG_WARN("failed to wait wait_runtime_ready");
}
if (OB_SUCC(ret) && OB_NOT_NULL(rf_msg_) && rf_msg_->check_ready()) {
uint64_t hash_val = ObExprJoinFilter::JOIN_FILTER_SEED;
if (MY_SPEC.bf_info_.skip_subpart_) {

View File

@ -59,6 +59,8 @@ public:
int64_t get_px_sequence_id() { return px_sequence_id_; }
void set_px_sequence_id(int64_t id) { px_sequence_id_ = id; }
int add_table_location_keys(common::ObIArray<const ObTableScanSpec*> &tscs);
int64_t get_rf_max_wait_time() { return rf_max_wait_time_; }
void set_rf_max_wait_time(int64_t rf_max_wait_time) { rf_max_wait_time_ = rf_max_wait_time; }
private:
int deep_copy_range(ObIAllocator *allocator, const ObNewRange &src, ObNewRange &dst);
public:
@ -72,6 +74,7 @@ public:
//for partition pruning
common::ObSEArray<uint64_t, 2> table_location_keys_;
int64_t px_sequence_id_;
int64_t rf_max_wait_time_;
private:
common::ObIAllocator *deserialize_allocator_;
};
@ -211,6 +214,7 @@ private:
bool enable_single_runtime_filter_pruning();
int do_single_runtime_filter_pruning(const ObGranuleTaskInfo &gi_task_info, bool &partition_pruning);
int do_parallel_runtime_filter_pruning();
int wait_runtime_ready(bool &partition_pruning);
int do_join_filter_partition_pruning(int64_t tablet_id, bool &partition_pruning);
int try_build_tablet2part_id_map();
//---end----
@ -241,6 +245,7 @@ private:
int64_t total_count_; // total partition count or block count processed, rescan included
ObP2PDatahubMsgBase *rf_msg_;
ObP2PDhKey rf_key_;
int64_t rf_start_wait_time_;
ObPxTablet2PartIdMap tablet2part_id_map_;
ObOperator *real_child_;
bool is_parallel_runtime_filtered_;

View File

@ -40,6 +40,7 @@
#include "sql/engine/px/ob_px_sqc_proxy.h"
#include "storage/tx/ob_trans_service.h"
#include "share/detect/ob_detect_manager_utils.h"
#include "sql/engine/join/ob_join_filter_op.h"
namespace oceanbase
{
@ -716,6 +717,42 @@ int ObPxTerminateMsgProc::startup_msg_loop(ObExecContext &ctx)
return ret;
}
int RuntimeFilterDependencyInfo::describe_dependency(ObDfo *root_dfo)
{
int ret = OB_SUCCESS;
// for each rf create op, find its pair rf use op,
// then get the lowest common ancestor of them, mark force_bushy of the dfo which the ancestor belongs to.
for (int64_t i = 0; i < rf_create_ops_.count() && OB_SUCC(ret); ++i) {
const ObJoinFilterSpec *create_op = static_cast<const ObJoinFilterSpec *>(rf_create_ops_.at(i));
for (int64_t j = 0; j < rf_use_ops_.count() && OB_SUCC(ret); ++j) {
const ObJoinFilterSpec *use_op = static_cast<const ObJoinFilterSpec *>(rf_use_ops_.at(j));
if (create_op->get_filter_id() == use_op->get_filter_id()) {
const ObOpSpec *ancestor_op = nullptr;
ObDfo *op_dfo = nullptr;;
if (OB_FAIL(LowestCommonAncestorFinder::find_op_common_ancestor(create_op, use_op, ancestor_op))) {
LOG_WARN("failed to find op common ancestor");
} else if (OB_ISNULL(ancestor_op)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("op common ancestor not found");
} else if (OB_FAIL(LowestCommonAncestorFinder::get_op_dfo(ancestor_op, root_dfo, op_dfo))) {
LOG_WARN("failed to find op common ancestor");
} else if (OB_ISNULL(op_dfo)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the dfo of ancestor_op not found");
} else {
// Once the DFO which the ancestor belongs to has set the flag "force_bushy",
// the DfoTreeNormalizer will not attempt to transform a right-deep DFO tree
// into a left-deep DFO tree. Consequently, the "join filter create" operator
// can be scheduled earlier than the "join filter use" operator.
op_dfo->set_force_bushy(true);
}
break;
}
}
}
return ret;
}
int ObPxCoordInfo::init()
{
int ret = OB_SUCCESS;

View File

@ -55,6 +55,26 @@ enum class TableAccessType {
HAS_USER_TABLE
};
// for runtime filter, jf create op must scheduled earlier than jf use op
struct RuntimeFilterDependencyInfo
{
public:
RuntimeFilterDependencyInfo() : rf_create_ops_(), rf_use_ops_() {}
~RuntimeFilterDependencyInfo() = default;
void destroy()
{
rf_create_ops_.reset();
rf_use_ops_.reset();
}
inline bool is_empty() const {
return rf_create_ops_.empty() && rf_use_ops_.empty();
}
int describe_dependency(ObDfo *root_dfo);
public:
ObTMArray<const ObOpSpec *> rf_create_ops_;
ObTMArray<const ObOpSpec *> rf_use_ops_;
};
struct ObP2PDfoMapNode
{
ObP2PDfoMapNode() : target_dfo_id_(OB_INVALID_ID), addrs_() {}
@ -103,7 +123,8 @@ public:
table_access_type_(TableAccessType::NO_TABLE),
qc_detectable_id_(),
p2p_dfo_map_(),
p2p_temp_table_info_()
p2p_temp_table_info_(),
rf_dpd_info_()
{}
virtual ~ObPxCoordInfo() {}
virtual void destroy()
@ -112,6 +133,7 @@ public:
piece_msg_ctx_mgr_.reset();
p2p_dfo_map_.destroy();
p2p_temp_table_info_.reset();
rf_dpd_info_.destroy();
}
void reset_for_rescan()
{
@ -153,6 +175,7 @@ public:
// key = p2p_dh_id value = dfo_id + target_addrs
hash::ObHashMap<int64_t, ObP2PDfoMapNode, hash::NoPthreadDefendMode> p2p_dfo_map_;
ObTempTableP2PInfo p2p_temp_table_info_;
RuntimeFilterDependencyInfo rf_dpd_info_;
};
class ObDfoSchedulerBasic;

View File

@ -613,6 +613,9 @@ int ObPxTaskProcess::OpPreparation::apply(ObExecContext &ctx,
} else {
input->set_worker_id(task_id_);
input->set_px_sequence_id(task_->px_int_id_.px_interrupt_id_.first_);
if (OB_NOT_NULL(ctx.get_my_session())) {
input->set_rf_max_wait_time(ctx.get_my_session()->get_runtime_filter_wait_time_ms());
}
if (ObGranuleUtil::pwj_gi(gi->gi_attri_flag_)) {
pw_gi_spec_ = gi;
on_set_tscs_ = true;

View File

@ -3835,4 +3835,80 @@ bool ObPxCheckAlive::is_in_blacklist(const common::ObAddr &addr, int64_t server_
LOG_WARN("server in blacklist", K(addr), K(server_start_time), K(alive_data.start_service_time_));
}
return in_blacklist;
}
}
int LowestCommonAncestorFinder::find_op_common_ancestor(
const ObOpSpec *left, const ObOpSpec *right, const ObOpSpec *&ancestor)
{
int ret = OB_SUCCESS;
ObSEArray<const ObOpSpec *, 32> ancestors;
const ObOpSpec *parent = left;
while (OB_NOT_NULL(parent) && OB_SUCC(ret)) {
if (OB_FAIL(ancestors.push_back(parent))) {
LOG_WARN("failed to push back");
} else {
parent = parent->get_parent();
}
}
parent = right;
bool find = false;
while (OB_NOT_NULL(parent) && OB_SUCC(ret) && !find) {
for (int64_t i = 0; i < ancestors.count() && OB_SUCC(ret); ++i) {
if (parent == ancestors.at(i)) {
find = true;
ancestor = parent;
break;
}
}
parent = parent->get_parent();
}
return ret;
}
int LowestCommonAncestorFinder::get_op_dfo(const ObOpSpec *op, ObDfo *root_dfo, ObDfo *&op_dfo)
{
int ret = OB_SUCCESS;
const ObOpSpec *parent = op;
const ObOpSpec *dfo_root_op = nullptr;
while (OB_NOT_NULL(parent) && OB_SUCC(ret)) {
if (IS_PX_COORD(parent->type_) || IS_PX_TRANSMIT(parent->type_)) {
dfo_root_op = parent;
break;
} else {
parent = parent->get_parent();
}
}
ObDfo *dfo = nullptr;
bool find = false;
ObSEArray<ObDfo *, 16> dfo_queue;
int64_t cur_que_front = 0;
if (OB_FAIL(dfo_queue.push_back(root_dfo))) {
LOG_WARN("failed to push back");
}
while (cur_que_front < dfo_queue.count() && !find && OB_SUCC(ret)) {
int64_t cur_que_size = dfo_queue.count() - cur_que_front;
for (int64_t i = 0; i < cur_que_size && OB_SUCC(ret); ++i) {
dfo = dfo_queue.at(cur_que_front);
if (dfo->get_root_op_spec() == dfo_root_op) {
op_dfo = dfo;
find = true;
break;
} else {
// push child into the queue
for (int64_t child_idx = 0; OB_SUCC(ret) && child_idx < dfo->get_child_count(); ++child_idx) {
if (OB_FAIL(dfo_queue.push_back(dfo->get_child_dfos().at(child_idx)))) {
LOG_WARN("failed to push back child dfo");
}
}
}
if (OB_SUCC(ret)) {
cur_que_front++;
}
}
}
return ret;
}

View File

@ -665,6 +665,14 @@ static int get_location_addrs(const T &locations,
return ret;
}
class LowestCommonAncestorFinder
{
public:
static int find_op_common_ancestor(
const ObOpSpec *left, const ObOpSpec *right, const ObOpSpec *&ancestor);
static int get_op_dfo(const ObOpSpec *op, ObDfo *root_dfo, ObDfo *&op_dfo);
};
}
}

View File

@ -188,18 +188,6 @@ int ObP2PDatahubManager::P2PMsgMergeCall::operator() (common::hash::HashMapPair<
return ret;
}
int ObP2PDatahubManager::P2PRegenerateCall::operator() (common::hash::HashMapPair<ObP2PDhKey,
ObP2PDatahubMsgBase *> &entry)
{
int ret = OB_SUCCESS;
if (OB_FAIL(entry.second->regenerate())) {
LOG_WARN("fail to do regenerate", K(ret));
}
ret_ = ret;
return ret;
}
int ObP2PDatahubManager::send_local_msg(ObP2PDatahubMsgBase *msg)
{
int ret = OB_SUCCESS;
@ -212,7 +200,7 @@ int ObP2PDatahubManager::send_local_msg(ObP2PDatahubMsgBase *msg)
msg->get_task_id(),
ObTimeUtility::current_time(), msg->get_timeout_ts());
if (OB_FAIL(map_.set_refactored(dh_key, msg))) {
LOG_TRACE("fail to insert p2p dh msg", K(ret));
LOG_TRACE("fail to insert p2p dh msg", K(ret), K(dh_key));
} else {
msg->set_is_ready(true);
}
@ -266,7 +254,10 @@ int ObP2PDatahubManager::erase_msg_if(ObP2PDhKey &dh_key,
}
PX_P2P_DH.free_msg(msg);
} else {
ret = OB_ERR_UNEXPECTED;
// If erase failed, means other threads still referencing the msg.
// If the caller is an RPC thread, the clearing task will be delegated to DM;
// If the caller is DM, the retry policy is utilized to ensure that the message is deleted.
ret = OB_EAGAIN;
LOG_WARN("failed to erase msg, other threads still referencing it", K(dh_key), K(need_unreg_dm));
}
return ret;
@ -368,7 +359,7 @@ void ObP2PDatahubManager::P2PMsgGetCall::operator() (common::hash::HashMapPair<O
if (OB_NOT_NULL(dh_msg_)) {
dh_msg_->inc_ref_count();
} else {
int ret = OB_UNEXPECT_INTERNAL_ERROR;
int ret = OB_ERR_UNEXPECTED;
LOG_WARN("dh_msg_ is null", K(ret));
}
}
@ -383,7 +374,7 @@ bool ObP2PDatahubManager::P2PMsgEraseIfCall::operator() (common::hash::HashMapPa
need_erase = true;
}
} else {
int ret = OB_UNEXPECT_INTERNAL_ERROR;
int ret = OB_ERR_UNEXPECTED;
LOG_WARN("dh_msg_ is null", K(ret));
}
return need_erase;

View File

@ -36,14 +36,7 @@ public:
ObP2PDatahubMsgBase &dh_msg_;
bool need_free_;
};
struct P2PRegenerateCall
{
P2PRegenerateCall(ObP2PDatahubMsgBase &db_msg) : ret_(OB_SUCCESS), dh_msg_(db_msg) {};
~P2PRegenerateCall() = default;
int operator() (common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry);
int ret_;
ObP2PDatahubMsgBase &dh_msg_;
};
struct P2PMsgGetCall
{
P2PMsgGetCall(ObP2PDatahubMsgBase *&db_msg) : dh_msg_(db_msg), ret_(OB_SUCCESS) {};

View File

@ -93,8 +93,8 @@ int ObP2PDatahubMsgBase::process_receive_count(ObP2PDatahubMsgBase &msg)
int ret = OB_SUCCESS;
CK(msg.get_msg_receive_expect_cnt() > 0 && msg_receive_expect_cnt_ > 0);
if (OB_SUCC(ret)) {
ATOMIC_AAF(&msg_receive_cur_cnt_, msg.get_msg_receive_cur_cnt());
if (msg_receive_cur_cnt_ > msg_receive_expect_cnt_) {
int64_t cur_cnt = ATOMIC_AAF(&msg_receive_cur_cnt_, msg.get_msg_receive_cur_cnt());
if (cur_cnt > msg_receive_expect_cnt_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected receive count", K(ret));
}
@ -105,10 +105,8 @@ int ObP2PDatahubMsgBase::process_receive_count(ObP2PDatahubMsgBase &msg)
void ObP2PDatahubMsgBase::check_finish_receive()
{
if (is_active_) {
if (msg_receive_cur_cnt_ == msg_receive_expect_cnt_) {
is_ready_ = true;
}
if (msg_receive_expect_cnt_ == ATOMIC_LOAD(&msg_receive_cur_cnt_)) {
is_ready_ = true;
}
}

View File

@ -86,7 +86,7 @@ public:
virtual int process_msg_internal(bool &need_free);
virtual int reuse() { return OB_SUCCESS; }
virtual int regenerate() { return OB_SUCCESS; }
void check_finish_receive();
virtual void check_finish_receive();
bool check_ready() const { return is_ready_; }
ObP2PDatahubMsgType get_msg_type() const { return msg_type_; }
void set_msg_type(ObP2PDatahubMsgType type) { msg_type_ = type; }
@ -137,7 +137,7 @@ protected:
common::ObArenaAllocator allocator_;
int64_t msg_receive_cur_cnt_;
int64_t msg_receive_expect_cnt_;
bool is_active_;
bool is_active_; //only for ObRFInFilterMsg, when NDV>1024, set is_active_ = false;
bool is_ready_;
bool is_empty_;
int64_t ref_count_;

View File

@ -207,7 +207,6 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
int ret = OB_SUCCESS;
ObP2PDhKey dh_key(p2p_datahub_id_, px_sequence_id_, task_id_);
ObP2PDatahubManager::P2PMsgSetCall set_call(dh_key, *this);
ObP2PDatahubManager::P2PMsgMergeCall merge_call(*this);
ObP2PDatahubManager::MsgMap &map = PX_P2P_DH.get_map();
start_time_ = ObTimeUtility::current_time();
@ -231,10 +230,19 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
// merge piece bloom filter
if (OB_SUCC(ret) && need_merge) {
if (OB_FAIL(map.read_atomic(dh_key, merge_call))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to merge p2p dh msg", K(ret));
}
// for bloom filter msg, we can merge several msgs concurrently in an atomic manner without holding the map lock.
// thus, we need handle the reference count carefully here to make sure the msg not been destroyed during the merge process.
ObP2PDatahubMsgBase *rf_msg_in_map = nullptr;
ObRFBloomFilterMsg *bf_msg = nullptr;
if (OB_FAIL(PX_P2P_DH.atomic_get_msg(dh_key, rf_msg_in_map))) { // inc ref_count is integrated
LOG_WARN("fail to get msg", K(ret));
} else if (FALSE_IT(bf_msg = static_cast<ObRFBloomFilterMsg *>(rf_msg_in_map))) {
} else if (OB_FAIL(bf_msg->atomic_merge(*this))) {
LOG_WARN("fail to merge p2p dh msg", K(ret));
}
if (OB_NOT_NULL(rf_msg_in_map)) {
// after merge, dec ref_count
rf_msg_in_map->dec_ref_count();
}
}
if (OB_SUCC(ret) && !need_merge) {
@ -287,10 +295,11 @@ int ObRFBloomFilterMsg::process_first_phase_recieve_count(
int ret = OB_SUCCESS;
CK(msg.get_msg_receive_expect_cnt() > 0 && msg_receive_expect_cnt_ > 0);
int64_t begin_idx = msg.bloom_filter_.get_begin_idx();
ATOMIC_INC(&msg_receive_cur_cnt_);
if (msg_receive_cur_cnt_ > msg_receive_expect_cnt_) {
// msg_receive_cur_cnt_ is msg total cnt, msg_receive_expect_cnt_ equals to sqc_count * peice_count
int64_t received_cnt = ATOMIC_AAF(&msg_receive_cur_cnt_, 1);
if (received_cnt > msg_receive_expect_cnt_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to process recieve count", K(ret), K(msg_receive_cur_cnt_),
LOG_WARN("fail to process receive count", K(ret), K(received_cnt),
K(msg_receive_expect_cnt_));
} else if (receive_count_array_.empty()) {
ret = OB_ERR_UNEXPECTED;
@ -299,6 +308,7 @@ int ObRFBloomFilterMsg::process_first_phase_recieve_count(
bool find = false;
for (int i = 0; OB_SUCC(ret) && i < receive_count_array_.count(); ++i) {
if (begin_idx == receive_count_array_.at(i).begin_idx_) {
// receive count of a specific peice msg, expect_first_phase_count_ equals to sqc count
int64_t cur_count = ATOMIC_AAF(&receive_count_array_.at(i).reciv_count_, 1);
first_phase_end = (cur_count == expect_first_phase_count_);
find = true;
@ -445,6 +455,20 @@ int ObRFBloomFilterMsg::regenerate()
return ret;
}
int ObRFBloomFilterMsg::atomic_merge(ObP2PDatahubMsgBase &other_msg)
{
int ret = OB_SUCCESS;
if (!other_msg.is_empty() && (OB_FAIL(merge(other_msg)))) {
LOG_WARN("fail to merge dh msg", K(ret));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(process_receive_count(other_msg))) {
LOG_WARN("fail to process receive count", K(ret));
}
return ret;
}
// the merge process of bloom_filter_ is atomic by using CAS
int ObRFBloomFilterMsg::merge(ObP2PDatahubMsgBase &msg)
{
int ret = OB_SUCCESS;
@ -524,19 +548,24 @@ int ObRFBloomFilterMsg::might_contain_batch(
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
uint64_t *hash_values = reinterpret_cast<uint64_t *>(
ctx.frames_[expr.frame_idx_] + expr.res_buf_off_);
int64_t total_count = 0;
int64_t filter_count = 0;
if (OB_UNLIKELY(is_empty_)) {
if (OB_FAIL(ObBitVector::flip_foreach(skip, batch_size,
[&](int64_t idx) __attribute__((always_inline)) {
int ret = OB_SUCCESS;
eval_flags.set(idx);
results[idx].set_int(0);
++filter_ctx.filter_count_;
++filter_ctx.check_count_;
++filter_ctx.total_count_;
return ret;
++filter_count;
++total_count;
return OB_SUCCESS;
}))) {
LOG_WARN("fail to do for each operation", K(ret));
}
if (OB_SUCC(ret)) {
eval_flags.set_all(true);
filter_ctx.filter_count_ += filter_count;
filter_ctx.check_count_ += total_count;
filter_ctx.total_count_ += total_count;
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < expr.arg_cnt_; ++i) {
ObExpr *e = expr.args_[i];
@ -559,18 +588,21 @@ int ObRFBloomFilterMsg::might_contain_batch(
}))) {
} else if (OB_FAIL(ObBitVector::flip_foreach(skip, batch_size,
[&](int64_t idx) __attribute__((always_inline)) {
ret = bloom_filter_.might_contain(hash_values[idx], is_match);
if (OB_SUCC(ret)) {
filter_ctx.filter_count_ += !is_match;
eval_flags.set(idx);
int tmp_ret = bloom_filter_.might_contain(hash_values[idx], is_match);
if (OB_SUCCESS == tmp_ret) {
filter_count += !is_match;
++total_count;
results[idx].set_int(is_match);
ObExprJoinFilter::collect_sample_info(&filter_ctx, is_match);
++filter_ctx.check_count_;
++filter_ctx.total_count_;
}
return ret;
return tmp_ret;
}))) {
LOG_WARN("failed to process prefetch block", K(ret));
} else {
eval_flags.set_all(true);
filter_ctx.filter_count_ += filter_count;
filter_ctx.check_count_ += total_count;
filter_ctx.total_count_ += total_count;
ObExprJoinFilter::collect_sample_info_batch(filter_ctx, filter_count, total_count);
}
}
return ret;
@ -1163,6 +1195,61 @@ int ObRFRangeFilterMsg::might_contain(const ObExpr &expr,
return ret;
}
int ObRFRangeFilterMsg::do_might_contain_batch(const ObExpr &expr,
ObEvalCtx &ctx,
const ObBitVector &skip,
const int64_t batch_size,
ObExprJoinFilter::ObExprJoinFilterContext &filter_ctx) {
int ret = OB_SUCCESS;
int64_t filter_count = 0;
int64_t total_count = 0;
ObDatum *results = expr.locate_batch_datums(ctx);
for (int idx = 0; OB_SUCC(ret) && idx < expr.arg_cnt_; ++idx) {
if (OB_FAIL(expr.args_[idx]->eval_batch(ctx, skip, batch_size))) {
LOG_WARN("eval failed", K(ret));
}
}
if (OB_SUCC(ret)) {
int cmp_min = 0;
int cmp_max = 0;
ObDatum *datum = nullptr;
bool is_match = true;
for (int64_t batch_i = 0; OB_SUCC(ret) && batch_i < batch_size; ++batch_i) {
if (skip.at(batch_i)) {
continue;
}
cmp_min = 0;
cmp_max = 0;
is_match = true;
total_count++;
for (int arg_i = 0; OB_SUCC(ret) && arg_i < expr.arg_cnt_; ++arg_i) {
datum = &expr.args_[arg_i]->locate_expr_datum(ctx, batch_i);
if (OB_FAIL(filter_ctx.cmp_funcs_.at(arg_i).cmp_func_(*datum, lower_bounds_.at(arg_i), cmp_min))) {
LOG_WARN("fail to compare value", K(ret));
} else if (cmp_min < 0) {
filter_count++;
is_match = false;
break;
} else if (OB_FAIL(filter_ctx.cmp_funcs_.at(arg_i).cmp_func_(*datum, upper_bounds_.at(arg_i), cmp_max))) {
LOG_WARN("fail to compare value", K(ret));
} else if (cmp_max > 0) {
filter_count++;
is_match = false;
break;
}
}
results[batch_i].set_int(is_match ? 1 : 0);
}
}
if (OB_SUCC(ret)) {
filter_ctx.filter_count_ += filter_count;
filter_ctx.total_count_ += total_count;
filter_ctx.check_count_ += total_count;
ObExprJoinFilter::collect_sample_info_batch(filter_ctx, filter_count, total_count);
}
return ret;
}
int ObRFRangeFilterMsg::might_contain_batch(
const ObExpr &expr,
ObEvalCtx &ctx,
@ -1175,25 +1262,15 @@ int ObRFRangeFilterMsg::might_contain_batch(
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(ctx);
batch_info_guard.set_batch_size(batch_size);
for (int64_t i = 0; OB_SUCC(ret) && i < batch_size; ++i) {
if (skip.at(i)) {
continue;
} else if (OB_UNLIKELY(is_empty_)) {
if (OB_UNLIKELY(is_empty_)) {
for (int64_t i = 0; i < batch_size; i++) {
results[i].set_int(0);
eval_flags.set(i);
++filter_ctx.filter_count_;
++filter_ctx.check_count_;
++filter_ctx.total_count_;
} else {
batch_info_guard.set_batch_idx(i);
ObDatum &result = results[i];
if (OB_FAIL(might_contain(expr, ctx, filter_ctx, result))) {
LOG_WARN("fail to check expr value", K(ret));
} else {
++filter_ctx.total_count_;
eval_flags.set(i);
}
}
} else if (OB_FAIL(do_might_contain_batch(expr, ctx, skip, batch_size, filter_ctx))) {
LOG_WARN("failed to do_might_contain_batch");
}
if (OB_SUCC(ret)) {
eval_flags.set_all(batch_size);
}
return ret;
}
@ -1454,7 +1531,8 @@ int ObRFInFilterMsg::might_contain(const ObExpr &expr,
ObDatum *datum = nullptr;
bool is_match = true;
uint64_t hash_val = ObExprJoinFilter::JOIN_FILTER_SEED;
ObSEArray<ObDatum, 4> cur_row;
ObIArray<ObDatum> &cur_row = filter_ctx.cur_row_;
cur_row.reuse();
if (OB_UNLIKELY(!is_active_)) {
res.set_int(1);
} else if (OB_UNLIKELY(is_empty_)) {
@ -1512,6 +1590,77 @@ int ObRFInFilterMsg::reuse()
return ret;
}
void ObRFInFilterMsg::check_finish_receive()
{
if (ATOMIC_LOAD(&is_active_)) {
if (msg_receive_expect_cnt_ == ATOMIC_LOAD(&msg_receive_cur_cnt_)) {
is_ready_ = true;
}
}
}
int ObRFInFilterMsg::do_might_contain_batch(const ObExpr &expr,
ObEvalCtx &ctx,
const ObBitVector &skip,
const int64_t batch_size,
ObExprJoinFilter::ObExprJoinFilterContext &filter_ctx) {
int ret = OB_SUCCESS;
int64_t filter_count = 0;
int64_t total_count = 0;
uint64_t *right_hash_vals = reinterpret_cast<uint64_t *>(
ctx.frames_[expr.frame_idx_] + expr.res_buf_off_);
uint64_t seed = ObExprJoinFilter::JOIN_FILTER_SEED;
for (int idx = 0; OB_SUCC(ret) && idx < expr.arg_cnt_; ++idx) {
if (OB_FAIL(expr.args_[idx]->eval_batch(ctx, skip, batch_size))) {
LOG_WARN("eval failed", K(ret));
} else {
const bool is_batch_seed = (idx > 0);
ObBatchDatumHashFunc hash_func = filter_ctx.hash_funcs_.at(idx).batch_hash_func_;
hash_func(right_hash_vals,
expr.args_[idx]->locate_batch_datums(ctx), expr.args_[idx]->is_batch_result(),
skip, batch_size,
is_batch_seed ? right_hash_vals : &seed,
is_batch_seed);
}
}
ObIArray<ObDatum> &cur_row = filter_ctx.cur_row_;
ObRFInFilterNode node(&filter_ctx.cmp_funcs_, nullptr, &cur_row, 0);
ObDatum *res_datums = expr.locate_batch_datums(ctx);
for (int64_t batch_i = 0; OB_SUCC(ret) && batch_i < batch_size; ++batch_i) {
if (skip.at(batch_i)) {
continue;
}
cur_row.reuse();
total_count++;
node.hash_val_ = right_hash_vals[batch_i];
for (int64_t arg_i = 0; OB_SUCC(ret) && arg_i < expr.arg_cnt_; ++arg_i) {
if (OB_FAIL(cur_row.push_back(expr.args_[arg_i]->locate_expr_datum(ctx, batch_i)))) {
LOG_WARN("failed to push back datum", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(rows_set_.exist_refactored(node))) {
if (OB_HASH_NOT_EXIST == ret) {
res_datums[batch_i].set_int(0);
filter_count++;
ret = OB_SUCCESS;
} else if (OB_HASH_EXIST == ret) {
res_datums[batch_i].set_int(1);
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to check node", K(ret));
}
}
}
if (OB_SUCC(ret)) {
filter_ctx.filter_count_ += filter_count;
filter_ctx.total_count_ += total_count;
filter_ctx.check_count_ += total_count;
ObExprJoinFilter::collect_sample_info_batch(filter_ctx, filter_count, total_count);
}
return ret;
}
int ObRFInFilterMsg::might_contain_batch(
const ObExpr &expr,
ObEvalCtx &ctx,
@ -1524,28 +1673,19 @@ int ObRFInFilterMsg::might_contain_batch(
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(ctx);
batch_info_guard.set_batch_size(batch_size);
for (int64_t i = 0; OB_SUCC(ret) && i < batch_size; ++i) {
if (skip.at(i)) {
continue;
} else if (OB_UNLIKELY(!is_active_)) {
eval_flags.set(i);
if (!is_active_) {
for (int64_t i = 0; i < batch_size; i++) {
results[i].set_int(1);
} else if (OB_UNLIKELY(is_empty_)) {
eval_flags.set(i);
results[i].set_int(0);
++filter_ctx.filter_count_;
++filter_ctx.check_count_;
++filter_ctx.total_count_;
} else {
batch_info_guard.set_batch_idx(i);
ObDatum *result = &results[i];
if (OB_FAIL(might_contain(expr, ctx, filter_ctx, *result))) {
LOG_WARN("fail to check expr value", K(ret));
} else {
++filter_ctx.total_count_;
eval_flags.set(i);
}
}
} else if (OB_UNLIKELY(is_empty_)) {
for (int64_t i = 0; i < batch_size; i++) {
results[i].set_int(0);
}
} else if (OB_FAIL(do_might_contain_batch(expr, ctx, skip, batch_size, filter_ctx))) {
LOG_WARN("failed to do_might_contain_batch");
}
if (OB_SUCC(ret)) {
eval_flags.set_all(batch_size);
}
return ret;
}

View File

@ -79,6 +79,7 @@ public:
ObRFBloomFilterMsg &msg, bool &first_phase_end);
virtual int process_msg_internal(bool &need_free);
virtual int regenerate() override;
int atomic_merge(ObP2PDatahubMsgBase &other_msg);
private:
int calc_hash_value(
const common::ObIArray<ObExpr *> &expr_array,
@ -162,6 +163,13 @@ private:
int get_min(ObCmpFunc &func, ObDatum &l, ObDatum &r, int64_t &cell_size);
int get_max(ObCmpFunc &func, ObDatum &l, ObDatum &r, int64_t &cell_size);
int dynamic_copy_cell(const ObDatum &src, ObDatum &target, int64_t &cell_size);
// only used in might_contain_batch,
// without adding filter_count, total_count, check_count in filter_ctx
int do_might_contain_batch(const ObExpr &expr,
ObEvalCtx &ctx,
const ObBitVector &skip,
const int64_t batch_size,
ObExprJoinFilter::ObExprJoinFilterContext &filter_ctx);
public:
ObFixedArray<ObDatum, common::ObIAllocator> lower_bounds_;
@ -221,9 +229,17 @@ public:
ObEvalCtx &eval_ctx,
uint64_t *batch_hash_values) override;
virtual int reuse() override;
void check_finish_receive() override final;
private:
int append_row();
int insert_node();
// only used in might_contain_batch,
// without adding filter_count, total_count, check_count in filter_ctx
int do_might_contain_batch(const ObExpr &expr,
ObEvalCtx &ctx,
const ObBitVector &skip,
const int64_t batch_size,
ObExprJoinFilter::ObExprJoinFilterContext &filter_ctx);
public:
hash::ObHashSet<ObRFInFilterNode, hash::NoPthreadDefendMode> rows_set_;
ObCmpFuncs cmp_funcs_;