diff --git a/src/sql/engine/join/ob_join_filter_op.cpp b/src/sql/engine/join/ob_join_filter_op.cpp index 8e79bce62e..8a60b3addf 100644 --- a/src/sql/engine/join/ob_join_filter_op.cpp +++ b/src/sql/engine/join/ob_join_filter_op.cpp @@ -178,7 +178,7 @@ int ObJoinFilterOpInput::init_shared_msgs( px_sequence_id_, 0/*task_id*/, tenant_id, timeout_ts, register_dm_info_))) { LOG_WARN("fail to init msg", K(ret)); } else if (OB_FAIL(construct_msg_details(spec, sqc_proxy, config_, *msg_ptr, sqc_count))) { - LOG_WARN("fail to construct msg details", K(ret)); + LOG_WARN("fail to construct msg details", K(ret), K(tenant_id)); } else if (OB_FAIL(array_ptr->push_back(msg_ptr))) { LOG_WARN("fail to push back array ptr", K(ret)); } @@ -211,6 +211,7 @@ int ObJoinFilterOpInput::construct_msg_details( ObPxSQCProxy::SQCP2PDhMap &dh_map = sqc_proxy->get_p2p_dh_map(); if (OB_FAIL(bf_msg.bloom_filter_.init(spec.filter_len_, bf_msg.get_allocator(), + bf_msg.get_tenant_id(), config.bloom_filter_ratio_))) { LOG_WARN("failed to init bloom filter", K(ret)); } else if (!spec.is_shared_join_filter() || !spec.is_shuffle_) { diff --git a/src/sql/engine/px/ob_px_bloom_filter.cpp b/src/sql/engine/px/ob_px_bloom_filter.cpp index 044c8d9ab8..869a54b561 100644 --- a/src/sql/engine/px/ob_px_bloom_filter.cpp +++ b/src/sql/engine/px/ob_px_bloom_filter.cpp @@ -35,6 +35,21 @@ using namespace obrpc; #define LOG_HASH_COUNT 2 // = log2(FIXED_HASH_COUNT) #define WORD_SIZE 64 // WORD_SIZE * FIXED_HASH_COUNT = BF_BLOCK_SIZE +// before assign, please set allocator for channel_ids_ first +int BloomFilterIndex::assign(const BloomFilterIndex &other) +{ + int ret = OB_SUCCESS; + if (this != &other) { + channel_id_ = other.channel_id_; + begin_idx_ = other.begin_idx_; + end_idx_ = other.end_idx_; + if (OB_FAIL(channel_ids_.assign(other.channel_ids_))) { + LOG_WARN("failed to assign channel_ids_"); + } + } + return ret; +} + ObPxBloomFilter::ObPxBloomFilter() : data_length_(0), bits_count_(0), fpp_(0.0), hash_func_count_(0), is_inited_(false), bits_array_length_(0), bits_array_(NULL), true_count_(0), begin_idx_(0), end_idx_(0), allocator_(), @@ -43,9 +58,10 @@ ObPxBloomFilter::ObPxBloomFilter() : data_length_(0), bits_count_(0), fpp_(0.0), } -int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, double fpp /*= 0.01 */) +int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, int64_t tenant_id, double fpp /*= 0.01 */) { int ret = OB_SUCCESS; + set_allocator_attr(tenant_id); data_length = max(data_length, 1); if (fpp <= 0) { ret = OB_ERR_UNEXPECTED; @@ -78,9 +94,10 @@ int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, double f return ret; } -int ObPxBloomFilter::assign(const ObPxBloomFilter &filter) +int ObPxBloomFilter::assign(const ObPxBloomFilter &filter, int64_t tenant_id) { int ret = OB_SUCCESS; + set_allocator_attr(tenant_id); data_length_ = filter.data_length_; bits_count_ = filter.bits_count_; fpp_ = filter.fpp_; @@ -104,6 +121,12 @@ int ObPxBloomFilter::assign(const ObPxBloomFilter &filter) return ret; } +void ObPxBloomFilter::set_allocator_attr(int64_t tenant_id) +{ + ObMemAttr attr(tenant_id, "PxBfAlloc", ObCtxIds::DEFAULT_CTX_ID); + allocator_.set_attr(attr); +} + int ObPxBloomFilter::init(const ObPxBloomFilter *filter) { int ret = OB_SUCCESS; diff --git a/src/sql/engine/px/ob_px_bloom_filter.h b/src/sql/engine/px/ob_px_bloom_filter.h index 1d488954c4..6e8f3d71c1 100644 --- a/src/sql/engine/px/ob_px_bloom_filter.h +++ b/src/sql/engine/px/ob_px_bloom_filter.h @@ -41,6 +41,7 @@ struct BloomFilterReceiveCount struct BloomFilterIndex { + int assign(const BloomFilterIndex &other); BloomFilterIndex() : channel_id_(0), begin_idx_(0), end_idx_(0) {} BloomFilterIndex(int64_t channel_id, int64_t beigin_idx, int64_t end_idx) : channel_id_(channel_id), begin_idx_(beigin_idx), @@ -48,7 +49,7 @@ struct BloomFilterIndex int64_t channel_id_;// join filter send channel id int64_t begin_idx_; // join filter begin position in full bloom filter int64_t end_idx_; // join filter end position in full bloom filter - ObArray channel_ids_; + ObFixedArray channel_ids_; TO_STRING_KV(K_(begin_idx), K_(end_idx), K_(channel_id), K_(channel_ids)); }; @@ -58,7 +59,7 @@ OB_UNIS_VERSION_V(1); public: ObPxBloomFilter(); virtual ~ObPxBloomFilter() {}; - int init(int64_t data_length, common::ObIAllocator &allocator, double fpp = 0.01); + int init(int64_t data_length, common::ObIAllocator &allocator, int64_t tenant_id, double fpp = 0.01); int init(const ObPxBloomFilter *filter); void reset_filter(); inline int might_contain(uint64_t hash, bool &is_match) { @@ -90,8 +91,9 @@ public: typedef int (ObPxBloomFilter::*GetFunc)(uint64_t hash, bool &is_match); int generate_receive_count_array(int64_t piece_size); void reset(); - int assign(const ObPxBloomFilter &filter); + int assign(const ObPxBloomFilter &filter, int64_t tenant_id); int regenerate(); + void set_allocator_attr(int64_t tenant_id); TO_STRING_KV(K_(data_length), K_(bits_count), K_(fpp), K_(hash_func_count), K_(is_inited), K_(bits_array_length), K_(true_count)); private: diff --git a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp index a25595c57d..6c769ae0db 100644 --- a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp +++ b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp @@ -254,6 +254,9 @@ int ObRFBloomFilterMsg::generate_receive_count_array(int64_t piece_size, int64_t int64_t bits_array_length = ceil((double)bloom_filter_.get_bits_count() / 64); int64_t count = ceil(bits_array_length / (double)piece_size); int64_t begin_idx = 0; + if (OB_FAIL(receive_count_array_.init(count))) { + LOG_WARN("fail to init receive_count_array_", K(ret)); + } for (int i = 0; OB_SUCC(ret) && i < count; ++i) { begin_idx = i * piece_size; if (begin_idx >= bits_array_length) { @@ -392,9 +395,23 @@ int ObRFBloomFilterMsg::assign(const ObP2PDatahubMsgBase &msg) LOG_WARN("failed to assign base data", K(ret)); } else if (OB_FAIL(next_peer_addrs_.assign(other_msg.next_peer_addrs_))) { LOG_WARN("fail to assign bf msg", K(ret)); - } else if (OB_FAIL(bloom_filter_.assign(other_msg.bloom_filter_))) { + } else if (OB_FAIL(bloom_filter_.assign(other_msg.bloom_filter_, msg.get_tenant_id()))) { LOG_WARN("fail to assign bf msg", K(ret)); - } else if (OB_FAIL(filter_indexes_.assign(other_msg.filter_indexes_))) { + } else if (OB_FAIL(filter_indexes_.prepare_allocate(other_msg.filter_indexes_.count()))) { + LOG_WARN("failed to prepare_allocate filter indexes", K(ret)); + } else { + // The reason we don't use filter_indexes_.assign(other_msg.filter_indexes_) here is that: + // channel_ids_ is an ObFixedArray in BloomFilterIndex, we need to set allocator before assign channel_ids_ + for (int64_t i = 0; i < other_msg.filter_indexes_.count() && OB_SUCC(ret); ++i) { + filter_indexes_.at(i).channel_ids_.set_allocator(&allocator_); + const BloomFilterIndex &other_filter_index = other_msg.filter_indexes_.at(i); + if (OB_FAIL(filter_indexes_.at(i).assign(other_filter_index))) { + LOG_WARN("fail to assign BloomFilterIndex", K(ret)); + } + } + } + + if (OB_FAIL(filter_indexes_.assign(other_msg.filter_indexes_))) { LOG_WARN("failed to assign filter indexes", K(ret)); } return ret; @@ -722,6 +739,9 @@ int ObRFBloomFilterMsg::broadcast(ObIArray &target_addrs, auto addr_filter_idx = filter_indexes_.at(cur_idx); msg.bloom_filter_.set_begin_idx(addr_filter_idx.begin_idx_); msg.bloom_filter_.set_end_idx(addr_filter_idx.end_idx_); + if (OB_FAIL(msg.next_peer_addrs_.init(addr_filter_idx.channel_ids_.count()))) { + LOG_WARN("fail to init next_peer_addrs_", K(ret)); + } for (int i = 0; OB_SUCC(ret) && i < addr_filter_idx.channel_ids_.count(); ++i) { if (OB_FAIL(msg.next_peer_addrs_.push_back( target_addrs.at(addr_filter_idx.channel_ids_.at(i))))) { @@ -757,6 +777,11 @@ int ObRFBloomFilterMsg::generate_filter_indexes( int64_t group_addr_cnt = each_group_size > addr_cnt ? addr_cnt : each_group_size; BloomFilterIndex filter_index; + ObSEArray tmp_filter_indexes; + lib::ObMemAttr attr(tenant_id_, "TmpBFIdxAlloc"); + common::ObArenaAllocator tmp_allocator(attr); + filter_index.channel_ids_.set_allocator(&tmp_allocator); + BloomFilterIndex *filter_index_ptr = nullptr; for (int i = 0; OB_SUCC(ret) && i < count; ++i) { start_idx = i * piece_size; end_idx = (i + 1) * piece_size; @@ -783,10 +808,33 @@ int ObRFBloomFilterMsg::generate_filter_indexes( } else { filter_index.channel_id_ = (i % group_addr_cnt) + pos; } + if (OB_FAIL(filter_index.channel_ids_.init(min(addr_cnt, pos + group_addr_cnt) - pos + 1))) { + LOG_WARN("failed to init channel_ids_"); + } for (int k = pos; OB_SUCC(ret) && k < addr_cnt && k < pos + group_addr_cnt; ++k) { OZ(filter_index.channel_ids_.push_back(k)); } - OZ(filter_indexes_.push_back(filter_index)); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(filter_index_ptr = OB_NEWx(BloomFilterIndex, &tmp_allocator))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc BloomFilterIndex"); + } else if (FALSE_IT(filter_index_ptr->channel_ids_.set_allocator(&tmp_allocator))) { + } else if (OB_FAIL(filter_index_ptr->assign(filter_index))) { + LOG_WARN("failed to assign"); + } else if (OB_FAIL(tmp_filter_indexes.push_back(filter_index_ptr))) { + LOG_WARN("failed to push_back"); + } + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(filter_indexes_.prepare_allocate(tmp_filter_indexes.count()))) { + LOG_WARN("failed to prepare_allocate filter_indexes_"); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < tmp_filter_indexes.count(); ++i) { + filter_indexes_.at(i).channel_ids_.set_allocator(&allocator_); + if (OB_FAIL(filter_indexes_.at(i).assign(*tmp_filter_indexes.at(i)))) { + LOG_WARN("failed to assign filter_indexes", K(i)); + } } } return ret; diff --git a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h index 1aee60d40d..69d4e6d1ef 100644 --- a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h +++ b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h @@ -37,8 +37,8 @@ public: SECOND_LEVEL }; ObRFBloomFilterMsg() : phase_(), bloom_filter_(), - next_peer_addrs_(), expect_first_phase_count_(0), - piece_size_(0), filter_indexes_(), receive_count_array_(), + next_peer_addrs_(allocator_), expect_first_phase_count_(0), + piece_size_(0), filter_indexes_(allocator_), receive_count_array_(allocator_), filter_idx_(nullptr), create_finish_(nullptr), need_send_msg_(true), is_finish_regen_(false) {} ~ObRFBloomFilterMsg() { destroy(); } virtual int assign(const ObP2PDatahubMsgBase &) final; @@ -91,11 +91,11 @@ int generate_receive_count_array(int64_t piece_size, int64_t cur_begin_idx); public: ObSendBFPhase phase_; ObPxBloomFilter bloom_filter_; - common::ObSArray next_peer_addrs_; + common::ObFixedArray next_peer_addrs_; int64_t expect_first_phase_count_; int64_t piece_size_; - common::ObArray filter_indexes_; - common::ObArray receive_count_array_; + common::ObFixedArray filter_indexes_; + common::ObFixedArray receive_count_array_; int64_t *filter_idx_; //for shared msg bool *create_finish_; //for shared msg bool need_send_msg_; //for shared msg, when drain_exch, msg is not need to be sent