
Co-authored-by: Charles0429 <xiezhenjiang@gmail.com> Co-authored-by: tino247 <tino247@126.com> Co-authored-by: chaser-ch <chaser.ch@antgroup.com>
250 lines
8.1 KiB
C++
250 lines
8.1 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#include <gtest/gtest.h>
|
|
#define private public
|
|
#define protected public
|
|
#include "logservice/palf/election/interface/election.h"
|
|
#include "logservice/palf/election/message/election_message.h"
|
|
#include "share/ob_occam_timer.h"
|
|
#include "lib/utility/serialization.h"
|
|
#include <thread>
|
|
#include <mutex>
|
|
#include <assert.h>
|
|
#include <unordered_map>
|
|
|
|
|
|
namespace oceanbase
|
|
{
|
|
namespace unittest
|
|
{
|
|
|
|
extern int64_t MSG_DELAY;
|
|
|
|
// 定义用于hash map的判等方法
|
|
struct LogStreamKey
|
|
{
|
|
LogStreamKey(const common::ObAddr &addr, const int64_t logstream_id) : addr_(addr), logstream_id_(logstream_id) {}
|
|
bool operator==(const LogStreamKey &rhs) const { return addr_ == rhs.addr_ && logstream_id_ == rhs.logstream_id_; }
|
|
const common::ObAddr addr_;
|
|
const int64_t logstream_id_;
|
|
TO_STRING_KV(KP(this), K_(addr), K_(logstream_id))
|
|
};
|
|
|
|
}
|
|
}
|
|
|
|
// Defines the hash function used by the hash map
|
|
namespace std
|
|
{
|
|
|
|
template <>
|
|
struct hash<std::pair<oceanbase::common::ObAddr, oceanbase::common::ObAddr>>
|
|
{
|
|
std::size_t operator()(const std::pair<oceanbase::common::ObAddr, oceanbase::common::ObAddr>& addr) const
|
|
{
|
|
return addr.first.hash() + addr.second.hash();
|
|
}
|
|
};
|
|
|
|
}
|
|
|
|
namespace oceanbase {
|
|
namespace unittest {
|
|
|
|
using namespace common;
|
|
using namespace palf::election;
|
|
using namespace std;
|
|
|
|
// 线程局部的buffer,用于序列化消息
|
|
constexpr int BUFFER_SIZE = 1024;
|
|
struct MsgBuffer {
|
|
char buffer_[BUFFER_SIZE];
|
|
TO_STRING_KV(K_(buffer));
|
|
};
|
|
thread_local MsgBuffer TH_BUFFER;
|
|
|
|
// Global timer and thread pool, used to simulate network delay and the receiver-side worker threads
|
|
ObOccamTimer TIMER;
|
|
ObOccamThreadPool THREAD_POOL;
|
|
|
|
// 这是收发消息的方法
|
|
class MockNetService : public ElectionMsgSender
|
|
{
|
|
public:
|
|
virtual int broadcast(const ElectionPrepareRequestMsg &msg, const common::ObIArray<common::ObAddr> &list) const override
|
|
{
|
|
std::lock_guard<std::mutex> lg(mutex_);
|
|
broadcast_(msg, list);
|
|
// ELECT_LOG(INFO, "send message", K(msg));
|
|
return OB_SUCCESS;
|
|
}
|
|
virtual int broadcast(const ElectionAcceptRequestMsg &msg, const common::ObIArray<common::ObAddr> &list) const override
|
|
{
|
|
std::lock_guard<std::mutex> lg(mutex_);
|
|
broadcast_(msg, list);
|
|
// ELECT_LOG(INFO, "send message", K(msg));
|
|
return OB_SUCCESS;
|
|
}
|
|
virtual int send(const ElectionPrepareResponseMsg &msg) const override
|
|
{
|
|
std::lock_guard<std::mutex> lg(mutex_);
|
|
send_(msg);
|
|
// ELECT_LOG(INFO, "send message", K(msg));
|
|
return OB_SUCCESS;
|
|
}
|
|
virtual int send(const ElectionAcceptResponseMsg &msg) const override
|
|
{
|
|
std::lock_guard<std::mutex> lg(mutex_);
|
|
send_(msg);
|
|
// ELECT_LOG(INFO, "send message", K(msg));
|
|
return OB_SUCCESS;
|
|
}
|
|
virtual int send(const ElectionChangeLeaderMsg &msg) const override
|
|
{
|
|
std::lock_guard<std::mutex> lg(mutex_);
|
|
send_(msg);
|
|
// ELECT_LOG(INFO, "send message", K(msg));
|
|
return OB_SUCCESS;
|
|
}
|
|
void decode_and_process_buffer(const MsgBuffer &buffer, Election *election_) const
|
|
{
|
|
int64_t msg_type;
|
|
int64_t pos = 0;
|
|
serialization::decode(buffer.buffer_, BUFFER_SIZE, pos, msg_type);
|
|
ElectionMsgType type = static_cast<ElectionMsgType>(msg_type);
|
|
switch (type) {
|
|
case ElectionMsgType::PREPARE_REQUEST:
|
|
{
|
|
ElectionPrepareRequestMsg msg;
|
|
msg.deserialize(buffer.buffer_, BUFFER_SIZE, pos);
|
|
// ELECT_LOG(INFO, "receive preapre request", K(msg));
|
|
election_->handle_message(msg);
|
|
break;
|
|
}
|
|
case ElectionMsgType::PREPARE_RESPONSE:
|
|
{
|
|
ElectionPrepareResponseMsg msg;
|
|
msg.deserialize(buffer.buffer_, BUFFER_SIZE, pos);
|
|
// ELECT_LOG(INFO, "receive preapre response", K(msg));
|
|
election_->handle_message(msg);
|
|
break;
|
|
}
|
|
case ElectionMsgType::ACCEPT_REQUEST:
|
|
{
|
|
ElectionAcceptRequestMsg msg;
|
|
msg.deserialize(buffer.buffer_, BUFFER_SIZE, pos);
|
|
election_->handle_message(msg);
|
|
break;
|
|
}
|
|
case ElectionMsgType::ACCEPT_RESPONSE:
|
|
{
|
|
ElectionAcceptResponseMsg msg;
|
|
msg.deserialize(buffer.buffer_, BUFFER_SIZE, pos);
|
|
election_->handle_message(msg);
|
|
break;
|
|
}
|
|
case ElectionMsgType::CHANGE_LEADER:
|
|
{
|
|
ElectionChangeLeaderMsg msg;
|
|
msg.deserialize(buffer.buffer_, BUFFER_SIZE, pos);
|
|
election_->handle_message(msg);
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
static void init() {
|
|
share::ObTenantSwitchGuard guard;
|
|
guard.switch_to(OB_SYS_TENANT_ID);
|
|
THREAD_POOL.init_and_start(3);
|
|
TIMER.init_and_start(THREAD_POOL, 1_ms, "election timer");
|
|
}
|
|
void connect(Election *left, Election *right) {
|
|
std::lock_guard<std::mutex> lg(mutex_);
|
|
map_.insert({{left->get_self_addr(), right->get_self_addr()}, right});
|
|
ELECT_LOG(INFO, "connect", K(left->get_self_addr()), K(right->get_self_addr()));
|
|
}
|
|
void connect_two_side(Election *left, Election *right) {
|
|
std::lock_guard<std::mutex> lg(mutex_);
|
|
map_.insert({{left->get_self_addr(), right->get_self_addr()}, right});
|
|
map_.insert({{right->get_self_addr(), left->get_self_addr()}, left});
|
|
ELECT_LOG(INFO, "disconnect two side", K(left->get_self_addr()), K(right->get_self_addr()));
|
|
}
|
|
void disconnect(const Election *left, const Election *right) {
|
|
std::lock_guard<std::mutex> lg(mutex_);
|
|
map_.erase({left->get_self_addr(), right->get_self_addr()});
|
|
ELECT_LOG(INFO, "disconnect", K(left->get_self_addr()), K(right->get_self_addr()));
|
|
}
|
|
void disconnect_two_side(const Election *left, const Election *right) {
|
|
std::lock_guard<std::mutex> lg(mutex_);
|
|
map_.erase({left->get_self_addr(), right->get_self_addr()});
|
|
map_.erase({right->get_self_addr(), left->get_self_addr()});
|
|
ELECT_LOG(INFO, "disconnect two side", K(left->get_self_addr()), K(right->get_self_addr()));
|
|
}
|
|
void clear() {
|
|
std::lock_guard<std::mutex> lg(mutex_);
|
|
map_.clear();
|
|
}
|
|
private:
|
|
template <typename MSG>
|
|
void broadcast_(const MSG &msg, const common::ObIArray<common::ObAddr> &list) const {
|
|
for (int64_t idx = 0; idx < list.count(); ++idx) {
|
|
const_cast<MSG &>(msg).set_receiver(list.at(idx));
|
|
send_(msg);
|
|
}
|
|
}
|
|
template <typename MSG>
|
|
void send_(const MSG &msg) const {
|
|
// ELECT_LOG(INFO, "call send message", K(msg));
|
|
int64_t pos = 0;
|
|
serialization::encode(TH_BUFFER.buffer_, BUFFER_SIZE, pos, int64_t(msg.get_msg_type()));
|
|
msg.serialize(TH_BUFFER.buffer_, BUFFER_SIZE, pos);
|
|
auto iter = map_.find({msg.get_sender(), msg.get_receiver()});
|
|
if (iter != map_.end()) {
|
|
auto election = iter->second;
|
|
auto buffer = TH_BUFFER;
|
|
int ret = common::OB_SUCCESS;
|
|
if (MSG_DELAY == 0) {
|
|
ret = THREAD_POOL.commit_task_ignore_ret([buffer, election, ret, this]() mutable {
|
|
int64_t begin = ObClockGenerator::getRealClock();
|
|
this->decode_and_process_buffer(buffer, election);
|
|
int64_t end = ObClockGenerator::getRealClock();
|
|
if (end - begin > 1_ms) {
|
|
ELECT_LOG(WARN, "execute task cost too much time", K(end - begin));
|
|
}
|
|
});
|
|
} else {
|
|
ret = TIMER.schedule_task_ignore_handle_after(MSG_DELAY, [buffer, election, ret, this]() mutable {
|
|
int64_t begin = ObClockGenerator::getRealClock();
|
|
this->decode_and_process_buffer(buffer, election);
|
|
int64_t end = ObClockGenerator::getRealClock();
|
|
if (end - begin > 1_ms) {
|
|
ELECT_LOG(WARN, "execute task cost too much time", K(end - begin));
|
|
}
|
|
return false;
|
|
});
|
|
}
|
|
assert(ret == OB_SUCCESS);
|
|
// ELECT_LOG(INFO, "send message success", K(msg));
|
|
} else {
|
|
ELECT_LOG_RET(WARN, OB_ERR_UNEXPECTED, "send message failed", K(msg));
|
|
}
|
|
}
|
|
unordered_map<std::pair<ObAddr,ObAddr>, Election*> map_;
|
|
mutable std::mutex mutex_;
|
|
};
|
|
|
|
}
|
|
}
|