fix bug: thread count in compile and execute time not consistent

This commit is contained in:
obdev
2023-09-21 02:47:53 +00:00
committed by ob-robot
parent 550c8ffaaf
commit 062982af03
9 changed files with 206 additions and 28 deletions

View File

@ -518,6 +518,8 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
gi_spec->bf_info_.p2p_dh_id_,
node))) {
LOG_WARN("fail to set p2p dh id to map", K(ret));
} else if (OB_FAIL(px_coord_info.rf_dpd_info_.rf_use_ops_.push_back(phy_op))) {
LOG_WARN("failed to push back parition filter gi op");
} else {
parent_dfo->set_need_p2p_info(true);
}

View File

@ -482,7 +482,7 @@ OB_DEF_SERIALIZE_SIZE(ObPxBloomFilter)
int ObPxBFStaticInfo::init(int64_t tenant_id, int64_t filter_id,
int64_t server_id, bool is_shared,
bool skip_subpart, int64_t p2p_dh_id,
bool is_shuffle)
bool is_shuffle, ObLogJoinFilter *log_join_filter_create_op)
{
int ret = OB_SUCCESS;
if (is_inited_){
@ -496,6 +496,7 @@ int ObPxBFStaticInfo::init(int64_t tenant_id, int64_t filter_id,
skip_subpart_ = skip_subpart;
p2p_dh_id_ = p2p_dh_id;
is_shuffle_ = is_shuffle;
log_join_filter_create_op_ = log_join_filter_create_op;
is_inited_ = true;
}
return ret;

View File

@ -130,6 +130,8 @@ public:
DISALLOW_COPY_AND_ASSIGN(ObPxBloomFilter);
};
class ObLogJoinFilter;
struct ObPxBFStaticInfo
{
OB_UNIS_VERSION(1);
@ -138,12 +140,12 @@ public:
: is_inited_(false), tenant_id_(common::OB_INVALID_TENANT_ID),
filter_id_(common::OB_INVALID_ID), server_id_(common::OB_INVALID_ID),
is_shared_(false), skip_subpart_(false),
p2p_dh_id_(OB_INVALID_ID), is_shuffle_(false)
p2p_dh_id_(OB_INVALID_ID), is_shuffle_(false), log_join_filter_create_op_(nullptr)
{}
int init(int64_t tenant_id, int64_t filter_id,
int64_t server_id, bool is_shared,
bool skip_subpart, int64_t p2p_dh_id,
bool is_shuffle);
bool is_shuffle, ObLogJoinFilter *log_join_filter_create_op);
bool is_inited_;
int64_t tenant_id_;
int64_t filter_id_;
@ -155,6 +157,7 @@ public:
TO_STRING_KV(K(is_inited_), K(tenant_id_), K(filter_id_),
K(server_id_), K(is_shared_), K(skip_subpart_),
K(is_shuffle_), K(p2p_dh_id_));
ObLogJoinFilter *log_join_filter_create_op_; // not need to serialize, only used in optimizor
};
class ObPXBloomFilterHashWrapper

View File

