fix bug: config runtime_bloom_filter_max_size is not effective
This commit is contained in:
@ -225,7 +225,8 @@ int ObJoinFilterOpInput::construct_msg_details(
|
||||
if (OB_FAIL(bf_msg.bloom_filter_.init(spec.filter_len_,
|
||||
bf_msg.get_allocator(),
|
||||
bf_msg.get_tenant_id(),
|
||||
config.bloom_filter_ratio_))) {
|
||||
config.bloom_filter_ratio_,
|
||||
config.runtime_bloom_filter_max_size_))) {
|
||||
LOG_WARN("failed to init bloom filter", K(ret));
|
||||
} else if (!spec.is_shared_join_filter() || !spec.is_shuffle_) {
|
||||
bf_msg.set_msg_expect_cnt(1);
|
||||
|
||||
@ -50,7 +50,7 @@ int BloomFilterIndex::assign(const BloomFilterIndex &other)
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObPxBloomFilter::ObPxBloomFilter() : data_length_(0), bits_count_(0), fpp_(0.0),
|
||||
ObPxBloomFilter::ObPxBloomFilter() : data_length_(0), max_bit_count_(0), bits_count_(0), fpp_(0.0),
|
||||
hash_func_count_(0), is_inited_(false), bits_array_length_(0),
|
||||
bits_array_(NULL), true_count_(0), begin_idx_(0), end_idx_(0), allocator_(),
|
||||
px_bf_recieve_count_(0), px_bf_recieve_size_(0), px_bf_merge_filter_count_(0)
|
||||
@ -58,7 +58,8 @@ ObPxBloomFilter::ObPxBloomFilter() : data_length_(0), bits_count_(0), fpp_(0.0),
|
||||
|
||||
}
|
||||
|
||||
int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, int64_t tenant_id, double fpp /*= 0.01 */)
|
||||
int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, int64_t tenant_id,
|
||||
double fpp /*= 0.01 */, int64_t max_filter_size /* =2147483648 */)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
set_allocator_attr(tenant_id);
|
||||
@ -69,6 +70,7 @@ int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, int64_t
|
||||
} else {
|
||||
data_length_ = data_length;
|
||||
fpp_ = fpp;
|
||||
align_max_bit_count(max_filter_size);
|
||||
(void)calc_num_of_bits();
|
||||
(void)calc_num_of_hash_func();
|
||||
bits_array_length_ = ceil((double)bits_count_ / 64);
|
||||
@ -99,6 +101,7 @@ int ObPxBloomFilter::assign(const ObPxBloomFilter &filter, int64_t tenant_id)
|
||||
int ret = OB_SUCCESS;
|
||||
set_allocator_attr(tenant_id);
|
||||
data_length_ = filter.data_length_;
|
||||
max_bit_count_ = filter.max_bit_count_;
|
||||
bits_count_ = filter.bits_count_;
|
||||
fpp_ = filter.fpp_;
|
||||
hash_func_count_ = filter.hash_func_count_;
|
||||
@ -135,6 +138,7 @@ int ObPxBloomFilter::init(const ObPxBloomFilter *filter)
|
||||
LOG_WARN("the filter is null", K(ret));
|
||||
} else {
|
||||
data_length_ = filter->data_length_;
|
||||
max_bit_count_ = filter->max_bit_count_;
|
||||
bits_count_ = filter->bits_count_;
|
||||
fpp_ = filter->fpp_;
|
||||
hash_func_count_ = filter->hash_func_count_;
|
||||
@ -170,10 +174,20 @@ void ObPxBloomFilter::calc_num_of_bits()
|
||||
n |= n >> 32;
|
||||
|
||||
// min size is block size = 256.
|
||||
bits_count_ = ((n < MIN_FILTER_SIZE) ? MIN_FILTER_SIZE : (n >= MAX_BIT_COUNT) ? MAX_BIT_COUNT : n + 1);
|
||||
bits_count_ = ((n < MIN_FILTER_SIZE) ? MIN_FILTER_SIZE : (n >= max_bit_count_) ? max_bit_count_ : n + 1);
|
||||
LOG_TRACE("calc num of bits", K(data_length_), K(fpp_), K(old_n), K(ori_n), K(bits_count_));
|
||||
}
|
||||
|
||||
void ObPxBloomFilter::align_max_bit_count(int64_t max_filter_size)
|
||||
{
|
||||
int64_t max_bit_count = max_filter_size * CHAR_BIT;
|
||||
if (MAX_BIT_COUNT == max_bit_count) {
|
||||
max_bit_count_ = max_bit_count;
|
||||
} else {
|
||||
max_bit_count_ = next_pow2(max_bit_count);
|
||||
}
|
||||
}
|
||||
|
||||
// previous versino: hash_func_nums = bits_num / data_length * log(2)
|
||||
// hash_func_count_ = BF_BLOCK_SIZE / REG_SIZE = 256 / 64 = 4
|
||||
void ObPxBloomFilter::calc_num_of_hash_func()
|
||||
@ -391,7 +405,8 @@ OB_DEF_SERIALIZE(ObPxBloomFilter)
|
||||
bits_array_length_,
|
||||
true_count_,
|
||||
begin_idx_,
|
||||
end_idx_);
|
||||
end_idx_,
|
||||
max_bit_count_);
|
||||
for (int i = begin_idx_; OB_SUCC(ret) && i <= end_idx_; ++i) {
|
||||
if (OB_FAIL(serialization::encode(buf, buf_len, pos, bits_array_[i]))) {
|
||||
LOG_WARN("fail to encode bits data", K(ret), K(bits_array_[i]));
|
||||
@ -412,7 +427,8 @@ OB_DEF_DESERIALIZE(ObPxBloomFilter)
|
||||
bits_array_length_,
|
||||
true_count_,
|
||||
begin_idx_,
|
||||
end_idx_);
|
||||
end_idx_,
|
||||
max_bit_count_);
|
||||
int64_t real_len = end_idx_ - begin_idx_ + 1;
|
||||
bits_array_length_ = real_len;
|
||||
void *bits_array_buf = NULL;
|
||||
@ -450,7 +466,8 @@ OB_DEF_SERIALIZE_SIZE(ObPxBloomFilter)
|
||||
bits_array_length_,
|
||||
true_count_,
|
||||
begin_idx_,
|
||||
end_idx_);
|
||||
end_idx_,
|
||||
max_bit_count_);
|
||||
for (int i = begin_idx_; i <= end_idx_; ++i) {
|
||||
len += serialization::encoded_length(bits_array_[i]);
|
||||
}
|
||||
|
||||
@ -59,7 +59,8 @@ OB_UNIS_VERSION_V(1);
|
||||
public:
|
||||
ObPxBloomFilter();
|
||||
virtual ~ObPxBloomFilter() {};
|
||||
int init(int64_t data_length, common::ObIAllocator &allocator, int64_t tenant_id, double fpp = 0.01);
|
||||
int init(int64_t data_length, common::ObIAllocator &allocator, int64_t tenant_id,
|
||||
double fpp = 0.01, int64_t max_filter_size = 2147483648 /*2G*/);
|
||||
int init(const ObPxBloomFilter *filter);
|
||||
void reset_filter();
|
||||
inline int might_contain(uint64_t hash, bool &is_match) {
|
||||
@ -101,11 +102,13 @@ private:
|
||||
bool set(uint64_t block_begin, uint64_t index);
|
||||
void calc_num_of_hash_func();
|
||||
void calc_num_of_bits();
|
||||
void align_max_bit_count(int64_t max_filter_size);
|
||||
int might_contain_nonsimd(uint64_t hash, bool &is_match);
|
||||
int might_contain_simd(uint64_t hash, bool &is_match);
|
||||
|
||||
private:
|
||||
int64_t data_length_; //原始数据长度
|
||||
int64_t max_bit_count_; // max filter size, default 2GB, so the max bit count = 17179869184;
|
||||
int64_t bits_count_; //filter的位个数
|
||||
double fpp_; //误判率
|
||||
int64_t hash_func_count_; //哈希函数个数
|
||||
|
||||
Reference in New Issue
Block a user