Files
oceanbase/unittest/logservice/mock_logservice_container/mock_election_user.h
stdliu f8c5c2647f [FEAT MERGE] Merge syslog user experience improvement to master
Co-authored-by: Charles0429 <xiezhenjiang@gmail.com>
Co-authored-by: tino247 <tino247@126.com>
Co-authored-by: chaser-ch <chaser.ch@antgroup.com>
2023-02-06 15:52:24 +08:00

250 lines
8.1 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define private public
#define protected public
#include "logservice/palf/election/interface/election.h"
#include "logservice/palf/election/message/election_message.h"
#include "share/ob_occam_timer.h"
#include "lib/utility/serialization.h"
#include <thread>
#include <mutex>
#include <assert.h>
#include <unordered_map>
namespace oceanbase
{
namespace unittest
{
extern int64_t MSG_DELAY;
// 定义用于hash map的判等方法
struct LogStreamKey
{
LogStreamKey(const common::ObAddr &addr, const int64_t logstream_id) : addr_(addr), logstream_id_(logstream_id) {}
bool operator==(const LogStreamKey &rhs) const { return addr_ == rhs.addr_ && logstream_id_ == rhs.logstream_id_; }
const common::ObAddr addr_;
const int64_t logstream_id_;
TO_STRING_KV(KP(this), K_(addr), K_(logstream_id))
};
}
}
// 定义用于hash map的hash方法
namespace std
{
template <>
struct hash<std::pair<oceanbase::common::ObAddr, oceanbase::common::ObAddr>>
{
std::size_t operator()(const std::pair<oceanbase::common::ObAddr, oceanbase::common::ObAddr>& addr) const
{
return addr.first.hash() + addr.second.hash();
}
};
}
namespace oceanbase {
namespace unittest {
using namespace common;
using namespace palf::election;
using namespace std;
// 线程局部的buffer,用于序列化消息
constexpr int BUFFER_SIZE = 1024;
struct MsgBuffer {
char buffer_[BUFFER_SIZE];
TO_STRING_KV(K_(buffer));
};
thread_local MsgBuffer TH_BUFFER;
// 全局的timer 和 thread pool,用于模拟网络延迟和接收端的工作线程
ObOccamTimer TIMER;
ObOccamThreadPool THREAD_POOL;
// 这是收发消息的方法
class MockNetService : public ElectionMsgSender
{
public:
virtual int broadcast(const ElectionPrepareRequestMsg &msg, const common::ObIArray<common::ObAddr> &list) const override
{
std::lock_guard<std::mutex> lg(mutex_);
broadcast_(msg, list);
// ELECT_LOG(INFO, "send message", K(msg));
return OB_SUCCESS;
}
virtual int broadcast(const ElectionAcceptRequestMsg &msg, const common::ObIArray<common::ObAddr> &list) const override
{
std::lock_guard<std::mutex> lg(mutex_);
broadcast_(msg, list);
// ELECT_LOG(INFO, "send message", K(msg));
return OB_SUCCESS;
}
virtual int send(const ElectionPrepareResponseMsg &msg) const override
{
std::lock_guard<std::mutex> lg(mutex_);
send_(msg);
// ELECT_LOG(INFO, "send message", K(msg));
return OB_SUCCESS;
}
virtual int send(const ElectionAcceptResponseMsg &msg) const override
{
std::lock_guard<std::mutex> lg(mutex_);
send_(msg);
// ELECT_LOG(INFO, "send message", K(msg));
return OB_SUCCESS;
}
virtual int send(const ElectionChangeLeaderMsg &msg) const override
{
std::lock_guard<std::mutex> lg(mutex_);
send_(msg);
// ELECT_LOG(INFO, "send message", K(msg));
return OB_SUCCESS;
}
void decode_and_process_buffer(const MsgBuffer &buffer, Election *election_) const
{
int64_t msg_type;
int64_t pos = 0;
serialization::decode(buffer.buffer_, BUFFER_SIZE, pos, msg_type);
ElectionMsgType type = static_cast<ElectionMsgType>(msg_type);
switch (type) {
case ElectionMsgType::PREPARE_REQUEST:
{
ElectionPrepareRequestMsg msg;
msg.deserialize(buffer.buffer_, BUFFER_SIZE, pos);
// ELECT_LOG(INFO, "receive preapre request", K(msg));
election_->handle_message(msg);
break;
}
case ElectionMsgType::PREPARE_RESPONSE:
{
ElectionPrepareResponseMsg msg;
msg.deserialize(buffer.buffer_, BUFFER_SIZE, pos);
// ELECT_LOG(INFO, "receive preapre response", K(msg));
election_->handle_message(msg);
break;
}
case ElectionMsgType::ACCEPT_REQUEST:
{
ElectionAcceptRequestMsg msg;
msg.deserialize(buffer.buffer_, BUFFER_SIZE, pos);
election_->handle_message(msg);
break;
}
case ElectionMsgType::ACCEPT_RESPONSE:
{
ElectionAcceptResponseMsg msg;
msg.deserialize(buffer.buffer_, BUFFER_SIZE, pos);
election_->handle_message(msg);
break;
}
case ElectionMsgType::CHANGE_LEADER:
{
ElectionChangeLeaderMsg msg;
msg.deserialize(buffer.buffer_, BUFFER_SIZE, pos);
election_->handle_message(msg);
break;
}
default:
break;
}
}
static void init() {
share::ObTenantSwitchGuard guard;
guard.switch_to(OB_SYS_TENANT_ID);
THREAD_POOL.init_and_start(3);
TIMER.init_and_start(THREAD_POOL, 1_ms, "election timer");
}
void connect(Election *left, Election *right) {
std::lock_guard<std::mutex> lg(mutex_);
map_.insert({{left->get_self_addr(), right->get_self_addr()}, right});
ELECT_LOG(INFO, "connect", K(left->get_self_addr()), K(right->get_self_addr()));
}
void connect_two_side(Election *left, Election *right) {
std::lock_guard<std::mutex> lg(mutex_);
map_.insert({{left->get_self_addr(), right->get_self_addr()}, right});
map_.insert({{right->get_self_addr(), left->get_self_addr()}, left});
ELECT_LOG(INFO, "disconnect two side", K(left->get_self_addr()), K(right->get_self_addr()));
}
void disconnect(const Election *left, const Election *right) {
std::lock_guard<std::mutex> lg(mutex_);
map_.erase({left->get_self_addr(), right->get_self_addr()});
ELECT_LOG(INFO, "disconnect", K(left->get_self_addr()), K(right->get_self_addr()));
}
void disconnect_two_side(const Election *left, const Election *right) {
std::lock_guard<std::mutex> lg(mutex_);
map_.erase({left->get_self_addr(), right->get_self_addr()});
map_.erase({right->get_self_addr(), left->get_self_addr()});
ELECT_LOG(INFO, "disconnect two side", K(left->get_self_addr()), K(right->get_self_addr()));
}
void clear() {
std::lock_guard<std::mutex> lg(mutex_);
map_.clear();
}
private:
template <typename MSG>
void broadcast_(const MSG &msg, const common::ObIArray<common::ObAddr> &list) const {
for (int64_t idx = 0; idx < list.count(); ++idx) {
const_cast<MSG &>(msg).set_receiver(list.at(idx));
send_(msg);
}
}
template <typename MSG>
void send_(const MSG &msg) const {
// ELECT_LOG(INFO, "call send message", K(msg));
int64_t pos = 0;
serialization::encode(TH_BUFFER.buffer_, BUFFER_SIZE, pos, int64_t(msg.get_msg_type()));
msg.serialize(TH_BUFFER.buffer_, BUFFER_SIZE, pos);
auto iter = map_.find({msg.get_sender(), msg.get_receiver()});
if (iter != map_.end()) {
auto election = iter->second;
auto buffer = TH_BUFFER;
int ret = common::OB_SUCCESS;
if (MSG_DELAY == 0) {
ret = THREAD_POOL.commit_task_ignore_ret([buffer, election, ret, this]() mutable {
int64_t begin = ObClockGenerator::getRealClock();
this->decode_and_process_buffer(buffer, election);
int64_t end = ObClockGenerator::getRealClock();
if (end - begin > 1_ms) {
ELECT_LOG(WARN, "execute task cost too much time", K(end - begin));
}
});
} else {
ret = TIMER.schedule_task_ignore_handle_after(MSG_DELAY, [buffer, election, ret, this]() mutable {
int64_t begin = ObClockGenerator::getRealClock();
this->decode_and_process_buffer(buffer, election);
int64_t end = ObClockGenerator::getRealClock();
if (end - begin > 1_ms) {
ELECT_LOG(WARN, "execute task cost too much time", K(end - begin));
}
return false;
});
}
assert(ret == OB_SUCCESS);
// ELECT_LOG(INFO, "send message success", K(msg));
} else {
ELECT_LOG_RET(WARN, OB_ERR_UNEXPECTED, "send message failed", K(msg));
}
}
unordered_map<std::pair<ObAddr,ObAddr>, Election*> map_;
mutable std::mutex mutex_;
};
}
}