@ -727,11 +727,20 @@ int RuntimeFilterDependencyInfo::describe_dependency(ObDfo *root_dfo)
for (int64_t i = 0; i < rf_create_ops_.count() && OB_SUCC(ret); ++i) {
const ObJoinFilterSpec *create_op = static_cast<const ObJoinFilterSpec *>(rf_create_ops_.at(i));
for (int64_t j = 0; j < rf_use_ops_.count() && OB_SUCC(ret); ++j) {
int64_t use_filter_id = common::OB_INVALID_ID;
if (IS_PX_GI(rf_use_ops_.at(j)->get_type())) {
const ObGranuleIteratorSpec *use_op =
static_cast<const ObGranuleIteratorSpec *>(rf_use_ops_.at(j));
use_filter_id = use_op->bf_info_.filter_id_;
} else {
const ObJoinFilterSpec *use_op = static_cast<const ObJoinFilterSpec *>(rf_use_ops_.at(j));
if (create_op->get_filter_id() == use_op->get_filter_id()) {
use_filter_id = use_op->get_filter_id();
}
if (create_op->get_filter_id() == use_filter_id) {
const ObOpSpec *ancestor_op = nullptr;
ObDfo *op_dfo = nullptr;;
if (OB_FAIL(LowestCommonAncestorFinder::find_op_common_ancestor(create_op, use_op, ancestor_op))) {
if (OB_FAIL(LowestCommonAncestorFinder::find_op_common_ancestor(
create_op, rf_use_ops_.at(j), ancestor_op))) {
LOG_WARN("failed to find op common ancestor");
} else if (OB_ISNULL(ancestor_op)) {
ret = OB_ERR_UNEXPECTED;

View File

@ -30,8 +30,9 @@ public:
ObLogJoinFilter(ObLogPlan &plan) :
ObLogicalOperator(plan), is_create_(false),
filter_id_(common::OB_INVALID_ID),
filter_len_(0), join_exprs_(),
is_use_filter_shuffle_(false),
filter_len_(0), paired_join_filter_(nullptr),
join_exprs_(), is_use_filter_shuffle_(false),
join_filter_cmp_funcs_(),
join_filter_exprs_(),
join_filter_types_(),
p2p_sequence_ids_(),
@ -60,6 +61,8 @@ public:
}
}
inline int64_t get_filter_length() const { return filter_len_; }
inline void set_paired_join_filter(ObLogicalOperator *paired_join_filter) { paired_join_filter_ = paired_join_filter; }
inline ObLogicalOperator *get_paired_join_filter() const { return paired_join_filter_; }
inline void set_is_use_filter_shuffle(bool flag) { is_use_filter_shuffle_ = flag; }
inline bool is_use_filter_shuffle() { return is_use_filter_shuffle_; }
inline bool is_partition_filter() const
@ -109,6 +112,9 @@ private:
bool is_create_; //判断是否是create算子
int64_t filter_id_; //设置filter_id
int64_t filter_len_; //设置filter长度
// if this is a join filter create op, the paired_join_filter_ is join filter use op, vice versa
// if this is a partition join filter create op, the paired_join_filter_ is partition filter gi
ObLogicalOperator *paired_join_filter_;
//equal join condition expr
common::ObSEArray<ObRawExpr *, 8, common::ModulePageAllocator, true> join_exprs_;
bool is_use_filter_shuffle_; // 标记use端filter是否有shuffle

View File

@ -4034,6 +4034,8 @@ int ObLogicalOperator::allocate_granule_nodes_above(AllocGIContext &ctx)
} else {
gi_op->set_tablet_id_expr(tablet_id_expr);
gi_op->set_join_filter_info(table_scan->get_join_filter_info());
ObLogJoinFilter *jf_create_op = gi_op->get_join_filter_info().log_join_filter_create_op_;
jf_create_op->set_paired_join_filter(gi_op);
gi_op->add_flag(GI_USE_PARTITION_FILTER);
}
} else if (LOG_GROUP_BY == get_type()) {
@ -4768,7 +4770,7 @@ int ObLogicalOperator::allocate_partition_join_filter(const ObIArray<JoinFilterI
join_filter_create->is_shared_join_filter(),
info.skip_subpart_,
join_filter_create->get_p2p_sequence_ids().at(0),
right_has_exchange));
right_has_exchange, join_filter_create));
scan_op->set_join_filter_info(bf_info);
scan_op->set_part_join_filter_created(true);
filter_id++;
@ -4821,6 +4823,8 @@ int ObLogicalOperator::allocate_normal_join_filter(const ObIArray<JoinFilterInfo
} else {
join_filter_create = static_cast<ObLogJoinFilter *>(filter_create);
join_filter_use = static_cast<ObLogJoinFilter *>(filter_use);
join_filter_create->set_paired_join_filter(join_filter_use);
join_filter_use->set_paired_join_filter(join_filter_create);
join_filter_create->set_is_create_filter(true);
join_filter_use->set_is_create_filter(false);
join_filter_create->set_filter_id(filter_id);

View File

@ -829,6 +829,11 @@ public:
return parent_;
}
inline const ObLogicalOperator *get_parent() const
{
return parent_;
}
int get_parent(ObLogicalOperator *root, ObLogicalOperator *&parent);
/**
* 目前优化器使用两阶段来生成计划:
@ -1022,7 +1027,7 @@ public:
id_ = id;
}
inline uint64_t get_op_id() { return op_id_; }
inline uint64_t get_op_id() const { return op_id_; }
inline void set_op_id(uint64_t op_id) { op_id_ = op_id; }
inline bool is_partition_wise() const { return is_partition_wise_; }
inline void set_is_partition_wise(bool is_partition_wise)

View File

@ -72,6 +72,42 @@ int SchedOrderGenerator::generate(
// ===================================================================================
int LogRuntimeFilterDependencyInfo::describe_dependency(DfoInfo *root_dfo)
{
int ret = OB_SUCCESS;
// for each rf create op, find its pair rf use op,
// then get the lowest common ancestor of them, mark force_bushy of the dfo which the ancestor belongs to.
for (int64_t i = 0; i < rf_create_ops_.count() && OB_SUCC(ret); ++i) {
const ObLogJoinFilter *create_op = static_cast<const ObLogJoinFilter *>(rf_create_ops_.at(i));
const ObLogicalOperator *use_op = create_op->get_paired_join_filter();
if (OB_ISNULL(use_op)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("use_op is null");
} else {
const ObLogicalOperator *ancestor_op = nullptr;
DfoInfo *op_dfo = nullptr;;
if (OB_FAIL(LogLowestCommonAncestorFinder::find_op_common_ancestor(create_op, use_op, ancestor_op))) {
LOG_WARN("failed to find op common ancestor");
} else if (OB_ISNULL(ancestor_op)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("op common ancestor not found");
} else if (OB_FAIL(LogLowestCommonAncestorFinder::get_op_dfo(ancestor_op, root_dfo, op_dfo))) {
LOG_WARN("failed to find op common ancestor");
} else if (OB_ISNULL(op_dfo)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the dfo of ancestor_op not found");
} else {
// Once the DFO which the ancestor belongs to has set the flag "force_bushy",
// the DfoTreeNormalizer will not attempt to transform a right-deep DFO tree
// into a left-deep DFO tree. Consequently, the "join filter create" operator
// can be scheduled earlier than the "join filter use" operator.
op_dfo->set_force_bushy(true);
}
}
}
return ret;
}
int DfoInfo::add_child(DfoInfo *child)
{
@ -255,20 +291,17 @@ int ObPxResourceAnalyzer::do_split(
LOG_WARN("fail create qc for rescan op", K(ret));
}
} else {
if (OB_SUCC(ret)) {
if (log_op_def::LOG_JOIN_FILTER == root_op.get_type()) {
if (OB_NOT_NULL(parent_dfo)) {
if (OB_FAIL(ret)) {
} else if (log_op_def::LOG_JOIN_FILTER == root_op.get_type()) {
ObLogJoinFilter &log_join_filter = static_cast<ObLogJoinFilter &>(root_op);
if (log_join_filter.is_create_filter() && !parent_dfo->force_bushy()) {
parent_dfo->force_bushy_ = true;
if (log_join_filter.is_create_filter()
&& OB_FAIL(px_info.rf_dpd_info_.rf_create_ops_.push_back(&root_op))) {
LOG_WARN("failed to push_back log join filter create", K(ret));
}
}
}
}
if (log_op_def::LOG_EXCHANGE == root_op.get_type() &&
static_cast<const ObLogExchange&>(root_op).is_px_producer()) {
} else if (log_op_def::LOG_EXCHANGE == root_op.get_type()
&& static_cast<const ObLogExchange &>(root_op).is_px_producer()) {
DfoInfo *dfo = nullptr;
if (OB_FAIL(create_dfo(dfo, static_cast<const ObLogExchange&>(root_op).get_parallel()))) {
if (OB_FAIL(create_dfo(dfo, root_op))) {
LOG_WARN("fail create dfo", K(ret));
} else {
if (OB_FAIL(dfo->location_addr_.create(hash::cal_next_prime(10), "PxResourceBucket", "PxResourceNode"))) {
@ -324,9 +357,10 @@ int ObPxResourceAnalyzer::do_split(
return ret;
}
int ObPxResourceAnalyzer::create_dfo(DfoInfo *&dfo, int64_t dop)
int ObPxResourceAnalyzer::create_dfo(DfoInfo *&dfo, ObLogicalOperator &root_op)
{
int ret = OB_SUCCESS;
int64_t dop = static_cast<const ObLogExchange&>(root_op).get_parallel();
void *mem_ptr = dfo_allocator_.alloc(sizeof(DfoInfo));
if (OB_ISNULL(mem_ptr)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -335,6 +369,7 @@ int ObPxResourceAnalyzer::create_dfo(DfoInfo *&dfo, int64_t dop)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Null ptr unexpected", KP(mem_ptr), K(ret));
} else {
dfo->set_root_op(&root_op);
dfo->set_dop(dop);
}
return ret;
@ -428,7 +463,9 @@ int ObPxResourceAnalyzer::walk_through_px_trees(
int64_t group_count = 0;
thread_map.clear();
group_map.clear();
if (OB_FAIL(walk_through_dfo_tree(px_info, thread_count, group_count, thread_map, group_map))) {
if (OB_FAIL(px_info.rf_dpd_info_.describe_dependency(px_info.root_dfo_))) {
LOG_WARN("failed to describe dependency");
} else if (OB_FAIL(walk_through_dfo_tree(px_info, thread_count, group_count, thread_map, group_map))) {
LOG_WARN("fail calc px thread group count", K(i), "total", px_trees.count(), K(ret));
} else if (OB_ISNULL(px_info.root_op_)) {
ret = OB_ERR_UNEXPECTED;
@ -715,3 +752,79 @@ int ObPxResourceAnalyzer::update_max_thead_group_info(
}
return ret;
}
int LogLowestCommonAncestorFinder::find_op_common_ancestor(
const ObLogicalOperator *left, const ObLogicalOperator *right, const ObLogicalOperator *&ancestor)
{
int ret = OB_SUCCESS;
ObSEArray<const ObLogicalOperator *, 32> ancestors;
const ObLogicalOperator *parent = left;
while (OB_NOT_NULL(parent) && OB_SUCC(ret)) {
if (OB_FAIL(ancestors.push_back(parent))) {
LOG_WARN("failed to push back");
} else {
parent = parent->get_parent();
}
}
parent = right;
bool find = false;
while (OB_NOT_NULL(parent) && OB_SUCC(ret) && !find) {
for (int64_t i = 0; i < ancestors.count() && OB_SUCC(ret); ++i) {
if (parent == ancestors.at(i)) {
find = true;
ancestor = parent;
break;
}
}
parent = parent->get_parent();
}
return ret;
}
int LogLowestCommonAncestorFinder::get_op_dfo(const ObLogicalOperator *op, DfoInfo *root_dfo, DfoInfo *&op_dfo)
{
int ret = OB_SUCCESS;
const ObLogicalOperator *parent = op;
const ObLogicalOperator *dfo_root_op = nullptr;
while (OB_NOT_NULL(parent) && OB_SUCC(ret)) {
if (log_op_def::LOG_EXCHANGE == parent->get_type() &&
static_cast<const ObLogExchange&>(*parent).is_px_producer()) {
dfo_root_op = parent;
break;
} else {
parent = parent->get_parent();
}
}
DfoInfo *dfo = nullptr;
bool find = false;
ObSEArray<DfoInfo *, 16> dfo_queue;
int64_t cur_que_front = 0;
if (OB_FAIL(dfo_queue.push_back(root_dfo))) {
LOG_WARN("failed to push back");
}
while (cur_que_front < dfo_queue.count() && !find && OB_SUCC(ret)) {
int64_t cur_que_size = dfo_queue.count() - cur_que_front;
for (int64_t i = 0; i < cur_que_size && OB_SUCC(ret); ++i) {
dfo = dfo_queue.at(cur_que_front);
if (dfo->get_root_op() == dfo_root_op) {
op_dfo = dfo;
find = true;
break;
} else {
// push child into the queue
for (int64_t child_idx = 0; OB_SUCC(ret) && child_idx < dfo->get_child_count(); ++child_idx) {
if (OB_FAIL(dfo_queue.push_back(dfo->child_dfos_.at(child_idx)))) {
LOG_WARN("failed to push back child dfo");
}
}
}
if (OB_SUCC(ret)) {
cur_que_front++;
}
}
}
return ret;
}

View File

@ -43,7 +43,8 @@ struct DfoInfo {
status_(DfoStatus::INIT),
dop_(0),
location_addr_(),
force_bushy_(false)
force_bushy_(false),
root_op_(nullptr)
{}
DfoInfo *parent_;
DfoInfo *depend_sibling_;
@ -52,6 +53,7 @@ struct DfoInfo {
int64_t dop_;
ObHashSet<ObAddr> location_addr_;
bool force_bushy_;
ObLogicalOperator *root_op_;
void reset()
{
@ -61,7 +63,11 @@ struct DfoInfo {
child_dfos_.reset();
location_addr_.destroy();
}
bool force_bushy() { return force_bushy_; }
inline void set_root_op(ObLogicalOperator *root_op) { root_op_ = root_op;}
inline ObLogicalOperator *get_root_op() { return root_op_;}
inline void set_force_bushy(bool flag) { force_bushy_ = flag; }
inline bool force_bushy() { return force_bushy_; }
bool has_sibling() const { return nullptr != depend_sibling_; }
void set_depend_sibling(DfoInfo *sibling) { depend_sibling_ = sibling; }
inline bool has_child() const { return child_dfos_.count() > 0; }
@ -96,12 +102,30 @@ struct DfoInfo {
TO_STRING_KV(K_(status), K_(dop));
};
struct LogRuntimeFilterDependencyInfo
{
public:
LogRuntimeFilterDependencyInfo() : rf_create_ops_() {}
~LogRuntimeFilterDependencyInfo() = default;
void destroy()
{
rf_create_ops_.reset();
}
inline bool is_empty() const {
return rf_create_ops_.empty();
}
int describe_dependency(DfoInfo *root_dfo);
public:
ObTMArray<const ObLogicalOperator *> rf_create_ops_;
};
class ObLogExchange;
struct PxInfo {
PxInfo() : root_op_(nullptr), root_dfo_(nullptr), threads_(0),
acc_threads_(0) {}
acc_threads_(0), rf_dpd_info_() {}
PxInfo(ObLogExchange *root_op, DfoInfo *root_dfo)
: root_op_(root_op), root_dfo_(root_dfo), threads_(0), acc_threads_(0) {}
: root_op_(root_op), root_dfo_(root_dfo), threads_(0), acc_threads_(0), rf_dpd_info_()
{}
void reset_dfo()
{
if (OB_NOT_NULL(root_dfo_)) {
@ -113,6 +137,7 @@ struct PxInfo {
DfoInfo *root_dfo_;
int64_t threads_; // 记录当前 PX 需要的线程组数
int64_t acc_threads_; // 记录当前 PX 计划以及它下面的嵌套 PX 计划线程组数之和
LogRuntimeFilterDependencyInfo rf_dpd_info_;
TO_STRING_KV(K_(threads), K_(acc_threads));
};
@ -159,6 +184,7 @@ private:
ObHashMap<ObAddr, int64_t> &max_parallel_thread_map,
ObHashMap<ObAddr, int64_t> &max_parallel_group_map);
int create_dfo(DfoInfo *&dfo, int64_t dop);
int create_dfo(DfoInfo *&dfo, ObLogicalOperator &root_op);
int get_dfo_addr_set(const ObLogicalOperator &root_op, ObHashSet<ObAddr> &addr_set);
int px_tree_append(ObHashMap<ObAddr, int64_t> &max_parallel_count,
ObHashMap<ObAddr, int64_t> &parallel_count);
@ -280,6 +306,15 @@ int DfoTreeNormalizer<T>::normalize(T &root)
return ret;
}
class LogLowestCommonAncestorFinder
{
public:
// for optimizer
static int find_op_common_ancestor(
const ObLogicalOperator *left, const ObLogicalOperator *right, const ObLogicalOperator *&ancestor);
static int get_op_dfo(const ObLogicalOperator *op, DfoInfo *root_dfo, DfoInfo *&op_dfo);
};
}/* ns sql */
}/* ns oceanbase */