Fix rf msg crash problem

This commit is contained in:
qianchanger
2023-05-08 05:08:42 +00:00
committed by ob-robot
parent a9230e1550
commit c1a19e5d4f
8 changed files with 181 additions and 36 deletions

View File

@ -90,6 +90,8 @@ int ObPxBloomFilter::assign(const ObPxBloomFilter &filter)
true_count_ = filter.true_count_;
might_contain_ = filter.might_contain_;
void *bits_array_buf = NULL;
begin_idx_ = filter.get_begin_idx();
end_idx_ = filter.get_end_idx();
if (OB_ISNULL(bits_array_buf = allocator_.alloc((bits_array_length_ + CACHE_LINE_SIZE)* sizeof(int64_t)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc filter", K(bits_array_length_), K(begin_idx_), K(end_idx_), K(ret));
@ -313,6 +315,34 @@ int ObPxBloomFilter::generate_receive_count_array(int64_t piece_size)
return ret;
}
int ObPxBloomFilter::regenerate()
{
int ret = OB_SUCCESS;
int64_t bits_array_length = ceil((double)bits_count_ / 64);
void *bits_array_buf = NULL;
if (bits_array_length <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected bits array length", K(ret));
} else if (OB_ISNULL(bits_array_buf = allocator_.alloc((bits_array_length + CACHE_LINE_SIZE)* sizeof(int64_t)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc filter", K(bits_array_length), K(ret));
} else {
// cache line aligned address.
int64_t align_addr = ((reinterpret_cast<int64_t>(bits_array_buf)
+ CACHE_LINE_SIZE - 1) >> LOG_CACHE_LINE_SIZE) << LOG_CACHE_LINE_SIZE;
int64_t *bits_array = reinterpret_cast<int64_t *>(align_addr);
MEMSET(bits_array, 0, bits_array_length * sizeof(int64_t));
for (int i = 0; i < bits_array_length_; ++i) {
bits_array[i + begin_idx_] |= bits_array_[i];
}
bits_array_length_ = bits_array_length;
bits_array_ = bits_array;
begin_idx_ = 0;
end_idx_ = bits_array_length - 1;
}
return ret;
}
void ObPxBloomFilter::reset()
{
// need reset memory

View File

@ -49,7 +49,7 @@ struct BloomFilterIndex
int64_t begin_idx_; // join filter begin position in full bloom filter
int64_t end_idx_; // join filter end position in full bloom filter
ObArray<int64_t> channel_ids_;
TO_STRING_KV(K_(begin_idx), K_(end_idx));
TO_STRING_KV(K_(begin_idx), K_(end_idx), K_(channel_id), K_(channel_ids));
};
class ObPxBloomFilter
@ -81,15 +81,17 @@ public:
void dec_merge_filter_count() { ATOMIC_DEC(&px_bf_merge_filter_count_); }
bool is_merge_filter_finish() const { return 0 == px_bf_merge_filter_count_; }
int64_t get_bits_array_length() const { return bits_array_length_; }
int64_t get_bits_count() const { return bits_count_; }
void set_begin_idx(int64_t idx) { begin_idx_ = idx; }
void set_end_idx(int64_t idx) { end_idx_ = idx; }
int64_t get_begin_idx() { return begin_idx_; }
int64_t get_end_idx() { return end_idx_; }
int64_t get_begin_idx() const { return begin_idx_; }
int64_t get_end_idx() const { return end_idx_; }
void prefetch_bits_block(uint64_t hash);
typedef int (ObPxBloomFilter::*GetFunc)(uint64_t hash, bool &is_match);
int generate_receive_count_array(int64_t piece_size);
void reset();
int assign(const ObPxBloomFilter &filter);
int regenerate();
TO_STRING_KV(K_(data_length), K_(bits_count), K_(fpp), K_(hash_func_count), K_(is_inited),
K_(bits_array_length), K_(true_count));
private:
@ -112,7 +114,7 @@ private:
int64_t begin_idx_; // join filter begin position
int64_t end_idx_; // join filter end position
GetFunc might_contain_; // function pointer for might contain
private:
public:
common::ObArenaAllocator allocator_;
public:
//无需序列化

View File

@ -71,7 +71,6 @@ int ObP2PDatahubManager::process_msg(ObP2PDatahubMsgBase &msg)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected new msg", K(ret));
} else if (OB_FAIL(new_msg->process_msg_internal(need_free))) {
need_free = true;
LOG_WARN("fail to process msg", K(ret));
}
if (need_free && OB_NOT_NULL(new_msg)) {
@ -189,6 +188,18 @@ int ObP2PDatahubManager::P2PMsgMergeCall::operator() (common::hash::HashMapPair<
return ret;
}
// Callback applied to one map entry: asks the message stored in the entry to
// regenerate itself.
//
// @param entry  key/value pair from the message map; entry.second is the
//               message pointer.
// @return OB_SUCCESS, OB_ERR_UNEXPECTED when the stored pointer is NULL, or
//         whatever regenerate() reports. The same code is also stored in ret_.
int ObP2PDatahubManager::P2PRegenerateCall::operator() (common::hash::HashMapPair<ObP2PDhKey,
    ObP2PDatahubMsgBase *> &entry)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(entry.second)) {
    // Guard the dereference below instead of crashing on a bad entry.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected null msg in map entry", K(ret));
  } else if (OB_FAIL(entry.second->regenerate())) {
    LOG_WARN("fail to do regenerate", K(ret));
  }
  ret_ = ret;
  return ret;
}
int ObP2PDatahubManager::send_local_msg(ObP2PDatahubMsgBase *msg)
{
int ret = OB_SUCCESS;

View File

@ -36,6 +36,14 @@ public:
ObP2PDatahubMsgBase &dh_msg_;
bool need_free_;
};
// Functor applied to a single entry of the message map; the call operator is
// defined out of line.
struct P2PRegenerateCall
{
  P2PRegenerateCall(ObP2PDatahubMsgBase &dh_msg)
    : ret_(OB_SUCCESS),
      dh_msg_(dh_msg)
  {}
  ~P2PRegenerateCall() = default;
  // Invoked with the map entry being visited.
  int operator() (common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry);
  int ret_;                      // result code, starts as OB_SUCCESS
  ObP2PDatahubMsgBase &dh_msg_;  // message passed to the constructor
};
struct P2PMsgGetCall
{
P2PMsgGetCall(ObP2PDatahubMsgBase *&db_msg) : dh_msg_(db_msg), ret_(OB_SUCCESS) {};

View File

@ -128,6 +128,7 @@ int ObP2PDatahubMsgBase::process_msg_internal(bool &need_free)
}
}
} else if (OB_SUCCESS == ret) {
(void)check_finish_receive();
// set_refactored success, means this msg is in map, so register check item into dm
int reg_ret = ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(register_dm_info_,
dh_key, dm_cb_node_seq_id_);

View File

@ -85,6 +85,7 @@ public:
virtual int process_receive_count(ObP2PDatahubMsgBase &);
virtual int process_msg_internal(bool &need_free);
virtual int reuse() { return OB_SUCCESS; }
virtual int regenerate() { return OB_SUCCESS; }
void check_finish_receive();
bool check_ready() const { return is_ready_; }
ObP2PDatahubMsgType get_msg_type() const { return msg_type_; }

View File

@ -26,11 +26,50 @@ using namespace oceanbase::common;
using namespace oceanbase::sql;
using namespace oceanbase::share;
OB_SERIALIZE_MEMBER((ObRFBloomFilterMsg, ObP2PDatahubMsgBase),
phase_, bloom_filter_, next_peer_addrs_,
expect_first_phase_count_, piece_size_);
OB_SERIALIZE_MEMBER(ObRFRangeFilterMsg::MinMaxCellSize, min_datum_buf_size_, max_datum_buf_size_);
// Serialization of ObRFBloomFilterMsg.
// BASE_SER is given the (ObRFBloomFilterMsg, ObP2PDatahubMsgBase) pair first,
// presumably to encode the base-class part (macro is defined elsewhere — confirm).
// The members are then encoded in the order listed; the deserialize and
// serialize-size definitions for this type list the same members in the same order.
OB_DEF_SERIALIZE(ObRFBloomFilterMsg)
{
int ret = OB_SUCCESS;
BASE_SER((ObRFBloomFilterMsg, ObP2PDatahubMsgBase));
LST_DO_CODE(OB_UNIS_ENCODE,
phase_,
bloom_filter_,
next_peer_addrs_,
expect_first_phase_count_,
piece_size_);
return ret;
}
// Deserialization of ObRFBloomFilterMsg.
// BASE_DESER runs first, presumably decoding the ObP2PDatahubMsgBase part
// (macro is defined elsewhere — confirm). The members are then decoded in the
// same order in which the serialize definition lists them.
OB_DEF_DESERIALIZE(ObRFBloomFilterMsg)
{
int ret = OB_SUCCESS;
BASE_DESER((ObRFBloomFilterMsg, ObP2PDatahubMsgBase));
// Configure the bloom filter's allocator (tenant id and label) before
// bloom_filter_ is decoded below. NOTE(review): this assumes tenant_id_ has
// already been filled in by BASE_DESER — confirm.
bloom_filter_.allocator_.set_tenant_id(tenant_id_);
bloom_filter_.allocator_.set_label("ObPxBFDESER");
LST_DO_CODE(OB_UNIS_DECODE,
phase_,
bloom_filter_,
next_peer_addrs_,
expect_first_phase_count_,
piece_size_);
return ret;
}
// Serialized-size computation for ObRFBloomFilterMsg.
// Accumulates into len and returns it. BASE_ADD_LEN is given the
// (ObRFBloomFilterMsg, ObP2PDatahubMsgBase) pair, presumably adding the
// base-class part (macro is defined elsewhere — confirm). The member list
// matches the serialize and deserialize definitions for this type.
OB_DEF_SERIALIZE_SIZE(ObRFBloomFilterMsg)
{
int64_t len = 0;
BASE_ADD_LEN((ObRFBloomFilterMsg, ObP2PDatahubMsgBase));
LST_DO_CODE(OB_UNIS_ADD_LEN,
phase_,
bloom_filter_,
next_peer_addrs_,
expect_first_phase_count_,
piece_size_);
return len;
}
OB_DEF_SERIALIZE(ObRFRangeFilterMsg)
{
int ret = OB_SUCCESS;
@ -171,42 +210,63 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
int ret = OB_SUCCESS;
ObP2PDhKey dh_key(p2p_datahub_id_, px_sequence_id_, task_id_);
ObP2PDatahubManager::P2PMsgMergeCall call(*this);
ObP2PDatahubManager::P2PRegenerateCall regen_call(*this);
ObP2PDatahubManager::MsgMap &map = PX_P2P_DH.get_map();
start_time_ = ObTimeUtility::current_time();
if (OB_FAIL(generate_receive_count_array(piece_size_))) {
bool need_merge = true;
if (OB_FAIL(generate_receive_count_array(piece_size_, bloom_filter_.get_begin_idx()))) {
LOG_WARN("fail to generate receive count array", K(ret));
}
ObP2PDatahubMsgGuard guard(this);
do {
if (OB_HASH_EXIST == (ret = map.set_refactored(dh_key, this))) {
if (OB_FAIL(map.read_atomic(dh_key, call))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to merge p2p dh msg", K(ret));
}
} else {
//set msg
ObP2PDatahubMsgGuard guard(this);
if (OB_FAIL(map.set_refactored(dh_key, this))) {
if (OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to set refactored", K(ret));
}
} else if (OB_SUCCESS == ret) {
need_free = true;
} else {
need_merge = false;
// set_refactored success, means this msg is in map, so register check item into dm
int reg_ret = ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(register_dm_info_,
dh_key, dm_cb_node_seq_id_);
dh_key, dm_cb_node_seq_id_);
if (OB_SUCCESS != reg_ret) {
LOG_WARN("[DM] failed to register check item to dm", K(reg_ret));
}
LOG_TRACE("[DM] rf register check item to dm", K(reg_ret), K(register_dm_info_),
K(dh_key), K(dm_cb_node_seq_id_), K(this));
}
} while (ret == OB_HASH_NOT_EXIST);
if (call.need_free_) {
need_free = true;
// msg not in map, dec ref count
guard.dec_msg_ref_count();
// create whole bloom filter, no need wait
if (OB_SUCC(ret)) {
if (map.atomic_refactored(dh_key, regen_call)) {
LOG_WARN("fail to update bloom filter msg", K(ret));
}
}
// merge piece bloom filter
if (OB_SUCC(ret) && need_merge) {
if (OB_FAIL(map.read_atomic(dh_key, call))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to merge p2p dh msg", K(ret));
}
}
}
if (OB_SUCC(ret) && !need_merge) {
(void)check_finish_receive();
}
if (need_free) {
// msg not in map, dec ref count
guard.dec_msg_ref_count();
}
}
return ret;
}
int ObRFBloomFilterMsg::generate_receive_count_array(int64_t piece_size)
int ObRFBloomFilterMsg::generate_receive_count_array(int64_t piece_size, int64_t cur_begin_idx)
{
int ret = OB_SUCCESS;
int64_t bits_array_length = bloom_filter_.get_bits_array_length();
int64_t bits_array_length = ceil((double)bloom_filter_.get_bits_count() / 64);
int64_t count = ceil(bits_array_length / (double)piece_size);
int64_t begin_idx = 0;
for (int i = 0; OB_SUCC(ret) && i < count; ++i) {
@ -214,7 +274,12 @@ int ObRFBloomFilterMsg::generate_receive_count_array(int64_t piece_size)
if (begin_idx >= bits_array_length) {
begin_idx = bits_array_length - 1;
}
OZ(receive_count_array_.push_back(BloomFilterReceiveCount(begin_idx, 0)));
if (cur_begin_idx != begin_idx) {
OZ(receive_count_array_.push_back(BloomFilterReceiveCount(begin_idx, 0)));
} else {
OZ(receive_count_array_.push_back(BloomFilterReceiveCount(begin_idx, 1)));
}
}
return ret;
}
@ -265,6 +330,7 @@ int ObRFBloomFilterMsg::process_receive_count(ObP2PDatahubMsgBase &rf_msg)
bool first_phase_end = false;
ObRFBloomFilterMsg &bf_msg = static_cast<ObRFBloomFilterMsg &>(rf_msg);
auto process_second_phase = [&](ObRFBloomFilterMsg &bf_msg) {
LOG_WARN("process second phase", K(ret));
if (OB_FAIL(ObP2PDatahubMsgBase::process_receive_count(bf_msg))) {
LOG_WARN("fail to process receive count", K(ret));
}
@ -278,7 +344,7 @@ int ObRFBloomFilterMsg::process_receive_count(ObP2PDatahubMsgBase &rf_msg)
}
return ret;
};
if (is_first_phase()) {
if (bf_msg.is_first_phase()) {
if (OB_FAIL(process_first_phase(bf_msg))) {
LOG_WARN("fail to process first phase", K(ret));
} else if (first_phase_end && !bf_msg.get_next_phase_addrs().empty()) {
@ -289,7 +355,10 @@ int ObRFBloomFilterMsg::process_receive_count(ObP2PDatahubMsgBase &rf_msg)
if (OB_FAIL(second_phase_msg.shadow_copy(*this))) {
LOG_WARN("fail to shadow copy second phase msg", K(ret));
} else {
second_phase_msg.phase_ = SECOND_LEVEL;
second_phase_msg.set_msg_cur_cnt(expect_first_phase_count_);
second_phase_msg.bloom_filter_.set_begin_idx(bf_msg.bloom_filter_.get_begin_idx());
second_phase_msg.bloom_filter_.set_end_idx(bf_msg.bloom_filter_.get_end_idx());
}
for (int i = 0; OB_SUCC(ret) && i < bf_msg.get_next_phase_addrs().count(); ++i) {
if (bf_msg.get_next_phase_addrs().at(i) != GCTX.self_addr()) {
@ -302,6 +371,7 @@ int ObRFBloomFilterMsg::process_receive_count(ObP2PDatahubMsgBase &rf_msg)
}
}
}
(void)check_finish_receive();
} else if (bf_msg.get_next_phase_addrs().empty()) {
(void)check_finish_receive();
}
@ -358,6 +428,24 @@ int ObRFBloomFilterMsg::shadow_copy(const ObRFBloomFilterMsg &other_msg)
return ret;
}
// Regenerate the bloom filter of this message at most once.
//
// Nothing happens once is_finish_regen_ is set. Otherwise:
//   - an empty receive_count_array_ is reported as OB_ERR_UNEXPECTED;
//   - with exactly one entry in receive_count_array_ the bloom filter is left
//     as is and the message is simply marked as regenerated;
//   - with more than one entry, bloom_filter_.regenerate() is called, and the
//     message is marked as regenerated only if that call succeeds.
//
// @return OB_SUCCESS, OB_ERR_UNEXPECTED, or the error from
//         bloom_filter_.regenerate().
int ObRFBloomFilterMsg::regenerate()
{
  int ret = OB_SUCCESS;
  if (!is_finish_regen_) {
    if (receive_count_array_.empty()) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("unexpected empty receive count array", K(ret));
    } else if (receive_count_array_.count() > 1
               && OB_FAIL(bloom_filter_.regenerate())) {
      // Only reached with more than one entry; a single entry skips the call.
      LOG_WARN("fail to regenerate bloom filter", K(ret));
    } else {
      is_finish_regen_ = true;
    }
  }
  return ret;
}
int ObRFBloomFilterMsg::merge(ObP2PDatahubMsgBase &msg)
{
int ret = OB_SUCCESS;
@ -636,12 +724,7 @@ int ObRFBloomFilterMsg::broadcast(ObIArray<ObAddr> &target_addrs,
timeout_ts_,
p2p_datahub_id_);
ObPxP2PDatahubArg arg;
if (OB_FAIL(msg.shadow_copy(*this))) {
LOG_WARN("fail to shadow copy second phase msg", K(ret));
} else if (OB_ISNULL(create_finish_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected create finish ptr", K(ret));
}
arg.msg_ = &msg;
while (!*create_finish_ && OB_SUCC(ret)) {
if (OB_FAIL(THIS_WORKER.check_status())) {
@ -649,6 +732,13 @@ int ObRFBloomFilterMsg::broadcast(ObIArray<ObAddr> &target_addrs,
}
ob_usleep(10);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(msg.shadow_copy(*this))) {
LOG_WARN("fail to shadow copy second phase msg", K(ret));
} else if (OB_ISNULL(create_finish_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected create finish ptr", K(ret));
}
while (*filter_idx_ < filter_indexes_.count() && OB_SUCC(ret)) {
cur_idx = ATOMIC_FAA(filter_idx_, 1);
if (cur_idx < filter_indexes_.count()) {

View File

@ -39,7 +39,7 @@ public:
ObRFBloomFilterMsg() : phase_(), bloom_filter_(),
next_peer_addrs_(), expect_first_phase_count_(0),
piece_size_(0), filter_indexes_(), receive_count_array_(),
filter_idx_(nullptr), create_finish_(nullptr) {}
filter_idx_(nullptr), create_finish_(nullptr), is_finish_regen_(false) {}
~ObRFBloomFilterMsg() { destroy(); }
virtual int assign(const ObP2PDatahubMsgBase &) final;
virtual int merge(ObP2PDatahubMsgBase &) final;
@ -78,6 +78,7 @@ public:
int process_first_phase_recieve_count(
ObRFBloomFilterMsg &msg, bool &first_phase_end);
virtual int process_msg_internal(bool &need_free);
virtual int regenerate() override;
private:
int calc_hash_value(
const common::ObIArray<ObExpr *> &expr_array,
@ -86,7 +87,7 @@ int calc_hash_value(
ObEvalCtx &eval_ctx,
uint64_t &hash_value, bool &ignore);
int shadow_copy(const ObRFBloomFilterMsg &msg);
int generate_receive_count_array(int64_t piece_size);
int generate_receive_count_array(int64_t piece_size, int64_t cur_begin_idx);
public:
ObSendBFPhase phase_;
ObPxBloomFilter bloom_filter_;
@ -97,6 +98,7 @@ public:
common::ObArray<BloomFilterReceiveCount> receive_count_array_;
int64_t *filter_idx_; //for shared msg
bool *create_finish_; //for shared msg
bool is_finish_regen_;
};
class ObRFRangeFilterMsg : public ObP2PDatahubMsgBase