Split ObRFBloomFilterMsg array memory accounting by tenant

This commit is contained in:
obdev
2023-07-25 14:13:09 +00:00
committed by ob-robot
parent 8ba5949ac5
commit 4b0cf2f6f3
5 changed files with 88 additions and 14 deletions

View File

@ -178,7 +178,7 @@ int ObJoinFilterOpInput::init_shared_msgs(
px_sequence_id_, 0/*task_id*/, tenant_id, timeout_ts, register_dm_info_))) {
LOG_WARN("fail to init msg", K(ret));
} else if (OB_FAIL(construct_msg_details(spec, sqc_proxy, config_, *msg_ptr, sqc_count))) {
LOG_WARN("fail to construct msg details", K(ret));
LOG_WARN("fail to construct msg details", K(ret), K(tenant_id));
} else if (OB_FAIL(array_ptr->push_back(msg_ptr))) {
LOG_WARN("fail to push back array ptr", K(ret));
}
@ -211,6 +211,7 @@ int ObJoinFilterOpInput::construct_msg_details(
ObPxSQCProxy::SQCP2PDhMap &dh_map = sqc_proxy->get_p2p_dh_map();
if (OB_FAIL(bf_msg.bloom_filter_.init(spec.filter_len_,
bf_msg.get_allocator(),
bf_msg.get_tenant_id(),
config.bloom_filter_ratio_))) {
LOG_WARN("failed to init bloom filter", K(ret));
} else if (!spec.is_shared_join_filter() || !spec.is_shuffle_) {

View File

@ -35,6 +35,21 @@ using namespace obrpc;
#define LOG_HASH_COUNT 2 // = log2(FIXED_HASH_COUNT)
#define WORD_SIZE 64 // WORD_SIZE * FIXED_HASH_COUNT = BF_BLOCK_SIZE
// before assign, please set allocator for channel_ids_ first
int BloomFilterIndex::assign(const BloomFilterIndex &other)
{
int ret = OB_SUCCESS;
if (this != &other) {
channel_id_ = other.channel_id_;
begin_idx_ = other.begin_idx_;
end_idx_ = other.end_idx_;
if (OB_FAIL(channel_ids_.assign(other.channel_ids_))) {
LOG_WARN("failed to assign channel_ids_");
}
}
return ret;
}
ObPxBloomFilter::ObPxBloomFilter() : data_length_(0), bits_count_(0), fpp_(0.0),
hash_func_count_(0), is_inited_(false), bits_array_length_(0),
bits_array_(NULL), true_count_(0), begin_idx_(0), end_idx_(0), allocator_(),
@ -43,9 +58,10 @@ ObPxBloomFilter::ObPxBloomFilter() : data_length_(0), bits_count_(0), fpp_(0.0),
}
int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, double fpp /*= 0.01 */)
int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, int64_t tenant_id, double fpp /*= 0.01 */)
{
int ret = OB_SUCCESS;
set_allocator_attr(tenant_id);
data_length = max(data_length, 1);
if (fpp <= 0) {
ret = OB_ERR_UNEXPECTED;
@ -78,9 +94,10 @@ int ObPxBloomFilter::init(int64_t data_length, ObIAllocator &allocator, double f
return ret;
}
int ObPxBloomFilter::assign(const ObPxBloomFilter &filter)
int ObPxBloomFilter::assign(const ObPxBloomFilter &filter, int64_t tenant_id)
{
int ret = OB_SUCCESS;
set_allocator_attr(tenant_id);
data_length_ = filter.data_length_;
bits_count_ = filter.bits_count_;
fpp_ = filter.fpp_;
@ -104,6 +121,12 @@ int ObPxBloomFilter::assign(const ObPxBloomFilter &filter)
return ret;
}
// Tag allocator_ so that all subsequent allocations are accounted to the
// given tenant under the "PxBfAlloc" label (default memory context).
void ObPxBloomFilter::set_allocator_attr(int64_t tenant_id)
{
  allocator_.set_attr(ObMemAttr(tenant_id, "PxBfAlloc", ObCtxIds::DEFAULT_CTX_ID));
}
int ObPxBloomFilter::init(const ObPxBloomFilter *filter)
{
int ret = OB_SUCCESS;

View File

@ -41,6 +41,7 @@ struct BloomFilterReceiveCount
struct BloomFilterIndex
{
int assign(const BloomFilterIndex &other);
BloomFilterIndex() : channel_id_(0), begin_idx_(0), end_idx_(0) {}
BloomFilterIndex(int64_t channel_id, int64_t beigin_idx, int64_t end_idx) :
channel_id_(channel_id), begin_idx_(beigin_idx),
@ -48,7 +49,7 @@ struct BloomFilterIndex
int64_t channel_id_;// join filter send channel id
int64_t begin_idx_; // join filter begin position in full bloom filter
int64_t end_idx_; // join filter end position in full bloom filter
ObArray<int64_t> channel_ids_;
ObFixedArray<int64_t, common::ObIAllocator> channel_ids_;
TO_STRING_KV(K_(begin_idx), K_(end_idx), K_(channel_id), K_(channel_ids));
};
@ -58,7 +59,7 @@ OB_UNIS_VERSION_V(1);
public:
ObPxBloomFilter();
virtual ~ObPxBloomFilter() {};
int init(int64_t data_length, common::ObIAllocator &allocator, double fpp = 0.01);
int init(int64_t data_length, common::ObIAllocator &allocator, int64_t tenant_id, double fpp = 0.01);
int init(const ObPxBloomFilter *filter);
void reset_filter();
inline int might_contain(uint64_t hash, bool &is_match) {
@ -90,8 +91,9 @@ public:
typedef int (ObPxBloomFilter::*GetFunc)(uint64_t hash, bool &is_match);
int generate_receive_count_array(int64_t piece_size);
void reset();
int assign(const ObPxBloomFilter &filter);
int assign(const ObPxBloomFilter &filter, int64_t tenant_id);
int regenerate();
void set_allocator_attr(int64_t tenant_id);
TO_STRING_KV(K_(data_length), K_(bits_count), K_(fpp), K_(hash_func_count), K_(is_inited),
K_(bits_array_length), K_(true_count));
private:

View File

@ -254,6 +254,9 @@ int ObRFBloomFilterMsg::generate_receive_count_array(int64_t piece_size, int64_t
int64_t bits_array_length = ceil((double)bloom_filter_.get_bits_count() / 64);
int64_t count = ceil(bits_array_length / (double)piece_size);
int64_t begin_idx = 0;
if (OB_FAIL(receive_count_array_.init(count))) {
LOG_WARN("fail to init receive_count_array_", K(ret));
}
for (int i = 0; OB_SUCC(ret) && i < count; ++i) {
begin_idx = i * piece_size;
if (begin_idx >= bits_array_length) {
@ -392,9 +395,23 @@ int ObRFBloomFilterMsg::assign(const ObP2PDatahubMsgBase &msg)
LOG_WARN("failed to assign base data", K(ret));
} else if (OB_FAIL(next_peer_addrs_.assign(other_msg.next_peer_addrs_))) {
LOG_WARN("fail to assign bf msg", K(ret));
} else if (OB_FAIL(bloom_filter_.assign(other_msg.bloom_filter_))) {
} else if (OB_FAIL(bloom_filter_.assign(other_msg.bloom_filter_, msg.get_tenant_id()))) {
LOG_WARN("fail to assign bf msg", K(ret));
} else if (OB_FAIL(filter_indexes_.assign(other_msg.filter_indexes_))) {
} else if (OB_FAIL(filter_indexes_.prepare_allocate(other_msg.filter_indexes_.count()))) {
LOG_WARN("failed to prepare_allocate filter indexes", K(ret));
} else {
// The reason we don't use filter_indexes_.assign(other_msg.filter_indexes_) here is that
// channel_ids_ is an ObFixedArray in BloomFilterIndex, so its allocator must be set before assigning it
for (int64_t i = 0; i < other_msg.filter_indexes_.count() && OB_SUCC(ret); ++i) {
filter_indexes_.at(i).channel_ids_.set_allocator(&allocator_);
const BloomFilterIndex &other_filter_index = other_msg.filter_indexes_.at(i);
if (OB_FAIL(filter_indexes_.at(i).assign(other_filter_index))) {
LOG_WARN("fail to assign BloomFilterIndex", K(ret));
}
}
}
if (OB_FAIL(filter_indexes_.assign(other_msg.filter_indexes_))) {
LOG_WARN("failed to assign filter indexes", K(ret));
}
return ret;
@ -722,6 +739,9 @@ int ObRFBloomFilterMsg::broadcast(ObIArray<ObAddr> &target_addrs,
auto addr_filter_idx = filter_indexes_.at(cur_idx);
msg.bloom_filter_.set_begin_idx(addr_filter_idx.begin_idx_);
msg.bloom_filter_.set_end_idx(addr_filter_idx.end_idx_);
if (OB_FAIL(msg.next_peer_addrs_.init(addr_filter_idx.channel_ids_.count()))) {
LOG_WARN("fail to init next_peer_addrs_", K(ret));
}
for (int i = 0; OB_SUCC(ret) && i < addr_filter_idx.channel_ids_.count(); ++i) {
if (OB_FAIL(msg.next_peer_addrs_.push_back(
target_addrs.at(addr_filter_idx.channel_ids_.at(i))))) {
@ -757,6 +777,11 @@ int ObRFBloomFilterMsg::generate_filter_indexes(
int64_t group_addr_cnt = each_group_size > addr_cnt ?
addr_cnt : each_group_size;
BloomFilterIndex filter_index;
ObSEArray<BloomFilterIndex *, 64> tmp_filter_indexes;
lib::ObMemAttr attr(tenant_id_, "TmpBFIdxAlloc");
common::ObArenaAllocator tmp_allocator(attr);
filter_index.channel_ids_.set_allocator(&tmp_allocator);
BloomFilterIndex *filter_index_ptr = nullptr;
for (int i = 0; OB_SUCC(ret) && i < count; ++i) {
start_idx = i * piece_size;
end_idx = (i + 1) * piece_size;
@ -783,10 +808,33 @@ int ObRFBloomFilterMsg::generate_filter_indexes(
} else {
filter_index.channel_id_ = (i % group_addr_cnt) + pos;
}
if (OB_FAIL(filter_index.channel_ids_.init(min(addr_cnt, pos + group_addr_cnt) - pos + 1))) {
LOG_WARN("failed to init channel_ids_");
}
for (int k = pos; OB_SUCC(ret) && k < addr_cnt && k < pos + group_addr_cnt; ++k) {
OZ(filter_index.channel_ids_.push_back(k));
}
OZ(filter_indexes_.push_back(filter_index));
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(filter_index_ptr = OB_NEWx(BloomFilterIndex, &tmp_allocator))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc BloomFilterIndex");
} else if (FALSE_IT(filter_index_ptr->channel_ids_.set_allocator(&tmp_allocator))) {
} else if (OB_FAIL(filter_index_ptr->assign(filter_index))) {
LOG_WARN("failed to assign");
} else if (OB_FAIL(tmp_filter_indexes.push_back(filter_index_ptr))) {
LOG_WARN("failed to push_back");
}
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(filter_indexes_.prepare_allocate(tmp_filter_indexes.count()))) {
LOG_WARN("failed to prepare_allocate filter_indexes_");
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < tmp_filter_indexes.count(); ++i) {
filter_indexes_.at(i).channel_ids_.set_allocator(&allocator_);
if (OB_FAIL(filter_indexes_.at(i).assign(*tmp_filter_indexes.at(i)))) {
LOG_WARN("failed to assign filter_indexes", K(i));
}
}
}
return ret;

View File

@ -37,8 +37,8 @@ public:
SECOND_LEVEL
};
ObRFBloomFilterMsg() : phase_(), bloom_filter_(),
next_peer_addrs_(), expect_first_phase_count_(0),
piece_size_(0), filter_indexes_(), receive_count_array_(),
next_peer_addrs_(allocator_), expect_first_phase_count_(0),
piece_size_(0), filter_indexes_(allocator_), receive_count_array_(allocator_),
filter_idx_(nullptr), create_finish_(nullptr), need_send_msg_(true), is_finish_regen_(false) {}
~ObRFBloomFilterMsg() { destroy(); }
virtual int assign(const ObP2PDatahubMsgBase &) final;
@ -91,11 +91,11 @@ int generate_receive_count_array(int64_t piece_size, int64_t cur_begin_idx);
public:
ObSendBFPhase phase_;
ObPxBloomFilter bloom_filter_;
common::ObSArray<common::ObAddr> next_peer_addrs_;
common::ObFixedArray<common::ObAddr, common::ObIAllocator> next_peer_addrs_;
int64_t expect_first_phase_count_;
int64_t piece_size_;
common::ObArray<BloomFilterIndex> filter_indexes_;
common::ObArray<BloomFilterReceiveCount> receive_count_array_;
common::ObFixedArray<BloomFilterIndex, common::ObIAllocator> filter_indexes_;
common::ObFixedArray<BloomFilterReceiveCount, common::ObIAllocator> receive_count_array_;
int64_t *filter_idx_; //for shared msg
bool *create_finish_; //for shared msg
bool need_send_msg_; //for shared msg, when drain_exch, msg is not need to be sent