Fix join filter use execution id, use px sequnce id instead

This commit is contained in:
qianchanger
2023-02-24 16:17:53 +00:00
committed by ob-robot
parent 6a7db74811
commit 0d9457b9d4
12 changed files with 36 additions and 25 deletions

View File

@ -173,7 +173,7 @@ int ObDtlSendMessageP::process_px_bloom_filter_data(ObDtlLinkedBuffer *&buffer)
LOG_WARN("fail to decode bloom filter data", K(ret));
} else {
ObPXBloomFilterHashWrapper bf_key(bf_data.tenant_id_, bf_data.filter_id_,
bf_data.server_id_, bf_data.execution_id_, 0/*task_id*/);
bf_data.server_id_, bf_data.px_sequence_id_, 0/*task_id*/);
if (OB_FAIL(ObPxBloomFilterManager::instance().get_px_bf_for_merge_filter(
bf_key, filter))) {
LOG_WARN("fail to get px bloom filter", K(ret));

View File

@ -243,7 +243,7 @@ int ObJoinFilterOp::inner_open()
bf_key_.init(ctx_.get_my_session()->get_effective_tenant_id(),
MY_SPEC.filter_id_,
MY_SPEC.server_id_,
ctx_.get_my_session()->get_current_execution_id() ,
op_input->get_px_sequence_id(),
op_input->task_id_);
if (MY_SPEC.is_use_mode()) {
//在ctx中创建expr ctx, 并初始化bloom filter key
@ -554,7 +554,7 @@ int ObJoinFilterOp::mark_rpc_filter()
filter_data->tenant_id_ = bf_key_.tenant_id_;
filter_data->server_id_ = MY_SPEC.server_id_;
filter_data->filter_id_ = MY_SPEC.filter_id_;
filter_data->execution_id_ = bf_key_.execution_id_;
filter_data->px_sequence_id_ = bf_key_.px_sequence_id_;
filter_data->bloom_filter_count_ = 0;
bf_ctx.filter_data_ = filter_data;
bf_ctx.filter_ready_ = true;

View File

@ -51,6 +51,7 @@ public:
share_info_(),
is_local_create_(true),
task_id_(0),
px_sequence_id_(OB_INVALID_ID),
bf_idx_at_sqc_proxy_(-1)
{}
virtual ~ObJoinFilterOpInput() {}
@ -82,10 +83,13 @@ public:
inline void set_bf_idx_at_sqc_proxy(int64_t idx) { bf_idx_at_sqc_proxy_ = idx; }
inline int64_t get_bf_idx_at_sqc_proxy() { return bf_idx_at_sqc_proxy_; }
void set_px_sequence_id(int64_t id) { px_sequence_id_ = id; }
int64_t get_px_sequence_id() { return px_sequence_id_; }
public:
ObJoinFilterShareInfo share_info_; //bloom filter共享内存
bool is_local_create_; //用于标记create算子是否是local的.
int64_t task_id_; //在pwj join场景中会用到此task_id作为bf_key
int64_t px_sequence_id_;
int64_t bf_idx_at_sqc_proxy_;
DISALLOW_COPY_AND_ASSIGN(ObJoinFilterOpInput);
};

View File

@ -739,7 +739,7 @@ int ObPxReceiveOp::try_send_bloom_filter()
args.bf_key_.init(bf_send_ctx.get_filter_data()->tenant_id_,
bf_send_ctx.get_filter_data()->filter_id_,
bf_send_ctx.get_filter_data()->server_id_,
bf_send_ctx.get_filter_data()->execution_id_);
bf_send_ctx.get_filter_data()->px_sequence_id_);
args.expect_bloom_filter_count_ = bf_send_ctx.get_filter_data()->bloom_filter_count_;
args.current_bloom_filter_count_ = 1;
args.phase_ = ObSendBFPhase::FIRST_LEVEL;

View File

