// Patch summary: make the upper bound of the PX runtime bloom filter configurable.
//
// Files touched by this diff:
//   - src/sql/engine/join/ob_join_filter_op.cpp
//   - src/sql/engine/px/ob_px_bloom_filter.cpp
//   - src/sql/engine/px/ob_px_bloom_filter.h
//
// What the hunks do:
//   - ObJoinFilterOpInput::construct_msg_details() now passes
//     config.runtime_bloom_filter_max_size_ as an extra argument to
//     bf_msg.bloom_filter_.init().
//   - ObPxBloomFilter::init(data_length, allocator, tenant_id, fpp, max_filter_size)
//     gains a trailing parameter `max_filter_size` with default 2147483648, so
//     existing four-argument callers keep compiling. init() calls
//     align_max_bit_count(max_filter_size) before calc_num_of_bits().
//   - New member max_bit_count_: zero-initialised in the default constructor,
//     copied in assign() and in init(const ObPxBloomFilter *), and appended after
//     end_idx_ in the serialize / deserialize / serialize-size member lists.
//   - calc_num_of_bits() clamps bits_count_ to max_bit_count_ instead of the
//     constant MAX_BIT_COUNT; the MIN_FILTER_SIZE lower bound is unchanged.
//   - New private helper align_max_bit_count(): computes max_filter_size * CHAR_BIT;
//     when that equals MAX_BIT_COUNT it is stored as is, otherwise the result of
//     next_pow2() on it is stored.
//
// NOTE(review): no hunk in this patch validates max_filter_size; a zero or negative
//   value, or one large enough to overflow `max_filter_size * CHAR_BIT` in int64_t,
//   would flow straight into max_bit_count_ -- confirm the config layer bounds it.
// NOTE(review): next_pow2() is not defined in this patch; if it rounds up, a
//   non-power-of-two limit yields a cap larger than the configured size -- confirm
//   this is intended.
// NOTE(review): max_bit_count_ is appended to the serialized member list; presumably
//   the serialization macros tolerate trailing fields from/to older peers -- verify,
//   since a peer that does not send the field would leave max_bit_count_ at 0.
diff --git a/src/sql/engine/join/ob_join_filter_op.cpp b/src/sql/engine/join/ob_join_filter_op.cpp index b12d20a0b8..48cd43b5bb 100644 --- a/src/sql/engine/join/ob_join_filter_op.cpp +++ b/src/sql/engine/join/ob_join_filter_op.cpp @@ -225,7 +225,8 @@ int ObJoinFilterOpInput::construct_msg_details( if (OB_FAIL(bf_msg.bloom_filter_.init(spec.filter_len_, bf_msg.get_allocator(), bf_msg.get_tenant_id(), - config.bloom_filter_ratio_))) { + config.bloom_filter_ratio_, + config.runtime_bloom_filter_max_size_))) { LOG_WARN("failed to init bloom filter", K(ret)); } else if (!spec.is_shared_join_filter() || !spec.is_shuffle_) { bf_msg.set_msg_expect_cnt(1); diff --git a/src/sql/engine/px/ob_px_bloom_filter.cpp b/src/sql/engine/px/ob_px_bloom_filter.cpp index 869a54b561..e097486928 100644 --- a/src/sql/engine/px/ob_px_bloom_filter.cpp +++ b/src/sql/engine/px/ob_px_bloom_filter.cpp @@ -50,7 +50,7 @@ int BloomFilterIndex::assign(const BloomFilterIndex &other) return ret; } -ObPxBloomFilter::ObPxBloomFilter() : data_length_(0), bits_count_(0), fpp_(0.0), +ObPxBloomFilter::ObPxBloomFilter() : data_length_(0), max_bit_count_(0), bits_count_(0), fpp_(0.0), hash_func_count_(0), is_inited_(false), bits_array_length_(0), bits_array_(NULL), true_count_(0), begin_idx_(0), end_idx_(0), allocator_(), px_bf_recieve_count_(0), px_bf_recieve_size_(0), px_bf_merge_filter_count_(0) @@ -58,7 +58,8 @@ ObPxBloomFilter::ObPxBloomFilter() : data_length_(0), bits_count_(0), fpp_(0.0), } -int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, int64_t tenant_id, double fpp /*= 0.01 */) +int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, int64_t tenant_id, + double fpp /*= 0.01 */, int64_t max_filter_size /* =2147483648 */) { int ret = OB_SUCCESS; set_allocator_attr(tenant_id); @@ -69,6 +70,7 @@ int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, int64_t } else { data_length_ = data_length; fpp_ = fpp; + align_max_bit_count(max_filter_size); 
(void)calc_num_of_bits(); (void)calc_num_of_hash_func(); bits_array_length_ = ceil((double)bits_count_ / 64); @@ -99,6 +101,7 @@ int ObPxBloomFilter::assign(const ObPxBloomFilter &filter, int64_t tenant_id) int ret = OB_SUCCESS; set_allocator_attr(tenant_id); data_length_ = filter.data_length_; + max_bit_count_ = filter.max_bit_count_; bits_count_ = filter.bits_count_; fpp_ = filter.fpp_; hash_func_count_ = filter.hash_func_count_; @@ -135,6 +138,7 @@ int ObPxBloomFilter::init(const ObPxBloomFilter *filter) LOG_WARN("the filter is null", K(ret)); } else { data_length_ = filter->data_length_; + max_bit_count_ = filter->max_bit_count_; bits_count_ = filter->bits_count_; fpp_ = filter->fpp_; hash_func_count_ = filter->hash_func_count_; @@ -170,10 +174,20 @@ void ObPxBloomFilter::calc_num_of_bits() n |= n >> 32; // min size is block size = 256. - bits_count_ = ((n < MIN_FILTER_SIZE) ? MIN_FILTER_SIZE : (n >= MAX_BIT_COUNT) ? MAX_BIT_COUNT : n + 1); + bits_count_ = ((n < MIN_FILTER_SIZE) ? MIN_FILTER_SIZE : (n >= max_bit_count_) ? 
max_bit_count_ : n + 1); LOG_TRACE("calc num of bits", K(data_length_), K(fpp_), K(old_n), K(ori_n), K(bits_count_)); } +void ObPxBloomFilter::align_max_bit_count(int64_t max_filter_size) +{ + int64_t max_bit_count = max_filter_size * CHAR_BIT; + if (MAX_BIT_COUNT == max_bit_count) { + max_bit_count_ = max_bit_count; + } else { + max_bit_count_ = next_pow2(max_bit_count); + } +} + // previous versino: hash_func_nums = bits_num / data_length * log(2) // hash_func_count_ = BF_BLOCK_SIZE / REG_SIZE = 256 / 64 = 4 void ObPxBloomFilter::calc_num_of_hash_func() @@ -391,7 +405,8 @@ OB_DEF_SERIALIZE(ObPxBloomFilter) bits_array_length_, true_count_, begin_idx_, - end_idx_); + end_idx_, + max_bit_count_); for (int i = begin_idx_; OB_SUCC(ret) && i <= end_idx_; ++i) { if (OB_FAIL(serialization::encode(buf, buf_len, pos, bits_array_[i]))) { LOG_WARN("fail to encode bits data", K(ret), K(bits_array_[i])); @@ -412,7 +427,8 @@ OB_DEF_DESERIALIZE(ObPxBloomFilter) bits_array_length_, true_count_, begin_idx_, - end_idx_); + end_idx_, + max_bit_count_); int64_t real_len = end_idx_ - begin_idx_ + 1; bits_array_length_ = real_len; void *bits_array_buf = NULL; @@ -450,7 +466,8 @@ OB_DEF_SERIALIZE_SIZE(ObPxBloomFilter) bits_array_length_, true_count_, begin_idx_, - end_idx_); + end_idx_, + max_bit_count_); for (int i = begin_idx_; i <= end_idx_; ++i) { len += serialization::encoded_length(bits_array_[i]); } diff --git a/src/sql/engine/px/ob_px_bloom_filter.h b/src/sql/engine/px/ob_px_bloom_filter.h index 6e8f3d71c1..f1a4bbfc80 100644 --- a/src/sql/engine/px/ob_px_bloom_filter.h +++ b/src/sql/engine/px/ob_px_bloom_filter.h @@ -59,7 +59,8 @@ OB_UNIS_VERSION_V(1); public: ObPxBloomFilter(); virtual ~ObPxBloomFilter() {}; - int init(int64_t data_length, common::ObIAllocator &allocator, int64_t tenant_id, double fpp = 0.01); + int init(int64_t data_length, common::ObIAllocator &allocator, int64_t tenant_id, + double fpp = 0.01, int64_t max_filter_size = 2147483648 /*2G*/); int init(const 
ObPxBloomFilter *filter); void reset_filter(); inline int might_contain(uint64_t hash, bool &is_match) { @@ -101,11 +102,13 @@ private: bool set(uint64_t block_begin, uint64_t index); void calc_num_of_hash_func(); void calc_num_of_bits(); + void align_max_bit_count(int64_t max_filter_size); int might_contain_nonsimd(uint64_t hash, bool &is_match); int might_contain_simd(uint64_t hash, bool &is_match); private: int64_t data_length_; //原始数据长度 + int64_t max_bit_count_; // max filter size, default 2GB, so the max bit count = 17179869184; int64_t bits_count_; //filter的位个数 double fpp_; //误判率 int64_t hash_func_count_; //哈希函数个数