Files
oceanbase/unittest/storage/tx/ob_mailbox.h
wangzelin.wzl 93a1074b0c patch 4.0
2022-10-24 17:57:12 +08:00

309 lines
7.2 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_UNITTEST_STORAGE_TX_OB_MAILBOX
#define OCEANBASE_UNITTEST_STORAGE_TX_OB_MAILBOX
#include <deque>
#include <map>
#include "lib/ob_errno.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/utility/ob_print_utils.h"
#include "storage/tx/ob_committer_define.h"
namespace oceanbase
{
namespace transaction
{
template <typename MailType>
class ObMailBoxMgr;
template <typename MailType>
class ObMail
{
public:
int init(int64_t from,
int64_t to,
uint64_t size,
const MailType &mail)
{
if (NULL != mail_) {
std::free(mail_);
}
from_ = from;
to_ = to;
size_ = size;
mail_ = (MailType*)std::malloc(size);
std::memcpy((void*)mail_, (void*)(&mail), size);
return OB_SUCCESS;
}
ObMail()
{
mail_ = NULL;
}
ObMail(const ObMail &other)
{
from_ = other.from_;
to_ = other.to_;
size_ = other.size_;
mail_ = (MailType*)std::malloc(size_);
std::memcpy((void*)mail_, (void*)(other.mail_), size_);
}
~ObMail()
{
if (NULL != mail_) {
std::free(mail_);
}
}
ObMail& operator=(const ObMail& other)
{
if (NULL != mail_) {
std::free(mail_);
}
from_ = other.from_;
to_ = other.to_;
size_ = other.size_;
mail_ = (MailType*)std::malloc(size_);
std::memcpy((void*)mail_, (void*)(other.mail_), size_);
return *this;
}
/* ObMail& operator=(const ObMail &other) */
/* { */
/* from_ = other.from_; */
/* to_ = other.to_; */
/* size_ = other.size_; */
/* mail_ = (MailType*)std::malloc(size_); */
/* std::memcpy((void*)mail_, (void*)(other.mail_), size_); */
/* } */
int64_t from_;
int64_t to_;
uint64_t size_;
MailType* mail_;
TO_STRING_KV(K_(from), K_(to), K_(*mail));
};
template <typename MailType>
class ObMailHandler
{
public:
virtual int handle(const ObMail<MailType>& mail) = 0;
int64_t sign_ = 39;
virtual TO_STRING_KV(K_(sign));
};
template <typename MailType>
class ObMailBox
{
public:
int64_t addr_;
std::deque<ObMail<MailType>> mailbox_;
ObMailHandler<MailType> *ctx_;
ObMailBoxMgr<MailType> *mailbox_mgr_;
~ObMailBox()
{
mailbox_.clear();
}
int init(int64_t addr,
ObMailBoxMgr<MailType> *mailbox_mgr,
ObMailHandler<MailType> *ctx);
int handle(const bool must_have = true);
int handle_all();
int send(const ObMail<MailType>& mail,
const int64_t receiver);
int send_to_head(const ObMail<MailType>& mail,
const int64_t receiver);
int fetch_mail(ObMail<MailType>& mail);
int64_t to_string(char *buffer, const int64_t size) const;
};
template <typename MailType>
class ObMailBoxMgr
{
public:
int64_t counter_ = 0;
std::map<int64_t, ObMailBox<MailType>*> mgr_;
int register_mailbox(int64_t &addr,
ObMailBox<MailType> &mailbox,
ObMailHandler<MailType> *ctx);
int send(const ObMail<MailType>& mail,
const int64_t receive);
int send_to_head(const ObMail<MailType>& mail,
const int64_t receive);
void reset();
};
template <typename MailType>
int ObMailBox<MailType>::init(int64_t addr,
ObMailBoxMgr<MailType> *mailbox_mgr,
ObMailHandler<MailType> *ctx)
{
int ret = OB_SUCCESS;
mailbox_.clear();
addr_ = addr;
ctx_ = ctx;
mailbox_mgr_ = mailbox_mgr;
return ret;
}
template <typename MailType>
int ObMailBox<MailType>::fetch_mail(ObMail<MailType> &mail)
{
int ret = OB_SUCCESS;
if (mailbox_.empty()) {
TRANS_LOG(ERROR, "mailbox is empty, but must handle", K(*this));
ob_abort();
} else {
mail = mailbox_.front();
mailbox_.pop_front();
}
return ret;
}
template <typename MailType>
int ObMailBox<MailType>::handle(const bool must_have)
{
int ret = OB_SUCCESS;
if (must_have && mailbox_.empty()) {
TRANS_LOG(ERROR, "mailbox is empty, but must handle", K(*this));
ob_abort();
} else if (mailbox_.empty()) {
ret = OB_SUCCESS;
} else {
ObMail<MailType> mail = mailbox_.front();
mailbox_.pop_front();
ret = ctx_->handle(mail);
}
return ret;
}
template <typename MailType>
int ObMailBox<MailType>::handle_all()
{
int ret = OB_SUCCESS;
while (OB_SUCC(ret) && !mailbox_.empty()) {
ObMail<MailType> mail = mailbox_.front();
mailbox_.pop_front();
ret = ctx_->handle(mail);
}
return ret;
}
template <typename MailType>
int ObMailBox<MailType>::send(const ObMail<MailType>& mail,
const int64_t receiver)
{
int ret = OB_SUCCESS;
ret = mailbox_mgr_->send(mail, receiver);
return ret;
}
template <typename MailType>
int ObMailBox<MailType>::send_to_head(const ObMail<MailType>& mail,
const int64_t receiver)
{
int ret = OB_SUCCESS;
ret = mailbox_mgr_->send_to_head(mail, receiver);
return ret;
}
template <typename MailType>
int64_t ObMailBox<MailType>::to_string(char *buffer, const int64_t size) const
{
int64_t pos = 0;
if (nullptr != buffer && size > 0) {
databuff_printf(buffer, size, pos, "{addr: %ld, DEQUE: [", addr_);
for (auto it = mailbox_.begin(); it != mailbox_.end(); ++it) {
databuff_printf(buffer, size, pos, "(%s), ", to_cstring(*it));
}
databuff_printf(buffer, size, pos, "]}");
}
return pos;
}
template <typename MailType>
int ObMailBoxMgr<MailType>::register_mailbox(int64_t &addr,
ObMailBox<MailType> &mailbox,
ObMailHandler<MailType> *ctx)
{
int ret = OB_SUCCESS;
addr = ++counter_;
ret = mailbox.init(addr, this, ctx);
mgr_[addr] = &mailbox;
TRANS_LOG(INFO, "register mailbox", K(mailbox), KP(ctx));
return ret;
}
template <typename MailType>
void ObMailBoxMgr<MailType>::reset()
{
counter_ = 0;
mgr_.clear();
TRANS_LOG(INFO, "reset mailbox",K(this));
}
template <typename MailType>
int ObMailBoxMgr<MailType>::send(const ObMail<MailType>& mail,
const int64_t receiver)
{
int ret = OB_SUCCESS;
if (mgr_.count(mail.to_) != 0) {
mgr_[receiver]->mailbox_.push_back(mail);
TRANS_LOG(INFO, "send mailbox success", K(ret), K(mail),
K(*mgr_[receiver]));
}
return ret;
}
template <typename MailType>
int ObMailBoxMgr<MailType>::send_to_head(const ObMail<MailType>& mail,
const int64_t receiver)
{
int ret = OB_SUCCESS;
if (mgr_.count(mail.to_) != 0) {
mgr_[receiver]->mailbox_.push_front(mail);
TRANS_LOG(INFO, "send to mailbox front success", K(ret), K(mail),
K(*mgr_[receiver]));
}
return ret;
}
} // namespace transaction
} // namespace oceanbase
#endif // OCEANBASE_UNITTEST_STORAGE_TX_OB_MAILBOX