@ -34,6 +34,7 @@ ObGIOpInput::ObGIOpInput(ObExecContext &ctx, const ObOpSpec &spec)
parallelism_(-1),
worker_id_(common::OB_INVALID_INDEX),
pump_(nullptr),
px_sequence_id_(OB_INVALID_ID),
deserialize_allocator_(nullptr)
{}
@ -409,10 +410,11 @@ int ObGranuleIteratorOp::inner_open()
ret = OB_NOT_INIT;
LOG_WARN("child_op is null", K(ret));
} else if (MY_SPEC.bf_info_.is_inited_) {
ObGIOpInput *input = static_cast<ObGIOpInput*>(input_);
bf_key_.init(MY_SPEC.bf_info_.tenant_id_,
MY_SPEC.bf_info_.filter_id_,
MY_SPEC.bf_info_.server_id_,
ctx_.get_my_session()->get_current_execution_id(),
input->get_px_sequence_id(),
MY_SPEC.bf_info_.is_shared_? 0 : worker_id_);
} else if (OB_FAIL(prepare_table_scan())) {
LOG_WARN("prepare table scan failed", K(ret));

View File

@ -56,6 +56,8 @@ public:
void set_parallelism(int64_t parallelism) { parallelism_ = parallelism; }
void set_worker_id(int64_t worker_id) { worker_id_ = worker_id; }
int64_t get_worker_id() { return worker_id_; }
int64_t get_px_sequence_id() { return px_sequence_id_; }
void set_px_sequence_id(int64_t id) { px_sequence_id_ = id; }
int add_table_location_keys(common::ObIArray<const ObTableScanSpec*> &tscs);
private:
int deep_copy_range(ObIAllocator *allocator, const ObNewRange &src, ObNewRange &dst);
@ -69,6 +71,7 @@ public:
ObGranulePump *pump_;
//for partition pruning
common::ObSEArray<uint64_t, 2> table_location_keys_;
int64_t px_sequence_id_;
private:
common::ObIAllocator *deserialize_allocator_;
};

View File

@ -516,7 +516,7 @@ int ObPxBloomFilterManager::get_px_bf_for_merge_filter(ObPXBloomFilterHashWrappe
OB_SERIALIZE_MEMBER(ObPxBFStaticInfo, is_inited_, tenant_id_, filter_id_,
server_id_, is_shared_, skip_subpart_);
OB_SERIALIZE_MEMBER(ObPXBloomFilterHashWrapper, tenant_id_, filter_id_,
server_id_, execution_id_, task_id_)
server_id_, px_sequence_id_, task_id_)
OB_SERIALIZE_MEMBER(ObPxBFSendBloomFilterArgs, bf_key_, bloom_filter_,
next_peer_addrs_, expect_bloom_filter_count_,
current_bloom_filter_count_, expect_phase_count_,

View File

@ -149,19 +149,19 @@ class ObPXBloomFilterHashWrapper
public:
ObPXBloomFilterHashWrapper() : tenant_id_(common::OB_INVALID_TENANT_ID),
filter_id_(common::OB_INVALID_ID), server_id_(common::OB_INVALID_ID),
execution_id_(common::OB_INVALID_ID), task_id_(common::OB_INVALID_ID) {}
px_sequence_id_(common::OB_INVALID_ID), task_id_(common::OB_INVALID_ID) {}
explicit ObPXBloomFilterHashWrapper(int64_t tenant_id, int64_t filter_id,
int64_t server_id, int64_t execution_id, int64_t task_id) :
int64_t server_id, int64_t px_sequence_id, int64_t task_id) :
tenant_id_(tenant_id), filter_id_(filter_id),
server_id_(server_id), execution_id_(execution_id), task_id_(task_id) {}
server_id_(server_id), px_sequence_id_(px_sequence_id), task_id_(task_id) {}
~ObPXBloomFilterHashWrapper(){}
void init(int64_t tenant_id, int64_t filter_id,
int64_t server_id, int64_t execution_id, int64_t task_id = 0)
int64_t server_id, int64_t px_sequence_id, int64_t task_id = 0)
{
tenant_id_ = tenant_id;
filter_id_ = filter_id;
server_id_ = server_id;
execution_id_ = execution_id;
px_sequence_id_ = px_sequence_id;
task_id_ = task_id;
}
inline bool operator==(const ObPXBloomFilterHashWrapper &other) const
@ -169,16 +169,16 @@ public:
return (tenant_id_ == other.tenant_id_ &&
filter_id_ == other.filter_id_ &&
server_id_ == other.server_id_ &&
execution_id_ == other.execution_id_ &&
px_sequence_id_ == other.px_sequence_id_ &&
task_id_ == other.task_id_);
}
inline uint64_t hash() const;
int64_t tenant_id_;
int64_t filter_id_;
int64_t server_id_;
int64_t execution_id_;
int64_t px_sequence_id_;
int64_t task_id_;
TO_STRING_KV(K_(tenant_id), K_(filter_id), K_(server_id), K_(execution_id), K_(task_id))
TO_STRING_KV(K_(tenant_id), K_(filter_id), K_(server_id), K_(px_sequence_id), K_(task_id))
};
@ -189,7 +189,7 @@ inline uint64_t ObPXBloomFilterHashWrapper::hash() const
hash_ret = common::murmurhash(&tenant_id_, sizeof(uint64_t), 0);
hash_ret = common::murmurhash(&filter_id_, sizeof(uint64_t), hash_ret);
hash_ret = common::murmurhash(&server_id_, sizeof(uint64_t), hash_ret);
hash_ret = common::murmurhash(&execution_id_, sizeof(uint64_t), hash_ret);
hash_ret = common::murmurhash(&px_sequence_id_, sizeof(uint64_t), hash_ret);
hash_ret = common::murmurhash(&task_id_, sizeof(uint64_t), hash_ret);
return hash_ret;
}

View File

@ -35,7 +35,7 @@ OB_SERIALIZE_MEMBER((ObPxBloomFilterChInfo, dtl::ObDtlChTotalInfo), filter_id_);
OB_SERIALIZE_MEMBER((ObPxBloomFilterChSet, dtl::ObDtlChSet), filter_id_, sqc_id_);
OB_SERIALIZE_MEMBER(ObPxCreateBloomFilterChannelMsg, sqc_count_, sqc_id_, ch_set_info_);
OB_SERIALIZE_MEMBER(ObPxBloomFilterData, filter_, tenant_id_, filter_id_,
server_id_, execution_id_, bloom_filter_count_);
server_id_, px_sequence_id_, bloom_filter_count_);
OB_SERIALIZE_MEMBER(ObPxDmlRowInfo, row_match_count_, row_duplicated_count_, row_deleted_count_);
OB_SERIALIZE_MEMBER(ObPxTabletRange, tablet_id_, range_cut_, range_weights_);

View File

@ -467,7 +467,7 @@ class ObPxBloomFilterData: public dtl::ObDtlMsgTemp<dtl::ObDtlMsgType::PX_BLOOM_
public:
ObPxBloomFilterData() : filter_(), tenant_id_(common::OB_INVALID_TENANT_ID),
filter_id_(common::OB_INVALID_ID), server_id_(common::OB_INVALID_ID),
execution_id_(common::OB_INVALID_ID), bloom_filter_count_(0) {}
px_sequence_id_(common::OB_INVALID_ID), bloom_filter_count_(0) {}
virtual ~ObPxBloomFilterData() = default;
void reset()
{
@ -475,16 +475,16 @@ public:
tenant_id_ = OB_INVALID_TENANT_ID;
filter_id_ = OB_INVALID_ID;
server_id_ = OB_INVALID_ID;
execution_id_ = OB_INVALID_ID;
px_sequence_id_ = OB_INVALID_ID;
bloom_filter_count_ = 0;
}
TO_STRING_KV(K_(filter), K_(server_id), K_(execution_id));
TO_STRING_KV(K_(filter), K_(server_id), K_(px_sequence_id));
public:
ObPxBloomFilter filter_;
int64_t tenant_id_;
int64_t filter_id_;
int64_t server_id_;
int64_t execution_id_;
int64_t px_sequence_id_;
int64_t bloom_filter_count_;
};

View File

@ -268,7 +268,7 @@ int ObPxSubCoord::pre_setup_op_input(ObExecContext &ctx,
ObPxBloomFilter *filter_use = NULL;
ObPXBloomFilterHashWrapper bf_key;
bf_key.init(ctx.get_my_session()->get_effective_tenant_id(), filter_op.get_filter_id(),
filter_op.get_server_id(), ctx.get_my_session()->get_current_execution_id());
filter_op.get_server_id(), sqc_arg_.sqc_.get_px_sequence_id());
if (OB_FAIL(ObPxBloomFilterManager::init_px_bloom_filter(filter_op.get_filter_length(),
ctx.get_allocator(),
filter_use))) {

View File

@ -550,6 +550,7 @@ int ObPxTaskProcess::OpPreparation::apply(ObExecContext &ctx,
LOG_WARN("the partition-wise join's subplan contain a gi operator", K(*gi), K(ret));
} else {
input->set_worker_id(task_id_);
input->set_px_sequence_id(task_->px_int_id_.px_interrupt_id_.first_);
if (ObGranuleUtil::pwj_gi(gi->gi_attri_flag_)) {
pw_gi_spec_ = gi;
on_set_tscs_ = true;
@ -585,15 +586,16 @@ int ObPxTaskProcess::OpPreparation::apply(ObExecContext &ctx,
}
} else if (IS_PX_BLOOM_FILTER(op.get_type())) {
ObJoinFilterSpec *filter_spec = reinterpret_cast<ObJoinFilterSpec *>(&op);
if (filter_spec->is_shared_join_filter()) {
/*do nothing*/
} else if (OB_ISNULL(kit->input_)) {
if (OB_ISNULL(kit->input_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("operator is NULL", K(ret), K(op.id_), KP(kit));
} else {
ObJoinFilterOpInput *input = static_cast<ObJoinFilterOpInput *>(kit->input_);
input->set_px_sequence_id(task_->px_int_id_.px_interrupt_id_.first_);
if (!filter_spec->is_shared_join_filter()) {
input->set_task_id(task_id_);
}
}
} else if (PHY_TEMP_TABLE_INSERT == op.type_) {
ObOperatorKit *kit = ctx.get_operator_kit(op.id_);
if (OB_ISNULL(kit) || OB_ISNULL(kit->op_) || OB_ISNULL(kit->input_)) {